Improvement/updates to multiinstance #347

Merged · 3 commits · Aug 10, 2023
6 changes: 3 additions & 3 deletions SpiffWorkflow/bpmn/parser/TaskParser.py
@@ -64,7 +64,7 @@ def __init__(self, process_parser, spec_class, node, nsmap=None, lane=None):
self.spec_class = spec_class
self.spec = self.process_parser.spec

def _copy_task_attrs(self, original):
def _copy_task_attrs(self, original, loop_characteristics=None):

self.task.inputs = original.inputs
self.task.outputs = original.outputs
@@ -95,7 +95,7 @@ def _add_loop_task(self, loop_characteristics):

original = self.spec.task_specs.pop(self.task.name)
self.task = self.STANDARD_LOOP_CLASS(self.spec, original.name, '', maximum, condition, test_before)
self._copy_task_attrs(original)
self._copy_task_attrs(original, loop_characteristics)

def _add_multiinstance_task(self, loop_characteristics):

@@ -156,7 +156,7 @@ def _add_multiinstance_task(self, loop_characteristics):
self.task = self.SEQUENTIAL_MI_CLASS(self.spec, original.name, **params)
else:
self.task = self.PARALLEL_MI_CLASS(self.spec, original.name, **params)
self._copy_task_attrs(original)
self._copy_task_attrs(original, loop_characteristics)

def _add_boundary_event(self, children):

12 changes: 12 additions & 0 deletions SpiffWorkflow/bpmn/specs/bpmn_task_spec.py
@@ -93,3 +93,15 @@ def _on_complete_hook(self, my_task):
my_task.data.pop(obj.bpmn_id, None)

super()._on_complete_hook(my_task)

def task_info(self, my_task):
# This method can be extended to provide task specific info for different spec types
# Since almost all spec types can be MI, add instance info here if present
info = {}
if 'key_or_index' in my_task.internal_data:
info['instance'] = my_task.internal_data.get('key_or_index')
if 'item' in my_task.internal_data:
info['instance'] = my_task.internal_data.get('item')
if 'iteration' in my_task.internal_data:
info['iteration'] = my_task.internal_data.get('iteration')
return info
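For orientation, a minimal sketch of the extension point described in the comment above: a custom spec can layer its own details on top of the base hook. MyServiceTask and its 'operation' key are hypothetical; only the import path and the super().task_info() pattern come from this change set.

from SpiffWorkflow.bpmn.specs.bpmn_task_spec import BpmnTaskSpec

class MyServiceTask(BpmnTaskSpec):

    def task_info(self, my_task):
        # The base hook contributes 'instance'/'iteration' when this task is a
        # generated multi-instance or loop child; the extra key below is a
        # hypothetical spec-specific addition.
        info = super().task_info(my_task)
        info['operation'] = getattr(self, 'operation_name', None)  # assumed attribute, for illustration
        return info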
68 changes: 58 additions & 10 deletions SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py
@@ -21,21 +21,21 @@
from collections.abc import Iterable, Sequence, Mapping, MutableSequence, MutableMapping

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.specs.base import TaskSpec
from SpiffWorkflow.util.deep_merge import DeepMerge
from SpiffWorkflow.bpmn.specs.bpmn_task_spec import BpmnTaskSpec
from SpiffWorkflow.bpmn.exceptions import WorkflowDataException


class LoopTask(TaskSpec):
class LoopTask(BpmnTaskSpec):

def process_children(self, my_task):
"""
Handle any newly completed children and update merged tasks.
Returns a boolean indicating whether there is a child currently running
"""
merged = my_task.internal_data.get('merged') or []
merged = self._merged_children(my_task)
child_running = False
for child in filter(lambda c: c.task_spec.name == self.task_spec, my_task.children):
for child in self._instances(my_task):
if child._has_state(TaskState.FINISHED_MASK) and str(child.id) not in merged:
self.child_completed_action(my_task, child)
merged.append(str(child.id))
@@ -47,6 +47,12 @@ def process_children(self, my_task):
def child_completed_action(self, my_task, child):
raise NotImplementedError

def _merged_children(self, my_task):
return my_task.internal_data.get('merged', [])

def _instances(self, my_task):
return filter(lambda c: c.task_spec.name == self.task_spec, my_task.children)


class StandardLoopTask(LoopTask):

@@ -57,6 +63,14 @@ def __init__(self, wf_spec, bpmn_id, task_spec, maximum, condition, test_before,
self.condition = condition
self.test_before = test_before

def task_info(self, my_task):
info = super().task_info(my_task)
info['iterations_completed'] = len(self._merged_children(my_task))
if self.maximum:
info['iterations_remaining'] = self.maximum - info['iterations_completed']
info['instance_map'] = dict((idx, str(t.id)) for idx, t in enumerate(self._instances(my_task)))
return info

def _update_hook(self, my_task):

if my_task.state != TaskState.WAITING:
@@ -77,12 +91,13 @@ def _update_hook(self, my_task):
child = my_task._add_child(task_spec, TaskState.WAITING)
child.triggered = True
child.data = deepcopy(my_task.data)
child.internal_data['iteration'] = len(self._merged_children(my_task))

def child_completed_action(self, my_task, child):
DeepMerge.merge(my_task.data, child.data)

def loop_complete(self, my_task):
merged = my_task.internal_data.get('merged') or []
merged = self._merged_children(my_task)
if not self.test_before and len(merged) == 0:
# "test before" isn't really compatible our execution model in a transparent way
# This guarantees that the task will run at least once if test_before is False
@@ -108,6 +123,27 @@ def __init__(self, wf_spec, bpmn_id, task_spec, cardinality=None, data_input=Non
self.output_item = output_item
self.condition = condition

def task_info(self, my_task):
info = super().task_info(my_task)
info.update({
'completed': [],
'running': [],
'future': my_task.internal_data.get('remaining', []),
'instance_map': {},
})
for task in self._instances(my_task):
key_or_index = task.internal_data.get('key_or_index')
value = task.internal_data.get('item') if key_or_index is None else key_or_index
if task._has_state(TaskState.FINISHED_MASK):
info['completed'].append(value)
else:
info['running'].append(value)
try:
info['instance_map'][value] = str(task.id)
except TypeError:
info['instance_map'][str(value)] = str(task.id)
return info

def child_completed_action(self, my_task, child):
"""This merges child data into this task's data."""

@@ -130,16 +166,21 @@ def create_child(self, my_task, item, key_or_index=None):
task_spec = my_task.workflow.spec.task_specs[self.task_spec]
child = my_task._add_child(task_spec, TaskState.WAITING)
child.triggered = True
child.data = deepcopy(my_task.data)
if self.input_item is not None:
if self.input_item is not None and self.input_item.bpmn_id in my_task.data:
raise WorkflowDataException(f'Multiinstance input item {self.input_item.bpmn_id} already exists.', my_task)
if self.output_item is not None and self.output_item.bpmn_id in my_task.data:
raise WorkflowDataException(f'Multiinstance output item {self.output_item.bpmn_id} already exists.', my_task)
if self.input_item is not None:
child.data[self.input_item.bpmn_id] = deepcopy(item)
if key_or_index is not None:
child.internal_data['key_or_index'] = key_or_index
else:
child.internal_data['item'] = item
child.task_spec._update(child)

def check_completion_condition(self, my_task):

merged = my_task.internal_data.get('merged', [])
merged = self._merged_children(my_task)
if len(merged) > 0:
last_child = [c for c in my_task.children if str(c.id) == merged[-1]][0]
return my_task.workflow.script_engine.evaluate(last_child, self.condition)
@@ -195,6 +236,13 @@ def _update_hook(self, my_task):
else:
return self.add_next_child(my_task)

def task_info(self, my_task):
info = super().task_info(my_task)
cardinality = my_task.internal_data.get('cardinality')
if cardinality is not None:
info['future'] = [v for v in range(len(info['completed']) + len(info['running']), cardinality)]
return info

def add_next_child(self, my_task):

if self.data_input is not None:
@@ -304,7 +352,7 @@ def create_children(self, my_task):
else:
# For tasks specifying the cardinality, use the index as the "item"
cardinality = my_task.workflow.script_engine.evaluate(my_task, self.cardinality)
children = ((None, idx) for idx in range(cardinality))
children = ((idx, idx) for idx in range(cardinality))

if not my_task.internal_data.get('started', False):

@@ -320,4 +368,4 @@

my_task.internal_data['started'] = True
else:
return len(my_task.internal_data.get('merged', [])) == len(children)
return len(self._merged_children(my_task)) == len(children)
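As a usage sketch (not part of the diff): assuming `workflow` is an in-flight BpmnWorkflow whose 'any_task' is a parallel multi-instance activity, the new task_info output could be inspected as below; the spec names mirror the tests further down.

any_task = workflow.get_tasks_from_spec_name('any_task')[0]
info = any_task.task_spec.task_info(any_task)

# 'completed' and 'running' hold the key, index, or item of each generated instance,
# 'future' lists what has not been created yet, and 'instance_map' maps each of those
# values (stringified when unhashable) to the id of the corresponding child task.
print(info['completed'], info['running'], info['future'])
for instance, task_id in info['instance_map'].items():
    print(instance, '->', task_id)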
2 changes: 1 addition & 1 deletion SpiffWorkflow/camunda/parser/task_spec.py
@@ -75,7 +75,7 @@ def _add_multiinstance_task(self, loop_characteristics):
self.task = SequentialMultiInstanceTask(self.spec, original.name, **params)
else:
self.task = ParallelMultiInstanceTask(self.spec, original.name, **params)
self._copy_task_attrs(original)
self._copy_task_attrs(original, loop_characteristics)


class BusinessRuleTaskParser(CamundaTaskParser):
11 changes: 6 additions & 5 deletions SpiffWorkflow/spiff/parser/task_spec.py
@@ -110,13 +110,14 @@ def _parse_servicetask_operator(cls, node):
operator['parameters'] = parameters
return operator

def _copy_task_attrs(self, original):
def _copy_task_attrs(self, original, loop_characteristics):
# I am so disappointed I have to do this.
super()._copy_task_attrs(original)
self.task.prescript = original.prescript
self.task.postscript = original.postscript
original.prescript = None
original.postscript = None
if loop_characteristics.attrib.get('{' + SPIFFWORKFLOW_MODEL_NS + '}' + 'scriptsOnInstances') != 'true':
self.task.prescript = original.prescript
self.task.postscript = original.postscript
original.prescript = None
original.postscript = None

def create_task(self):
# The main task parser already calls this, and even sets an attribute, but
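A standalone sketch of the attribute check added above. The namespace URI below is a placeholder for SPIFFWORKFLOW_MODEL_NS (its real value lives in the spiff parser package) and the element layout is illustrative only.

from lxml import etree

SPIFFWORKFLOW_MODEL_NS = 'http://spiffworkflow.org/bpmn/schema/1.0/core'  # assumed value, for illustration

xml = (
    '<bpmn:multiInstanceLoopCharacteristics'
    ' xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL"'
    ' xmlns:spiffworkflow="' + SPIFFWORKFLOW_MODEL_NS + '"'
    ' spiffworkflow:scriptsOnInstances="true"/>'
)
loop_characteristics = etree.fromstring(xml)

# The same lookup the parser performs: when the flag is 'true', the pre/postscripts stay
# on each generated instance; otherwise they are moved to the multi-instance wrapper task.
on_instances = loop_characteristics.attrib.get(
    '{' + SPIFFWORKFLOW_MODEL_NS + '}scriptsOnInstances') == 'true'
print(on_instances)  # True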
17 changes: 16 additions & 1 deletion tests/SpiffWorkflow/bpmn/ParallelMultiInstanceTest.py
@@ -17,21 +17,36 @@ def set_io_and_run_workflow(self, data, data_input=None, data_output=None, save_
any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
any_task.task_spec.data_input = TaskDataReference(data_input) if data_input is not None else None
any_task.task_spec.data_output = TaskDataReference(data_output) if data_output is not None else None

self.workflow.do_engine_steps()

task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(len(task_info['completed']), 0)
self.assertEqual(len(task_info['running']), 3)
self.assertEqual(len(task_info['future']), 0)
self.assertEqual(len(task_info['instance_map']), 3)
instance_map = task_info['instance_map']

ready_tasks = self.workflow.get_ready_user_tasks()
self.assertEqual(len(ready_tasks), 3)
while len(ready_tasks) > 0:
task = ready_tasks[0]
task_info = task.task_spec.task_info(task)
self.assertEqual(task.task_spec.name, 'any_task [child]')
self.assertIn('input_item', task.data)
self.assertEqual(instance_map[task_info['instance']], str(task.id))
task.data['output_item'] = task.data['input_item'] * 2
task.run()
if save_restore:
self.save_restore()
ready_tasks = self.workflow.get_ready_user_tasks()
self.workflow.refresh_waiting_tasks()
self.workflow.do_engine_steps()

any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(len(task_info['completed']), 3)
self.assertEqual(len(task_info['running']), 0)
self.assertEqual(len(task_info['future']), 0)
self.assertTrue(self.workflow.is_completed())

def run_workflow_with_condition(self, data):
20 changes: 19 additions & 1 deletion tests/SpiffWorkflow/bpmn/SequentialMultiInstanceTest.py
@@ -18,8 +18,14 @@ def set_io_and_run_workflow(self, data, data_input=None, data_output=None, save_

self.workflow.do_engine_steps()
self.workflow.refresh_waiting_tasks()
ready_tasks = self.workflow.get_ready_user_tasks()

task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(len(task_info['completed']), 0)
self.assertEqual(len(task_info['running']), 1)
self.assertEqual(len(task_info['future']), 2)
self.assertEqual(len(task_info['instance_map']), 1)

ready_tasks = self.workflow.get_ready_user_tasks()
while len(ready_tasks) > 0:
self.assertEqual(len(ready_tasks), 1)
task = ready_tasks[0]
@@ -32,7 +38,19 @@ def set_io_and_run_workflow(self, data, data_input=None, data_output=None, save_
ready_tasks = self.workflow.get_ready_user_tasks()

self.workflow.do_engine_steps()

any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(len(task_info['completed']), 3)
self.assertEqual(len(task_info['running']), 0)
self.assertEqual(len(task_info['future']), 0)
self.assertEqual(len(task_info['instance_map']), 3)

children = self.workflow.get_tasks_from_spec_name('any_task [child]')
for child in children:
info = child.task_spec.task_info(child)
instance = info['instance']
self.assertEqual(task_info['instance_map'][instance], str(child.id))
self.assertEqual(len(children), 3)
self.assertTrue(self.workflow.is_completed())

15 changes: 15 additions & 0 deletions tests/SpiffWorkflow/bpmn/StandardLoopTest.py
@@ -15,15 +15,30 @@ def testLoopMaximum(self):

start = self.workflow.get_tasks_from_spec_name('StartEvent_1')
start[0].data['done'] = False

any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(task_info['iterations_completed'], 0)
self.assertEqual(task_info['iterations_remaining'], 3)
self.assertEqual(len(task_info['instance_map']), 0)

for idx in range(3):
self.workflow.do_engine_steps()
self.workflow.refresh_waiting_tasks()
ready_tasks = self.workflow.get_ready_user_tasks()
self.assertEqual(len(ready_tasks), 1)
ready_tasks[0].data[str(idx)] = True
ready_tasks[0].run()
task_info = ready_tasks[0].task_spec.task_info(ready_tasks[0])
self.assertEqual(task_info['iteration'], idx)

self.workflow.do_engine_steps()
any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(task_info['iterations_completed'], 3)
self.assertEqual(task_info['iterations_remaining'], 0)
self.assertEqual(len(task_info['instance_map']), 3)

self.assertTrue(self.workflow.is_completed())

def testLoopCondition(self):
23 changes: 23 additions & 0 deletions tests/SpiffWorkflow/spiff/MultiInstanceTaskTest.py
@@ -27,3 +27,26 @@ def testMultiInstanceTask(self):
'input_data': [2, 3, 4], # Prescript adds 1 to input
'output_data': [3, 5, 7], # Postscript subtracts 1 from output
})

def testMultiInstanceTaskWithInstanceScripts(self):
spec, subprocesses = self.load_workflow_spec('script_on_mi.bpmn', 'Process_1')
self.workflow = BpmnWorkflow(spec, subprocesses)
start = self.workflow.get_tasks_from_spec_name('Start')[0]
start.data = {'input_data': [1, 2, 3]}
self.workflow.do_engine_steps()
task = self.workflow.get_tasks_from_spec_name('any_task')[0]
self.workflow.do_engine_steps()

self.save_restore()

ready_tasks = self.workflow.get_ready_user_tasks()
for task in ready_tasks:
task.data['output_item'] = task.data['input_item'] * 2
task.run()
self.workflow.do_engine_steps()

self.assertTrue(self.workflow.is_completed())
self.assertDictEqual(self.workflow.data, {
'input_data': [1, 2, 3], # Prescript modifies input item
'output_data': [3, 5, 7],
})