Improvement/better subworkflow management #339

Merged · 7 commits · Jun 13, 2023
Changes from 6 commits
11 changes: 6 additions & 5 deletions SpiffWorkflow/bpmn/exceptions.py
@@ -63,11 +63,12 @@ def __init__(self, error_msg, task=None, exception=None, line_number=None, offse
@staticmethod
def get_task_trace(task):
task_trace = [f"{task.task_spec.bpmn_name} ({task.workflow.spec.file})"]
workflow = task.workflow
while workflow != workflow.outer_workflow:
caller = workflow.name
workflow = workflow.outer_workflow
task_trace.append(f"{workflow.spec.task_specs[caller].bpmn_name} ({workflow.spec.file})")
top = task.workflow.top_workflow
parent = None if task.workflow is top else task.workflow.parent_workflow
while parent is not None:
caller = parent.get_task_from_id(task.workflow.parent)
task_trace.append(f"{caller.task_spec.bpmn_name} ({parent.spec.file})")
parent = None if caller.workflow is top else caller.workflow.parent_workflow
return task_trace

@staticmethod
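Note on the hunk above: the trace no longer follows `outer_workflow` pointers; it climbs from the failing task's workflow to `top_workflow` via `parent_workflow`, resolving the calling task at each hop. Below is a self-contained sketch of that climb using illustrative stand-in classes, not SpiffWorkflow's real `Task`/`Workflow`; the sketch tracks the current workflow explicitly at each step.

```python
# Minimal stand-ins for the pieces of the new API the trace needs:
# top_workflow, parent_workflow, and the id of the calling task.
class TaskSpec:
    def __init__(self, bpmn_name):
        self.bpmn_name = bpmn_name

class Spec:
    def __init__(self, file):
        self.file = file

class Task:
    def __init__(self, task_id, task_spec, workflow):
        self.id, self.task_spec, self.workflow = task_id, task_spec, workflow
        workflow.tasks[task_id] = self

class Workflow:
    def __init__(self, spec, parent_workflow=None, parent_task_id=None):
        self.spec = spec
        self.parent_workflow = parent_workflow  # calling workflow, or None
        self.parent = parent_task_id            # id of the calling task
        self.tasks = {}

    @property
    def top_workflow(self):
        wf = self
        while wf.parent_workflow is not None:
            wf = wf.parent_workflow
        return wf

    def get_task_from_id(self, task_id):
        return self.tasks[task_id]

def get_task_trace(task):
    # Walk from the failing task's workflow up to the top, resolving the
    # calling task in each parent so the trace names the call sites.
    trace = [f"{task.task_spec.bpmn_name} ({task.workflow.spec.file})"]
    workflow = task.workflow
    while workflow is not workflow.top_workflow:
        caller = workflow.parent_workflow.get_task_from_id(workflow.parent)
        trace.append(f"{caller.task_spec.bpmn_name} ({caller.workflow.spec.file})")
        workflow = caller.workflow
    return trace

# Two levels of nesting: main -> call activity -> inner task
main = Workflow(Spec("main.bpmn"))
call = Task("t1", TaskSpec("Call Subprocess"), main)
sub = Workflow(Spec("sub.bpmn"), parent_workflow=main, parent_task_id="t1")
inner = Task("t2", TaskSpec("Script Task"), sub)
print(get_task_trace(inner))
# ['Script Task (sub.bpmn)', 'Call Subprocess (main.bpmn)']
```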
24 changes: 15 additions & 9 deletions SpiffWorkflow/bpmn/serializer/workflow.py
@@ -23,7 +23,7 @@
from uuid import UUID

from SpiffWorkflow.task import Task
from SpiffWorkflow.bpmn.workflow import BpmnMessage, BpmnWorkflow
from SpiffWorkflow.bpmn.workflow import BpmnMessage, BpmnWorkflow, BpmnSubWorkflow
from SpiffWorkflow.bpmn.specs.mixins.subworkflow_task import SubWorkflowTask

from .migration.version_migration import MIGRATIONS
@@ -104,20 +104,22 @@ def configure_workflow_spec_converter(spec_config=None, registry=None):
cls(spec_converter)
return spec_converter

def __init__(self, spec_converter=None, data_converter=None, wf_class=None, version=VERSION,
def __init__(self, spec_converter=None, data_converter=None, wf_class=None, sub_wf_class=None, version=VERSION,
json_encoder_cls=DEFAULT_JSON_ENCODER_CLS, json_decoder_cls=DEFAULT_JSON_DECODER_CLS):
"""Intializes a Workflow Serializer with the given Workflow, Task and Data Converters.

:param spec_converter: the workflow spec converter
:param data_converter: the data converter
:param wf_class: the workflow class
:param sub_wf_class: the subworkflow class
:param json_encoder_cls: JSON encoder class to be used for dumps/dump operations
:param json_decoder_cls: JSON decoder class to be used for loads/load operations
"""
super().__init__()
self.spec_converter = spec_converter if spec_converter is not None else self.configure_workflow_spec_converter()
self.data_converter = data_converter if data_converter is not None else DefaultRegistry()
self.wf_class = wf_class if wf_class is not None else BpmnWorkflow
self.sub_wf_class = sub_wf_class if sub_wf_class is not None else BpmnSubWorkflow
self.json_encoder_cls = json_encoder_cls
self.json_decoder_cls = json_decoder_cls
self.VERSION = version
@@ -166,17 +168,14 @@ def workflow_to_dict(self, workflow):
"""
# These properties are applicable to top level & subprocesses
dct = self.process_to_dict(workflow)
# These are only used at the top-level
dct['spec'] = self.spec_converter.convert(workflow.spec)
# These are only used at the top-level
dct['subprocess_specs'] = dict(
(name, self.spec_converter.convert(spec)) for name, spec in workflow.subprocess_specs.items()
)
dct['subprocesses'] = dict(
(str(task_id), self.process_to_dict(sp)) for task_id, sp in workflow.subprocesses.items()
(str(task_id), self.subworkflow_to_dict(sp)) for task_id, sp in workflow.subprocesses.items()
)
dct['bpmn_messages'] = [self.message_to_dict(msg) for msg in workflow.bpmn_messages]

dct['correlations'] = workflow.correlations
return dct

def workflow_from_dict(self, dct):
@@ -216,6 +215,11 @@ def workflow_from_dict(self, dct):

return workflow

def subworkflow_to_dict(self, workflow):
dct = self.process_to_dict(workflow)
dct['parent'] = str(workflow.parent)
return dct

def task_to_dict(self, task):
return {
'id': str(task.id),
@@ -225,7 +229,6 @@ def task_to_dict(self, task):
'state': task.state,
'task_spec': task.task_spec.name,
'triggered': task.triggered,
'workflow_name': task.workflow.name,
'internal_data': self.data_converter.convert(task.internal_data),
'data': self.data_converter.convert(task.data),
}
@@ -265,14 +268,15 @@ def task_tree_from_dict(self, process_dct, task_id, parent_task, process, top_le

if isinstance(task_spec, SubWorkflowTask) and task_id in top_dct.get('subprocesses', {}):
subprocess_spec = top.subprocess_specs[task_spec.spec]
subprocess = self.wf_class(subprocess_spec, {}, name=task_spec.name, parent=process, deserializing=True)
subprocess = self.sub_wf_class(subprocess_spec, task.id, top_level_workflow, deserializing=True)
subprocess_dct = top_dct['subprocesses'].get(task_id, {})
subprocess.spec.data_objects.update(process.spec.data_objects)
if len(subprocess.spec.data_objects) > 0:
subprocess.data = process.data
else:
subprocess.data = self.data_converter.restore(subprocess_dct.pop('data'))
subprocess.success = subprocess_dct.pop('success')
subprocess.correlations = subprocess_dct.pop('correlations', {})
subprocess.task_tree = self.task_tree_from_dict(subprocess_dct, subprocess_dct.pop('root'), None, subprocess, top, top_dct)
subprocess.completed_event.connect(task_spec._on_subworkflow_completed, task)
top_level_workflow.subprocesses[task.id] = subprocess
@@ -288,7 +292,9 @@ def task_tree_from_dict(self, process_dct, task_id, parent_task, process, top_le

def process_to_dict(self, process):
return {
'spec': self.spec_converter.convert(process.spec),
'data': self.data_converter.convert(process.data),
'correlations': process.correlations,
'last_task': str(process.last_task.id) if process.last_task is not None else None,
'success': process.success,
'tasks': self.task_tree_to_dict(process.task_tree),
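Note on this file: with `spec` and `correlations` moved into `process_to_dict`, every process — top level or subprocess — now serializes the same core fields, and `subworkflow_to_dict` only adds the id of the calling task. Below is an illustrative layout of the result; field names come from this diff, values are elided, and the `root` field is assumed from `task_tree_from_dict`'s `subprocess_dct.pop('root')`.

```python
# Illustrative shape only; real values are produced by the converters.
serialized = {
    # process_to_dict: present for the top level and for every subprocess
    "spec": {},            # each process now carries its own spec
    "data": {},
    "correlations": {},
    "last_task": None,     # str(uuid) when set
    "success": True,
    "tasks": {},           # task tree, keyed by task id
    "root": "",            # assumed from task_tree_from_dict's pop('root')
    # workflow_to_dict: top level only
    "subprocess_specs": {},    # name -> converted spec
    "bpmn_messages": [],
    "subprocesses": {
        "<calling-task-uuid>": {
            # subworkflow_to_dict: the same process fields as above, plus
            "parent": "<calling-task-uuid>",
        },
    },
}
```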
20 changes: 8 additions & 12 deletions SpiffWorkflow/bpmn/specs/control.py
@@ -42,10 +42,6 @@ def __init__(self, wf_spec, name, main_child_task_spec, **kwargs):
super(_BoundaryEventParent, self).__init__(wf_spec, name, **kwargs)
self.main_child_task_spec = main_child_task_spec

@property
def spec_type(self):
return 'Boundary Event Parent'

def _run_hook(self, my_task):
# Clear any events that our children might have received and wait for new events
for child in my_task.children:
@@ -57,11 +53,15 @@ def _run_hook(self, my_task):
def _child_complete_hook(self, child_task):
# If the main child completes, or a cancelling event occurs, cancel any unfinished children
if child_task.task_spec == self.main_child_task_spec or child_task.task_spec.cancel_activity:
for sibling in child_task.parent.children:
if sibling == child_task:
for task in child_task.parent.children:
if task == child_task:
continue
if task.task_spec != self.main_child_task_spec and task._has_state(TaskState.READY):
# Don't cancel boundary events that became ready while this task was running
# Not clear that this is really the appropriate behavior, but we have tests that depend on it
continue
if sibling.task_spec == self.main_child_task_spec or not sibling._is_finished():
sibling.cancel()
if task.task_spec == self.main_child_task_spec or not task._is_finished():
task.cancel()

def _predict_hook(self, my_task):
# Events attached to the main task might occur
@@ -89,10 +89,6 @@ def _check_threshold_unstructured(self, my_task, force=False):
w = task.workflow
if w == my_task.workflow:
is_mine = True
while w and w.outer_workflow != w:
w = w.outer_workflow
if w == my_task.workflow:
is_mine = True
if is_mine:
waiting_tasks.append(task)

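Note on `_child_complete_hook`: the rewrite now leaves alone boundary events that became READY while the main activity ran, instead of cancelling every unfinished sibling. A self-contained sketch of the selection rule, with stand-in tasks rather than SpiffWorkflow's classes:

```python
# Stand-in task; mirrors the state checks used in _child_complete_hook.
from dataclasses import dataclass

@dataclass
class Task:
    name: str
    is_main_child: bool
    ready: bool       # _has_state(TaskState.READY)
    finished: bool    # _is_finished()

def tasks_to_cancel(completed, siblings):
    """Given the task that just completed and its siblings, pick the ones
    the boundary-event parent should cancel."""
    cancel = []
    for task in siblings:
        if task is completed:
            continue
        if not task.is_main_child and task.ready:
            # Boundary events that became ready while the main task was
            # running are left alone so they can still fire.
            continue
        if task.is_main_child or not task.finished:
            cancel.append(task)
    return cancel

main = Task("main activity", True, ready=False, finished=True)
timer = Task("timer boundary event", False, ready=False, finished=False)
escalation = Task("escalation boundary event", False, ready=True, finished=False)
print([t.name for t in tasks_to_cancel(main, [main, timer, escalation])])
# ['timer boundary event'] -- the already-ready escalation event survives
```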
83 changes: 27 additions & 56 deletions SpiffWorkflow/bpmn/specs/event_definitions.py
@@ -48,37 +48,28 @@ def __init__(self, description=None):
self.internal, self.external = True, True
self.description = description

@property
def event_type(self):
return f'{self.__class__.__module__}.{self.__class__.__name__}'

def has_fired(self, my_task):
return my_task._get_internal_data('event_fired', False)

def catch(self, my_task, event_definition=None):
my_task._set_internal_data(event_fired=True)

def throw(self, my_task):
self._throw(
event=my_task.task_spec.event_definition,
workflow=my_task.workflow,
outer_workflow=my_task.workflow.outer_workflow
)
self._throw(my_task)

def reset(self, my_task):
my_task._set_internal_data(event_fired=False)

def _throw(self, event, workflow, outer_workflow, correlations=None):
# This method exists because usually we just want to send the event in our
# own task spec, but we can't do that for message events.
# We also don't have a more sophisticated method for addressing events to
# a particular process, but this at least provides a mechanism for distinguishing
# between processes and subprocesses.
if self.external and outer_workflow != workflow:
top = workflow._get_outermost_workflow()
top.catch(event, correlations)
def _throw(self, my_task, **kwargs):
if 'target' in kwargs:
target = kwargs.pop('target')
elif self.internal and not self.external:
target = my_task.workflow
else:
workflow.catch(event)
target = None

event_definition = kwargs.pop('event', my_task.task_spec.event_definition)
my_task.workflow.top_workflow.catch(event_definition, target, **kwargs)

def __eq__(self, other):
return self.__class__.__name__ == other.__class__.__name__
@@ -91,7 +82,6 @@ class NamedEventDefinition(EventDefinition):

:param name: the name of this event
"""

def __init__(self, name, **kwargs):
super(NamedEventDefinition, self).__init__(**kwargs)
self.name = name
@@ -112,6 +102,8 @@ def __init__(self, **kwargs):
super(CancelEventDefinition, self).__init__(**kwargs)
self.internal = False

def _throw(self, my_task, **kwargs):
return super()._throw(my_task, target=my_task.workflow.parent_workflow)

class ErrorEventDefinition(NamedEventDefinition):
"""
@@ -151,10 +143,11 @@ def __eq__(self, other):
class CorrelationProperty:
"""Rules for generating a correlation key when a message is sent or received."""

def __init__(self, name, retrieval_expression, correlation_keys, expected_value=None):
self.name = name # This is the property name
self.retrieval_expression = retrieval_expression # This is how it's generated
self.correlation_keys = correlation_keys # These are the keys it's used by
def __init__(self, name, retrieval_expression, correlation_keys):
self.name = name # This is the property name
self.retrieval_expression = retrieval_expression # This is how it's generated
self.correlation_keys = correlation_keys # These are the keys it's used by


class MessageEventDefinition(NamedEventDefinition):
"""The default message event."""
@@ -179,7 +172,7 @@ def throw(self, my_task):
event.payload = deepcopy(my_task.data)
correlations = self.get_correlations(my_task, event.payload)
my_task.workflow.correlations.update(correlations)
self._throw(event, my_task.workflow, my_task.workflow.outer_workflow, correlations)
self._throw(my_task, event=event, correlations=correlations)

def update_internal_data(self, my_task, event_definition):
my_task.internal_data[event_definition.name] = event_definition.payload
@@ -192,35 +185,17 @@ def update_task_data(self, my_task):
my_task.set_data(**payload)

def get_correlations(self, task, payload):
correlation_keys = {}
correlations = {}
for property in self.correlation_properties:
for key in property.correlation_keys:
if key not in correlation_keys:
correlation_keys[key] = {}
if key not in correlations:
correlations[key] = {}
try:
correlation_keys[key][property.name] = task.workflow.script_engine._evaluate(property.retrieval_expression, payload)
except WorkflowException as we:
we.add_note(
f"Failed to evaluate correlation property '{property.name}'"
f" invalid expression '{property.retrieval_expression}'")
we.task_spec = task.task_spec
raise we
return correlation_keys

def conversation(self):
"""An event may have many correlation properties, this figures out
which conversation exists across all of them, or return None if they
do not share a topic. """
conversation = None
if len(self.correlation_properties) > 0:
for prop in self.correlation_properties:
for key in prop.correlation_keys:
conversation = key
for prop in self.correlation_properties:
if conversation not in prop.correlation_keys:
break
return conversation
return None
correlations[key][property.name] = task.workflow.script_engine._evaluate(property.retrieval_expression, payload)
except WorkflowException:
# Just ignore missing keys. The dictionaries have to match exactly
pass
return correlations
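
Note on `get_correlations`: it now returns a plain correlations dict and silently skips properties whose retrieval expression fails against the payload, since the dictionaries have to match exactly when the event is caught. A rough, runnable equivalent, with `eval` standing in for SpiffWorkflow's script engine:

```python
# Stand-in for the diff's CorrelationProperty; evaluation via eval() is
# illustrative only -- the real code goes through the workflow's script engine.
class CorrelationProperty:
    def __init__(self, name, retrieval_expression, correlation_keys):
        self.name = name
        self.retrieval_expression = retrieval_expression
        self.correlation_keys = correlation_keys

def get_correlations(properties, payload):
    correlations = {}
    for prop in properties:
        for key in prop.correlation_keys:
            correlations.setdefault(key, {})
            try:
                correlations[key][prop.name] = eval(prop.retrieval_expression, {}, payload)
            except Exception:
                # Just ignore missing keys, as the diff does.
                pass
    return correlations

props = [CorrelationProperty("order_id", "order['id']", ["order_key"])]
print(get_correlations(props, {"order": {"id": 42}}))  # {'order_key': {'order_id': 42}}
print(get_correlations(props, {}))                     # {'order_key': {}}
```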


class NoneEventDefinition(EventDefinition):
@@ -493,8 +468,4 @@ def __eq__(self, other):
def throw(self, my_task):
# Multiple events throw all associated events when they fire
for event_definition in self.event_definitions:
self._throw(
event=event_definition,
workflow=my_task.workflow,
outer_workflow=my_task.workflow.outer_workflow
)
event_definition.throw(my_task)
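
Note on this file: all throws are now funnelled through the top workflow's `catch`, with an optional `target` workflow replacing the old `workflow`/`outer_workflow` pair, and `MultipleEventDefinition` simply delegates to each child definition's own `throw`. The sketch below consolidates the targeting rules from the base `_throw` and the `CancelEventDefinition` override in this diff; `catch`'s signature on the top workflow is assumed from its call site here, and its dispatch is elided.

```python
class EventDefinition:
    internal, external = True, True

    def _throw(self, my_task, **kwargs):
        if 'target' in kwargs:
            target = kwargs.pop('target')      # caller chose an addressee
        elif self.internal and not self.external:
            target = my_task.workflow          # internal-only: stay local
        else:
            target = None                      # let catch() offer it anywhere
        event = kwargs.pop('event', my_task.task_spec.event_definition)
        my_task.workflow.top_workflow.catch(event, target, **kwargs)

class CancelEventDefinition(EventDefinition):
    internal, external = False, True           # per this diff: internal = False

    def _throw(self, my_task, **kwargs):
        # Cancel is always addressed to the calling (parent) workflow.
        return super()._throw(my_task, target=my_task.workflow.parent_workflow)
```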
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/mixins/events/end_event.py
@@ -49,7 +49,7 @@ def _on_complete_hook(self, my_task):

# We are finished. Set the workflow data and cancel all tasks
my_task.workflow.set_data(**my_task.data)
for task in my_task.workflow.get_tasks(TaskState.NOT_FINISHED_MASK, workflow=my_task.workflow):
for task in my_task.workflow.get_tasks(TaskState.NOT_FINISHED_MASK):
task.cancel()

elif isinstance(self.event_definition, CancelEventDefinition):
1 change: 1 addition & 0 deletions SpiffWorkflow/bpmn/specs/mixins/events/event_types.py
@@ -35,6 +35,7 @@ def __init__(self, wf_spec, bpmn_id, event_definition, **kwargs):
self.event_definition = event_definition

def catches(self, my_task, event_definition, correlations=None):
correlations = correlations or {}
if self.event_definition == event_definition:
return all([correlations.get(key) == my_task.workflow.correlations.get(key) for key in correlations ])
else:
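Note on `catches`: the added guard means that with no correlations supplied the event matches on definition alone; otherwise every supplied key must equal the workflow's stored value. A minimal illustration of that predicate:

```python
# Stand-in for the correlation check inside catches() above.
def matches(supplied, stored):
    supplied = supplied or {}
    return all(supplied.get(k) == stored.get(k) for k in supplied)

print(matches(None, {"order_key": {"id": 1}}))                      # True
print(matches({"order_key": {"id": 1}}, {"order_key": {"id": 1}}))  # True
print(matches({"order_key": {"id": 2}}, {"order_key": {"id": 1}}))  # False
```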
13 changes: 6 additions & 7 deletions SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
@@ -46,8 +46,7 @@ def _on_subworkflow_completed(self, subworkflow, my_task):
self.update_data(my_task, subworkflow)

def _update_hook(self, my_task):
wf = my_task.workflow._get_outermost_workflow(my_task)
subprocess = wf.subprocesses.get(my_task.id)
subprocess = my_task.workflow.top_workflow.subprocesses.get(my_task.id)
if subprocess is None:
super()._update_hook(my_task)
self.create_workflow(my_task)
@@ -68,18 +67,18 @@ def copy_data(self, my_task, subworkflow):
# But our data management is already hopelessly messed up and in dire needs of reconsideration
if len(subworkflow.spec.data_objects) > 0:
subworkflow.data = my_task.workflow.data
start = subworkflow.get_tasks_from_spec_name('Start', workflow=subworkflow)
start = subworkflow.get_tasks_from_spec_name('Start')
start[0].set_data(**my_task.data)

def update_data(self, my_task, subworkflow):
my_task.data = deepcopy(subworkflow.last_task.data)

def create_workflow(self, my_task):
subworkflow = my_task.workflow.create_subprocess(my_task, self.spec, self.name)
subworkflow = my_task.workflow.top_workflow.create_subprocess(my_task, self.spec)
subworkflow.completed_event.connect(self._on_subworkflow_completed, my_task)

def start_workflow(self, my_task):
subworkflow = my_task.workflow.get_subprocess(my_task)
subworkflow = my_task.workflow.top_workflow.get_subprocess(my_task)
self.copy_data(my_task, subworkflow)
for child in subworkflow.task_tree.children:
child.task_spec._update(child)
@@ -93,7 +92,7 @@ def __init__(self, wf_spec, bpmn_id, subworkflow_spec, **kwargs):

def copy_data(self, my_task, subworkflow):

start = subworkflow.get_tasks_from_spec_name('Start', workflow=subworkflow)
start = subworkflow.get_tasks_from_spec_name('Start')
if subworkflow.spec.io_specification is None or len(subworkflow.spec.io_specification.data_inputs) == 0:
# Copy all task data into start task if no inputs specified
start[0].set_data(**my_task.data)
@@ -114,7 +113,7 @@ def update_data(self, my_task, subworkflow):
# Copy all workflow data if no outputs are specified
my_task.data = deepcopy(subworkflow.last_task.data)
else:
end = subworkflow.get_tasks_from_spec_name('End', workflow=subworkflow)
end = subworkflow.get_tasks_from_spec_name('End')
# Otherwise only copy data with the specified names
for var in subworkflow.spec.io_specification.data_outputs:
if var.bpmn_id not in end[0].data:
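Note on `copy_data`/`update_data`: together they implement the call activity's IO mapping — with no `io_specification` (or empty inputs/outputs) all task data crosses the boundary, otherwise only the declared variables move. The diff is truncated before showing what happens when a declared output is missing from the end task's data; the sketch below raises, which is what the check implies. These are stand-in functions, not SpiffWorkflow's API:

```python
# Illustrative IO-mapping rules for the call activity boundary.
def copy_inputs(task_data, data_inputs):
    if not data_inputs:
        return dict(task_data)                 # no spec: copy everything in
    missing = [v for v in data_inputs if v not in task_data]
    if missing:
        raise ValueError(f"missing inputs: {missing}")
    return {v: task_data[v] for v in data_inputs}

def copy_outputs(end_task_data, data_outputs):
    if not data_outputs:
        return dict(end_task_data)             # no spec: copy everything out
    missing = [v for v in data_outputs if v not in end_task_data]
    if missing:
        # Assumed behavior: the diff's check suggests an error here.
        raise ValueError(f"missing outputs: {missing}")
    return {v: end_task_data[v] for v in data_outputs}

print(copy_inputs({"a": 1, "b": 2}, ["a"]))     # {'a': 1}
print(copy_outputs({"a": 1, "result": 3}, []))  # everything copied back
```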