Commit
Merge branch 'master' into pixi
jennydaman committed Aug 29, 2024
2 parents 2d5dd96 + adf9363 commit f2f1851
Showing 11 changed files with 653 additions and 56 deletions.
76 changes: 75 additions & 1 deletion pfcon/base_storage.py
@@ -3,8 +3,9 @@
"""

import logging
import os
import abc
import shutil
import shutil, errno


logger = logging.getLogger(__name__)
@@ -38,3 +39,76 @@ def delete_data(self, job_dir):
Delete job data from the local storage.
"""
shutil.rmtree(job_dir)

def process_chrislink_files(self, job_incoming_dir):
"""
Rearrange the local job incoming directory tree by creating folders that trace
the source dirs pointed to by ChRIS link files.
"""
self.job_incoming_dir = job_incoming_dir
self._linked_paths = set()
self._nlinks = 0
self._already_copied_src_set = set()

self._process_chrislink_files(job_incoming_dir)

linked_path_top_folders = set()
for path in self._linked_paths:
linked_path_top_folders.add(path.split('/', 1)[0])

for folder in linked_path_top_folders:
if folder not in self._linked_paths:
self.deletesrc(os.path.join(job_incoming_dir, folder))

return self._nlinks

def _process_chrislink_files(self, dir):
"""
Recursively expand (substitute by actual folders) and remove ChRIS link files.
"""
for root, dirs, files in os.walk(dir):
for filename in files:
if filename.endswith('.chrislink'):
link_file_path = os.path.join(root, filename)

if not link_file_path.startswith(tuple(self._already_copied_src_set)): # only expand a link once
with open(link_file_path, 'rb') as f:
rel_path = f.read().decode().strip()
abs_path = os.path.join(self.job_incoming_dir, rel_path)

if os.path.isfile(abs_path):
rel_path = os.path.dirname(rel_path)
abs_path = os.path.dirname(abs_path)

source_trace_dir = rel_path.replace('/', '_')
dst_path = os.path.join(root, source_trace_dir)

if not os.path.isdir(dst_path): # only copy once to a dest path
self.copysrc(abs_path, dst_path)
self._already_copied_src_set.add(abs_path)
self._process_chrislink_files(dst_path) # recursive call

self._linked_paths.add(rel_path)

os.remove(link_file_path)
self._nlinks += 1

@staticmethod
def copysrc(src, dst):
try:
shutil.copytree(src, dst)
except OSError as e:
if e.errno in (errno.ENOTDIR, errno.EINVAL):
shutil.copy(src, dst)
else:
raise

@staticmethod
def deletesrc(src):
try:
shutil.rmtree(src)
except OSError as e:
if e.errno in (errno.ENOTDIR, errno.EINVAL):
os.remove(src)
else:
raise
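
The new ChRIS-link machinery above can be hard to picture from the diff alone. Below is a minimal sketch of what process_chrislink_files does, assuming a hypothetical concrete subclass (LocalStorage) of the class in base_storage.py, a no-argument constructor, and the class name BaseStorage (none of these are confirmed by the diff); the folder names and link payload are made up for illustration.

# Sketch only: LocalStorage, its stub methods, the no-arg constructor and the
# class name BaseStorage are assumptions; the .chrislink payload is a relative
# path, exactly as _process_chrislink_files reads it above.
import os
import tempfile

from pfcon.base_storage import BaseStorage   # class name assumed

class LocalStorage(BaseStorage):
    # stubs for whatever abstract interface the base class declares (assumed)
    def store_data(self, *args, **kwargs): ...
    def get_data(self, *args, **kwargs): ...

job_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(job_dir, 'feed_1/pl-dircopy_1/data'))
with open(os.path.join(job_dir, 'feed_1/pl-dircopy_1/data/hello.txt'), 'w') as f:
    f.write('hello')
with open(os.path.join(job_dir, 'input.chrislink'), 'w') as f:
    f.write('feed_1/pl-dircopy_1/data')       # relative path to the linked folder

storage = LocalStorage()                      # constructor signature assumed
nlinks = storage.process_chrislink_files(job_dir)
# nlinks == 1; the link file is gone, its target was copied to
# 'feed_1_pl-dircopy_1_data/', and the now-redundant 'feed_1/' tree was removed.
print(nlinks)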
2 changes: 1 addition & 1 deletion pfcon/config.py
@@ -28,7 +28,7 @@ def __init__(self):

if self.PFCON_INNETWORK:
self.STORAGE_ENV = env('STORAGE_ENV', 'swift')
if self.STORAGE_ENV not in ('swift', 'filesystem'):
if self.STORAGE_ENV not in ('swift', 'filesystem', 'fslink'):
raise ValueError(f"Unsupported value '{self.STORAGE_ENV}' for STORAGE_ENV")
else:
self.STORAGE_ENV = env('STORAGE_ENV', 'zipfile')
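For context, the 'fslink' value is only accepted when pfcon runs in-network. A hedged sketch of the relevant environment follows; the PFCON_INNETWORK variable name is inferred from the attribute above and is not confirmed by this diff.

# Illustrative environment for the new backend; variable names other than
# STORAGE_ENV are assumptions.
import os

os.environ['PFCON_INNETWORK'] = 'true'
os.environ['STORAGE_ENV'] = 'fslink'   # now valid alongside 'swift' and 'filesystem'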
8 changes: 5 additions & 3 deletions pfcon/filesystem_storage.py
@@ -1,6 +1,8 @@
"""
Handle filesystem-based (eg. mounted directory) storage. This is used when pfcon is
in-network and configured to directly access the data from a filesystem.
Handle filesystem-based storage. This is used when pfcon is in-network and configured
to directly access the data from a ChRIS shared filesystem. It assumes that both the
input (read-only) and the output (read-write) directories in the shared storage are
directly mounted into the plugin container.
"""

import logging
@@ -24,7 +26,7 @@ def __init__(self, config):

self.fs_mount_base_dir = config.get('STOREBASE_MOUNT')

def store_data(self, job_id, job_incoming_dir, data=None, **kwargs):
def store_data(self, job_id, job_incoming_dir, data, **kwargs):
"""
Count the number of files in the specified job incoming directory.
"""
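Since data is now a required positional parameter of store_data, in-network callers that only need a file count pass None explicitly (as resources.py does further down). A sketch with placeholder paths and a minimal stand-in config:

# Placeholder paths and config; the real handler passes Flask's app.config and
# the job's actual incoming directory.
import os
from pfcon.filesystem_storage import FileSystemStorage

incoming = '/tmp/key-chris-jid-1/incoming'
os.makedirs(incoming, exist_ok=True)

storage = FileSystemStorage({'STOREBASE_MOUNT': '/tmp'})    # stand-in for app.config
d_info = storage.store_data('chris-jid-1', incoming, None)  # data must now be passed
print(d_info)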
114 changes: 114 additions & 0 deletions pfcon/fslink_storage.py
@@ -0,0 +1,114 @@
"""
Handle filesystem-based storage. This is used when pfcon is in-network and configured
to directly access the data from a ChRIS shared filesystem. It only assumes that the
output (read-write) directory in the shared storage is directly mounted into the plugin
container. Unlike the 'filesystem' storage this supports ChRIS links.
"""

import logging
import datetime
import os
import shutil


from .filesystem_storage import FileSystemStorage


logger = logging.getLogger(__name__)


class FSLinkStorage(FileSystemStorage):

def store_data(self, job_id, job_incoming_dir, data, **kwargs):
"""
Copy all the files from the filesystem tree under each input folder (storage
prefix) in the specified data list into the specified job incoming directory.
"""
self.job_id = job_id
self.job_output_path = kwargs['job_output_path']

all_file_paths = set()

for storage_path in data:
storage_path = storage_path.strip('/')
file_paths = set()
visited_paths = set()

self._find_all_file_paths(storage_path, file_paths, visited_paths)

for f_path in file_paths:
if f_path not in all_file_paths: # copy a given file only once
fs_file_path = os.path.join(self.fs_mount_base_dir, f_path)

rel_file_path = f_path.replace(storage_path, '', 1).lstrip('/')
local_file_path = os.path.join(job_incoming_dir, rel_file_path)

try:
shutil.copy(fs_file_path, local_file_path)
except FileNotFoundError:
os.makedirs(os.path.dirname(local_file_path))
shutil.copy(fs_file_path, local_file_path)

all_file_paths.add(f_path)

nfiles = len(all_file_paths)
logger.info(f'{nfiles} files fetched from the filesystem for job {job_id}')

nlinks = self.process_chrislink_files(job_incoming_dir)
nfiles -= nlinks

return {
'jid': job_id,
'nfiles': nfiles,
'timestamp': f'{datetime.datetime.now()}',
'path': job_incoming_dir
}

def delete_data(self, job_dir):
"""
Delete job data from the local storage.
"""
shutil.rmtree(job_dir)

def _find_all_file_paths(self, storage_path, file_paths, visited_paths):
"""
Find all file paths under the passed storage path (prefix) by
recursively following ChRIS links. The resulting set of file paths is given
by the file_paths set argument.
"""
if not storage_path.startswith(tuple(visited_paths)): # avoid infinite loops
visited_paths.add(storage_path)
job_id = self.job_id
job_output_path = self.job_output_path
fs_abs_path = os.path.join(self.fs_mount_base_dir, storage_path)

l_ls = []
if os.path.isfile(fs_abs_path):
l_ls.append(fs_abs_path)
else:
for root, dirs, files in os.walk(fs_abs_path):
for filename in files:
l_ls.append(os.path.join(root, filename))

for abs_file_path in l_ls:
if abs_file_path.endswith('.chrislink'):
try:
with open(abs_file_path, 'rb') as f:
linked_path = f.read().decode().strip()
except Exception as e:
logger.error(f'Failed to read file {abs_file_path} for '
f'job {job_id}, detail: {str(e)}')
raise

if f'{job_output_path}/'.startswith(linked_path.rstrip('/') + '/'):
# link files are not allowed to point to the job output dir or
# any of its ancestors
logger.error(f'Found invalid input path {linked_path} for job '
f'{job_id} pointing to an ancestor of the job '
f'output dir: {job_output_path}')
raise ValueError(f'Invalid input path: {linked_path}')

self._find_all_file_paths(linked_path, file_paths,
visited_paths) # recursive call
file_paths.add(abs_file_path.replace(self.fs_mount_base_dir, '',
1).lstrip('/'))
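
A hedged usage sketch of the new class; the mount point, job id, config keys and ChRIS paths below are illustrative assumptions, and in practice the handler in resources.py (next file) supplies Flask's app.config and the request data.

# Illustrative only: paths and config values are assumptions.
from pfcon.fslink_storage import FSLinkStorage

config = {'STOREBASE_MOUNT': '/var/local/storeBase'}        # stand-in for app.config
storage = FSLinkStorage(config)

d_info = storage.store_data(
    'chris-jid-42',                                         # job_id
    '/var/local/storeBase/key-chris-jid-42/incoming',       # local job incoming dir
    ['home/chris/feeds/feed_7/pl-dircopy_7/data'],          # input prefixes; may contain .chrislink files
    job_output_path='home/chris/feeds/feed_7/pl-dircopy_7/pl-foo_8/data'
)
print(d_info['nfiles'], d_info['path'])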
53 changes: 40 additions & 13 deletions pfcon/resources.py
@@ -13,6 +13,7 @@
from .zip_file_storage import ZipFileStorage
from .swift_storage import SwiftStorage
from .filesystem_storage import FileSystemStorage
from .fslink_storage import FSLinkStorage


logger = logging.getLogger(__name__)
@@ -80,7 +81,7 @@ def post(self):
if self.pfcon_innetwork:
if args.input_dirs is None:
abort(400, message='input_dirs: field is required')
if self.storage_env == 'filesystem' and args.output_dir is None:
if args.output_dir is None:
abort(400, message='output_dir: field is required')
else:
if request.files['data_file'] is None:
@@ -99,31 +100,50 @@ def post(self):
input_dir = args.input_dirs[0].strip('/')
output_dir = args.output_dir.strip('/')
incoming_dir = os.path.join(self.storebase_mount, input_dir)

storage = FileSystemStorage(app.config)
try:
d_info = storage.store_data(job_id, incoming_dir)
d_info = storage.store_data(job_id, incoming_dir, None)
except Exception as e:
logger.error(f'Error while accessing files from shared filesystem '
f'for job {job_id}, detail: {str(e)}')
abort(400, message='input_dirs: Error accessing files from shared '
'filesystem')
abort(400,
message='input_dirs: Error accessing files from shared filesystem')
else:
incoming_dir = os.path.join(self.storebase_mount, input_dir)
outgoing_dir = os.path.join(self.storebase_mount, output_dir)
os.makedirs(incoming_dir, exist_ok=True)
os.makedirs(outgoing_dir, exist_ok=True)

if self.pfcon_innetwork:
if self.storage_env == 'swift':
outgoing_dir = os.path.join(self.storebase_mount, output_dir)
os.makedirs(outgoing_dir, exist_ok=True)

storage = SwiftStorage(app.config)
try:
d_info = storage.store_data(job_id, incoming_dir, args.input_dirs)
d_info = storage.store_data(job_id, incoming_dir, args.input_dirs,
job_output_path=args.output_dir.strip('/'))
except ClientException as e:
logger.error(f'Error while fetching files from swift and '
f'storing job {job_id} data, detail: {str(e)}')
abort(400, message='input_dirs: Error fetching files from swift')
abort(400,
message='input_dirs: Error fetching files from swift')

elif self.storage_env == 'fslink':
output_dir = args.output_dir.strip('/')
storage = FSLinkStorage(app.config)
try:
d_info = storage.store_data(job_id, incoming_dir, args.input_dirs,
job_output_path=output_dir)
except Exception as e:
logger.error(f'Error while accessing files from shared filesystem '
f'and storing job {job_id} data, detail: {str(e)}')
abort(400,
message='input_dirs: Error copying files from shared filesystem')
else:
if self.storage_env == 'zipfile':
outgoing_dir = os.path.join(self.storebase_mount, output_dir)
os.makedirs(outgoing_dir, exist_ok=True)

storage = ZipFileStorage(app.config)
data_file = request.files['data_file']
try:
@@ -187,11 +207,14 @@ def delete(self, job_id):
storage = None

if self.pfcon_innetwork:
if self.storage_env == 'swift':
if self.storage_env == 'filesystem':
storage = FileSystemStorage(app.config)

elif self.storage_env == 'swift':
storage = SwiftStorage(app.config)

elif self.storage_env == 'filesystem':
storage = FileSystemStorage(app.config)
elif self.storage_env == 'fslink':
storage = FSLinkStorage(app.config)
else:
if self.storage_env == 'zipfile':
storage = ZipFileStorage(app.config)
@@ -229,7 +252,7 @@ def get(self, job_id):
download_name = f'{job_id}.zip'
mimetype = 'application/zip'

if self.pfcon_innetwork and self.storage_env == 'filesystem':
if self.pfcon_innetwork and self.storage_env in ('filesystem', 'fslink'):
job_output_path = request.args.get('job_output_path')
if not job_output_path:
abort(400, message='job_output_path: query parameter is required')
Expand All @@ -239,7 +262,11 @@ def get(self, job_id):
if not os.path.isdir(outgoing_dir):
abort(404)

storage = FileSystemStorage(app.config)
if self.storage_env == 'filesystem':
storage = FileSystemStorage(app.config)
else:
storage = FSLinkStorage(app.config)

content = storage.get_data(job_id, outgoing_dir,
job_output_path=job_output_path)
download_name = f'{job_id}.json'
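Finally, a hedged sketch of fetching job output when STORAGE_ENV is 'fslink'; the host, port and endpoint path are assumptions rather than pfcon's documented API. The point taken from the diff is the job_output_path query parameter required by the 'filesystem'/'fslink' branch above and the JSON (not zip) response it produces.

# Endpoint URL is assumed; only the job_output_path handling comes from the diff.
import requests

resp = requests.get(
    'http://localhost:30006/api/v1/chris-jid-42/file',      # illustrative URL
    params={'job_output_path': 'home/chris/feeds/feed_7/pl-dircopy_7/data'},
)
resp.raise_for_status()
print(resp.headers.get('Content-Type'))   # a JSON listing (not a zip) for fslink/filesystem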