Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Liu 368 add services to allow persistence #253

Draft
wants to merge 12 commits into
base: LIU-368
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dist
.pytest_cache
.vscode
.venv
venv
*.code-workspace
daliuge-translator/dlg/translator/dropmake/web/VERSION
daliuge-translator/dlg/translator/dropmake/web/LICENSE
Expand Down
15 changes: 15 additions & 0 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@


class CategoryType(str, Enum):
"""
CategoryType determines the rules that govern the translation and
usability of a given drop.

The relationship between a CategoryType and Categories is one-to-many:
many Categories belong to a given CategoryType.

New components may introduce a new Category of drop to DALiuGE/EAGLE,
but must select from an existing CategoryType supported by the engine.
"""

DATA = "Data"
APPLICATION = "Application"
CONSTRUCT = "Construct"
Expand Down Expand Up @@ -183,6 +195,9 @@ def get_roots(pg_spec):
dsc = _sanitize_links(dropspec["streamingConsumers"])
nonroots |= set(dsc)

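# TODO: decide how CategoryType.SERVICE drops affect root selection. The
# draft below is incomplete: the `elif` has no left-hand operand.
# elif == CategoryType.SERVICE:
# nonroots = all_oids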

return all_oids - nonroots


Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ def execute(self, data):
# @details An application component able to run an arbitrary command within the Bash Shell
# @par EAGLE_START
# @param category BashShellApp
# @param categorytype Application
# @param tag template
# @param command /String/ComponentParameter/NoPort/ReadWrite//False/False/The command to be executed
# @param input_redirection /String/ComponentParameter/NoPort/ReadWrite//False/False/The command line argument that specifies the input into this application
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/apps/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# @details A conditional branch to control flow
# @par EAGLE_START
# @param category Branch
# @param categorytype Control
# @param tag template
# @param dropclass dlg.apps.simple.SimpleBranch/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
Expand Down
8 changes: 8 additions & 0 deletions daliuge-engine/dlg/apps/constructs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# @details A Scatter template drop
# @par EAGLE_START
# @param category Scatter
# @param categorytype Construct
# @param tag template
# @param num_of_copies 4/Integer/ConstructParameter/NoPort/ReadWrite//False/False/Specifies the number of replications of the content of the scatter construct
# @par EAGLE_END
Expand All @@ -22,6 +23,7 @@ class ScatterDrop(BarrierAppDROP):
# @details A Gather template drop
# @par EAGLE_START
# @param category Gather
# @param categorytype Construct
# @param tag template
# @param num_of_inputs 2/Integer/ConstructParameter/NoPort/ReadWrite//False/False/Number of inputs
# @param gather_axis 0/Integer/ApplicationArgument/NoPort/ReadWrite//False/False/Index of gather axis
Expand All @@ -39,6 +41,7 @@ class GatherDrop(BarrierAppDROP):
# @details A loop template drop
# @par EAGLE_START
# @param category Loop
# @param categorytype Construct
# @param tag template
# @param num_of_iter 2/Integer/ConstructParameter/NoPort/ReadWrite//False/False/Number of iterations
# @par EAGLE_END
Expand All @@ -55,6 +58,7 @@ class LoopDrop(BarrierAppDROP):
# @details A MKN template drop
# @par EAGLE_START
# @param category MKN
# @param categorytype Construct
# @param tag template
# @param k 1/Integer/ConstructParameter/NoPort/ReadWrite//False/False/Internal multiplicity
# @par EAGLE_END
Expand All @@ -71,6 +75,7 @@ class MKNDrop(BarrierAppDROP):
# @details A GroupBy template drop
# @par EAGLE_START
# @param category GroupBy
# @param categorytype Construct
# @param tag template
# @param num_of_inputs 2/Integer/ConstructParameter/NoPort/ReadWrite//False/False/Number of inputs
# @param gather_axis 0/Integer/ApplicationArgument/NoPort/ReadWrite//False/False/Index of gather axis
Expand All @@ -88,6 +93,7 @@ class GroupByDrop(BarrierAppDROP):
# @details A SubGraph template drop
# @par EAGLE_START
# @param category SubGraph
# @param categorytype Construct
# @param tag template
# @par EAGLE_END
class SubGraphDrop(BarrierAppDROP):
Expand All @@ -103,6 +109,7 @@ class SubGraphDrop(BarrierAppDROP):
# @details A comment template drop
# @par EAGLE_START
# @param category Comment
# @param categorytype Other
# @param dropclass dlg.apps.constructs.CommentDrop/String/ComponentParameter/NoPort/ReadWrite//False/False/Drop class
# @param tag template
# @par EAGLE_END
Expand All @@ -119,6 +126,7 @@ class CommentDrop(BarrierAppDROP):
# @details An Exclusive Force Node
# @par EAGLE_START
# @param category ExclusiveForceNode
# @param categorytype Control
# @param dropclass dlg.apps.constructs.ExclusiveForceDrop/String/ComponentParameter/NoPort/ReadWrite//False/False/Drop class
# @param tag template
# @par EAGLE_END
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def waitForIp(self, timeout=None):
# @details A component wrapping docker based applications.
# @par EAGLE_START
# @param category Docker
# @param categorytype Application
# @param tag template
# @param image /String/ComponentParameter/NoPort/ReadWrite//False/False/The name of the docker image to be used for this application
# @param docker_tag 1.0/String/ComponentParameter/NoPort/ReadWrite//False/False/The tag of the docker image to be used for this application
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/apps/dynlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ def generate_recompute_data(self):
# @details An application component run from a dynamic library
# @par EAGLE_START
# @param category DynlibApp
# @param categorytype Application
# @param tag template
# @param libpath /String/ComponentParameter/NoPort/ReadWrite//False/False/"The location of the shared object/DLL that implements this application"
# @param dropclass dlg.apps.dynlib.DynlibApp/String/ComponentParameter/NoPort/ReadWrite//False/False/Drop class
Expand Down Expand Up @@ -463,6 +464,7 @@ def get_from_subprocess(proc, q):
# @details An application component run from a dynamic library in a different process
# @par EAGLE_START
# @param category DynlibProcApp
# @param categorytype Application
# @param tag template
# @param libpath /String/ComponentParameter/NoPort/ReadWrite//False/False/"The location of the shared object/DLL that implements this application"
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/apps/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
# @details An application component using the Message Passing Interface (MPI)
# @par EAGLE_START
# @param category Mpi
# @param categorytype Application
# @param tag template
# @param num_of_procs 1/Integer/ComponentParameter/NoPort/ReadWrite//False/False/Number of processes used for this application
# @param command /String/ComponentParameter/NoPort/ReadWrite//False/False/The command to be executed
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def import_using_code(code):
# This is mainly useful (and used) when starting a new workflow from scratch.
# @par EAGLE_START
# @param category PythonMemberFunction
# @param categorytype Application
# @param tag daliuge
# @param func_name object.__init__/String/ComponentParameter/NoPort/ReadWrite//False/False/Python function name
# @param func_code /String/ComponentParameter/NoPort/ReadWrite//False/False/Python function code, e.g. 'def function_name(args): return args'
Expand Down Expand Up @@ -178,6 +179,7 @@ class PyMemberApp(BarrierAppDROP):
# being written to its corresponding output.
# @par EAGLE_START
# @param category PythonApp
# @param categorytype Application
# @param tag template
# @param func_name /String/ComponentParameter/NoPort/ReadWrite//False/False/Python function name
# @param func_code /String/ComponentParameter/NoPort/ReadWrite//False/False/Python function code, e.g. 'def function_name(args): return args'
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def run(self):
# This is mainly useful (and used) when starting a new workflow from scratch.
# @par EAGLE_START
# @param category PythonApp
# @param categorytype Application
# @param tag template
# @param dropclass PythonApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
Expand All @@ -94,6 +95,7 @@ class PythonApp(BarrierAppDROP):
# without executing real algorithms. Very useful for debugging.
# @par EAGLE_START
# @param category PythonApp
# @param categorytype Application
# @param tag daliuge
# @param sleep_time 5/Integer/ApplicationArgument/NoPort/ReadWrite//False/False/The number of seconds to sleep
# @param dropclass dlg.apps.simple.SleepApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/data/drops/data_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
# @details A generic Data drop, whose functionality can be provided by an arbitrary class, as specified in the 'dropclass' component parameter. It is not useful without additional development.
# @par EAGLE_START
# @param category Data
# @param categorytype Data
# @param tag template
# @param dropclass my.awesome.data.Component/String/ComponentParameter/NoPort/ReadOnly//False/False/The python class that implements this data component
# @param data_volume 5/Float/ConstraintParameter/NoPort/ReadWrite//False/False/Estimated size of the data contained in this node
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/data/drops/directorycontainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

# TODO: This needs some more work
##
# @brief Directory
# @brief DirectoryContainer
# @details A ContainerDROP that represents a filesystem directory. It only allows
# FileDROPs and DirectoryContainers to be added as children. Children
# can only be added if they are placed directly within the directory
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/data/drops/environmentvar_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def _filter_parameters(parameters: dict):
# @details A set of environment variables, wholly specified in EAGLE and accessible to all drops.
# @par EAGLE_START
# @param category EnvironmentVariables
# @param categorytype Data
# @param tag daliuge
# @param dropclass dlg.data.drops.environmentvar_drop.EnvironmentVarDROP/String/ComponentParameter/NoPort/ReadWrite//False/False/Drop class
# @param streaming False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component streams input and output data
Expand Down
67 changes: 52 additions & 15 deletions daliuge-engine/dlg/data/drops/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,52 @@
# @details A standard file on a filesystem mounted to the deployment machine
# @par EAGLE_START
# @param category File
# @param categorytype Data
# @param tag daliuge
# @param filepath /String/ApplicationArgument/NoPort/ReadWrite//False/False/"File path for this file. In many cases this does not need to be specified. If it has a \/ at the end it will be treated as a directory name and the filename will be generated. If it does not have a \/, the last part will be treated as a filename. If filepath does not start with \/ (relative path) then the session directory will be prepended to make the path absolute.""
# @param check_filepath_exists False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Perform a check to make sure the file path exists before proceeding with the application
# @param dropclass dlg.data.drops.file.FileDROP/String/ComponentParameter/NoPort/ReadWrite//False/False/Drop class
# @param streaming False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component streams input and output data
# @param persist True/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param expireAfterUse True/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param data_volume 5/Float/ConstraintParameter/NoPort/ReadWrite//False/False/Estimated size of the data contained in this node
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
# @param dummy /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/Dummy port
# @param filepath
# /String/ApplicationArgument/NoPort/ReadWrite//False/False/
# File path for this file. In many cases this does not need to be specified.
# If it has a \/ at the end it will be treated as a directory name and the
# filename will be generated. If it does not have a \/, the last part will
# be treated as a filename. If filepath does not start with \/ (relative
# path) then the session directory will be prepended to make the path
# absolute.
# @param check_filepath_exists
# False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/
# Perform a check to make sure the file path exists before proceeding with
# the application
# @param dropclass
# dlg.data.drops.file.FileDROP/String/ComponentParameter/NoPort/ReadWrite//False/False/
# Drop class
# @param streaming
# False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/
# Specifies whether this data component streams input and output data
# @param persist
# False/Boolean/ComponentParameter/NoPort/ReadWrite
# //False/False/
# Specifies whether this data component contains data that should not be
# deleted after execution
# @param persistentStoreType
# directory/Select/ComponentParameter/NoPort/ReadWrite
# /directory,filesystem
# /False/False/
# Specifies where the data will persist after drop has completed
# @param persistentStorePath
# /String/ComponentParameter/NoPort/ReadWrite//False/False/
# Where in the persistent storage location we want to persist the data
# @param expireAfterUse
# False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/
# Specifies whether the data should be removed from the workspace once it
# has been used by all consumers
# @param data_volume
# 5/Float/ConstraintParameter/NoPort/ReadWrite//False/False/
# Estimated size of the data contained in this node
# @param group_end
# False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/
# Is this node the end of a group?
# @param dummy
# /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/
# Dummy port
# @par EAGLE_END
class FileDROP(DataDROP, PathBasedDrop):
"""
Expand All @@ -73,15 +109,16 @@ class FileDROP(DataDROP, PathBasedDrop):

def __init__(self, *args, **kwargs):
"""
Initialise default drop behaviour when it is completed with the following rules:

- "expireAfterUse": Remove the data from the workspace once it has been used
by all consumers. This is independent of the "persist" flag. This is false
by default for FileDrops.
Initialise default drop completion behaviour:

- "expireAfterUse": Remove the data from the workspace once it has
been used by all consumers. This is independent of the "persist"
flag. This is false by default for FileDrops.
- Ensure we do not set expireAfterUse in the event 'lifespan' has
been specified in the Drop arguments, as the two are mutually
exclusive.
"""

# 'lifespan' and 'expireAfterUse' are mutually exclusive
if "lifespan" not in kwargs and "expireAfterUse" not in kwargs:
kwargs["expireAfterUse"] = False
self.is_dir = False
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/data/drops/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def generate_reproduce_data(self):
# @details A Python object stored in memory
# @par EAGLE_START
# @param category PythonObject
# @param categorytype Data
# @param tag daliuge
# @param object /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/object
# @param persist False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Object should be serialized
Expand All @@ -186,6 +187,7 @@ class PythonObjectDROP(InMemoryDROP):
# @details Data stored in shared memory
# @par EAGLE_START
# @param category SharedMemory
# @param categorytype Data
# @param tag daliuge
# @param pydata None/String/ApplicationArgument/NoPort/ReadWrite//False/False/Data to be loaded into memory
# @param dummy /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/Dummy port
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/data/drops/ngas.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
# @details An archive on the Next Generation Archive System (NGAS).
# @par EAGLE_START
# @param category NGAS
# @param categorytype Data
# @param tag daliuge
# @param ngasSrv localhost/String/ComponentParameter/NoPort/ReadWrite//False/False/The URL of the NGAS Server
# @param ngasPort 7777/Integer/ComponentParameter/NoPort/ReadWrite//False/False/The port of the NGAS Server
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/data/drops/parset_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
# @details A set of parameters, which can be set and modified in EAGLE and thus is part of the graph. Multiple serialisation formats are available.
# @par EAGLE_START
# @param category ParameterSet
# @param categorytype Data
# @param tag daliuge
# @param data_volume 5/Float/ConstraintParameter/NoPort/ReadWrite//False/False/Estimated size of the data contained in this node
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/data/drops/plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
# @details An object in an Apache Arrow Plasma in-memory object store
# @par EAGLE_START
# @param category Plasma
# @param categorytype Service
# @param tag daliuge
# @param plasma_path /String/ApplicationArgument/NoPort/ReadWrite//False/False/Path to the local plasma store
# @param object_id /String/ApplicationArgument/NoPort/ReadWrite//False/False/PlasmaId of the object for all compute nodes
Expand Down Expand Up @@ -86,6 +87,7 @@ def dataURL(self) -> str:
# to a Plasma in-memory object store
# @par EAGLE_START
# @param category PlasmaFlight
# @param categorytype Service
# @param tag daliuge
# @param plasma_path /String/ApplicationArgument/NoPort/ReadWrite//False/False/Path to the local plasma store
# @param object_id /String/ApplicationArgument/NoPort/ReadWrite//False/False/PlasmaId of the object for all compute nodes
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/data/drops/rdbms.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
# @details A Drop allowing storage and retrieval from a SQL DB.
# @par EAGLE_START
# @param category RDBMS
# @param categorytype Service
# @param tag daliuge
# @param data_volume 5/Float/ConstraintParameter/NoPort/ReadWrite//False/False/Estimated size of the data contained in this node
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/data/drops/s3_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
# @details An object available in a bucket on an S3 (Simple Storage Service) object storage platform
# @par EAGLE_START
# @param category S3
# @param categorytype Data
# @param tag daliuge
# @param data_volume 5/Float/ConstraintParameter/NoPort/ReadWrite//False/False/Estimated size of the data contained in this node
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
Expand Down
Loading
Loading