Skip to content

Commit

Permalink
make TaskDataReference do something
Browse files Browse the repository at this point in the history
  • Loading branch information
essweine committed Feb 15, 2024
1 parent ae3ed3e commit d1dade4
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 37 deletions.
10 changes: 5 additions & 5 deletions SpiffWorkflow/bpmn/specs/bpmn_task_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ def _update_hook(self, my_task):
if self.io_specification is not None and len(self.io_specification.data_inputs) > 0:
data = {}
for var in self.io_specification.data_inputs:
if var.bpmn_id not in my_task.data:
if not var.exists(my_task):
raise WorkflowDataException("Missing data input", task=my_task, data_input=var)
data[var.bpmn_id] = my_task.data[var.bpmn_id]
data[var.bpmn_id] = var.get(my_task)
my_task.data = data

return True
Expand All @@ -88,17 +88,17 @@ def _on_complete_hook(self, my_task):
if self.io_specification is not None and len(self.io_specification.data_outputs) > 0:
data = {}
for var in self.io_specification.data_outputs:
if var.bpmn_id not in my_task.data:
if not var.exists(my_task):
raise WorkflowDataException("Missing data ouput", task=my_task, data_output=var)
data[var.bpmn_id] = my_task.data[var.bpmn_id]
data[var.bpmn_id] = var.get(my_task)
my_task.data = data

for obj in self.data_output_associations:
obj.set(my_task)

for obj in self.data_input_associations:
# Remove the any copied input variables that might not have already been removed
my_task.data.pop(obj.bpmn_id, None)
obj.delete(my_task)

super()._on_complete_hook(my_task)

Expand Down
13 changes: 12 additions & 1 deletion SpiffWorkflow/bpmn/specs/data_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,18 @@ def set(self, my_task):
del my_task.data[self.bpmn_id]
data_log.info(f'Set workflow variable {self.bpmn_id}', extra=my_task.log_info())

def delete(self, my_task):
my_task.data.pop(self.bpmn_id, None)


class TaskDataReference(BpmnDataSpecification):
"""A representation of task data that can be used in a BPMN diagram"""
pass

def get(self, my_task):
return my_task.data.get(self.bpmn_id)

def set(self, my_task, value):
my_task.data[self.bpmn_id] = value

def exists(self, my_task):
return self.bpmn_id in my_task.data
56 changes: 28 additions & 28 deletions SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ def child_completed_action(self, my_task, child):
"""This merges child data into this task's data."""

if self.data_output is not None and self.output_item is not None:
if self.output_item.bpmn_id not in child.data:
if not self.output_item.exists(child):
self.raise_data_exception("Expected an output item", child)
item = child.data[self.output_item.bpmn_id]
item = self.output_item.get(child)
key_or_index = child.internal_data.get('key_or_index')
data_output = my_task.data[self.data_output.bpmn_id]
data_input = my_task.data[self.data_input.bpmn_id] if self.data_input is not None else None
data_output = self.data_output.get(my_task)
data_input = self.data_input.get(my_task) if self.data_input is not None else None
if key_or_index is not None and (isinstance(data_output, Mapping) or data_input is data_output):
data_output[key_or_index] = item
else:
Expand All @@ -167,12 +167,12 @@ def create_child(self, my_task, item, key_or_index=None):
task_spec = my_task.workflow.spec.task_specs[self.task_spec]
child = my_task._add_child(task_spec, TaskState.WAITING)
child.triggered = True
if self.input_item is not None and self.input_item.bpmn_id in my_task.data:
if self.input_item is not None and self.input_item.exists(my_task):
raise WorkflowDataException(f'Multiinstance input item {self.input_item.bpmn_id} already exists.', my_task)
if self.output_item is not None and self.output_item.bpmn_id in my_task.data:
if self.output_item is not None and self.output_item.exists(my_task):
raise WorkflowDataException(f'Multiinstance output item {self.output_item.bpmn_id} already exists.', my_task)
if self.input_item is not None:
child.data[self.input_item.bpmn_id] = deepcopy(item)
if self.input_item is not None:
self.input_item.set(child, deepcopy(item))
if key_or_index is not None:
child.internal_data['key_or_index'] = key_or_index
else:
Expand All @@ -188,19 +188,18 @@ def check_completion_condition(self, my_task):

def init_data_output_with_input_data(self, my_task, input_data):

name = self.data_output.bpmn_id
if name not in my_task.data:
if not self.data_output.exists(my_task):
if isinstance(input_data, (MutableMapping, MutableSequence)):
# We can use the same class if it implements __setitem__
my_task.data[name] = input_data.__class__()
self.data_output.set(my_task, input_data.__class__())
elif isinstance(input_data, Mapping):
# If we have a map without __setitem__, use a dict
my_task.data[name] = dict()
self.data_output.set(my_task, dict())
else:
# For all other types, we'll append to a list
my_task.data[name] = list()
self.data_output.set(my_task, list())
else:
output_data = my_task.data[self.data_output.bpmn_id]
output_data = self.data_output.get(my_task)
if not isinstance(output_data, (MutableSequence, MutableMapping)):
self.raise_data_exception("Only a mutable map (dict) or sequence (list) can be used for output", my_task)
if input_data is not output_data and not isinstance(output_data, Mapping) and len(output_data) > 0:
Expand All @@ -209,14 +208,15 @@ def init_data_output_with_input_data(self, my_task, input_data):

def init_data_output_with_cardinality(self, my_task):

name = self.data_output.bpmn_id
if name not in my_task.data:
my_task.data[name] = list()
elif not isinstance(my_task.data[name], MutableMapping) and len(my_task.data[name]) > 0:
self.raise_data_exception(
"If loop cardinality is specificied, the output must be a map (dict) or empty sequence (list)",
my_task
)
if not self.data_output.exists(my_task):
self.data_output.set(my_task, list())
else:
data_output = self.data_output.get(my_task)
if not isinstance(data_ouput, MutableMapping) and len(data_output) > 0:
self.raise_data_exception(
"If loop cardinality is specificied, the output must be a map (dict) or empty sequence (list)",
my_task
)

def raise_data_exception(self, message, my_task):
raise WorkflowDataException(message, my_task, data_input=self.data_input, data_output=self.data_output)
Expand Down Expand Up @@ -260,7 +260,7 @@ def add_next_child(self, my_task):

def get_next_input_item(self, my_task):

input_data = my_task.data[self.data_input.bpmn_id]
input_data = self.data_input.get(my_task)
remaining = my_task.internal_data.get('remaining')

if remaining is None:
Expand All @@ -282,9 +282,9 @@ def get_next_input_item(self, my_task):

def init_remaining_items(self, my_task):

if self.data_input.bpmn_id not in my_task.data:
if not self.data_input.exists(my_task):
self.raise_data_exception("Missing data input for multiinstance task", my_task)
input_data = my_task.data[self.data_input.bpmn_id]
input_data = self.data_input.get(my_task)

# This is internal bookkeeping, so we know where we are; we get the actual items when we create the task
if isinstance(input_data, Sequence):
Expand Down Expand Up @@ -340,8 +340,8 @@ def _update_hook(self, my_task):

def create_children(self, my_task):

data_input = my_task.data[self.data_input.bpmn_id] if self.data_input is not None else None
if data_input is not None:
if self.data_input is not None:
data_input = self.data_input.get(my_task)
# We have to preserve the key or index for maps/sequences, in case we're updating in place, or the output is a mapping
if isinstance(data_input, Mapping):
children = data_input.items()
Expand All @@ -359,7 +359,7 @@ def create_children(self, my_task):

if self.data_output is not None:
if self.data_input is not None:
self.init_data_output_with_input_data(my_task, my_task.data[self.data_input.bpmn_id])
self.init_data_output_with_input_data(my_task, self.data_input.get(my_task))
else:
self.init_data_output_with_cardinality(my_task)

Expand Down
6 changes: 3 additions & 3 deletions SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ def copy_data(self, my_task, subworkflow):
else:
# Otherwise copy only task data with the specified names
for var in subworkflow.spec.io_specification.data_inputs:
if var.bpmn_id not in my_task.data:
if not var.exists(my_task):
raise WorkflowDataException(
"You are missing a required Data Input for a call activity.",
task=my_task,
data_input=var,
)
start.data[var.bpmn_id] = my_task.data[var.bpmn_id]
var.set(start, var.get(my_task))

def update_data(self, my_task, subworkflow):

Expand All @@ -119,7 +119,7 @@ def update_data(self, my_task, subworkflow):
task=my_task,
data_output=var,
)
my_task.data[var.bpmn_id] = end.data[var.bpmn_id]
var.set(my_task, var.get(end))


class TransactionSubprocess(SubWorkflowTask):
Expand Down
3 changes: 3 additions & 0 deletions tests/SpiffWorkflow/bpmn/BpmnLoaderForTests.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ def set(self, my_task):
TestDataStore._value = my_task.data[self.bpmn_id]
del my_task.data[self.bpmn_id]

def delete(self, my_task):
del my_task.data[self.bpmn_id]

class TestDataStoreConverter(BpmnConverter):

def to_dict(self, spec):
Expand Down

0 comments on commit d1dade4

Please sign in to comment.