Skip to content

Commit

Permalink
add task spec methods for MI info
Browse files Browse the repository at this point in the history
  • Loading branch information
essweine committed Aug 2, 2023
1 parent 282b9e0 commit c9a584c
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 10 deletions.
12 changes: 12 additions & 0 deletions SpiffWorkflow/bpmn/specs/bpmn_task_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,15 @@ def _on_complete_hook(self, my_task):
my_task.data.pop(obj.bpmn_id, None)

super()._on_complete_hook(my_task)

def task_info(self, my_task):
# This method can be extended to provide task specific info for different spec types
# Since almost all spec types can be MI, add instance info here if present
info = {}
if 'key_or_index' in my_task.internal_data:
info['instance'] = my_task.internal_data.get('key_or_index')
if 'item' in my_task.internal_data:
info['instance'] = my_task.internal_data.get('item')
if 'iteration' in my_task.internal_data:
info['iteration'] = my_task.internal_data.get('iteration')
return info
61 changes: 53 additions & 8 deletions SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@
from collections.abc import Iterable, Sequence, Mapping, MutableSequence, MutableMapping

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.specs.base import TaskSpec
from SpiffWorkflow.util.deep_merge import DeepMerge
from SpiffWorkflow.bpmn.specs.bpmn_task_spec import BpmnTaskSpec
from SpiffWorkflow.bpmn.exceptions import WorkflowDataException


class LoopTask(TaskSpec):
class LoopTask(BpmnTaskSpec):

def process_children(self, my_task):
"""
Handle any newly completed children and update merged tasks.
Returns a boolean indicating whether there is a child currently running
"""
merged = my_task.internal_data.get('merged') or []
merged = self._merged_children(my_task)
child_running = False
for child in filter(lambda c: c.task_spec.name == self.task_spec, my_task.children):
for child in self._instances(my_task):
if child._has_state(TaskState.FINISHED_MASK) and str(child.id) not in merged:
self.child_completed_action(my_task, child)
merged.append(str(child.id))
Expand All @@ -47,6 +47,12 @@ def process_children(self, my_task):
def child_completed_action(self, my_task, child):
raise NotImplementedError

def _merged_children(self, my_task):
return my_task.internal_data.get('merged', [])

def _instances(self, my_task):
return filter(lambda c: c.task_spec.name == self.task_spec, my_task.children)


class StandardLoopTask(LoopTask):

Expand All @@ -57,6 +63,14 @@ def __init__(self, wf_spec, bpmn_id, task_spec, maximum, condition, test_before,
self.condition = condition
self.test_before = test_before

def task_info(self, my_task):
info = super().task_info(my_task)
info['iterations_completed'] = len(self._merged_children(my_task))
if self.maximum:
info['iterations_remaining'] = self.maximum - info['iterations_completed']
info['instance_map'] = dict((idx, str(t.id)) for idx, t in enumerate(self._instances(my_task)))
return info

def _update_hook(self, my_task):

if my_task.state != TaskState.WAITING:
Expand All @@ -77,12 +91,13 @@ def _update_hook(self, my_task):
child = my_task._add_child(task_spec, TaskState.WAITING)
child.triggered = True
child.data = deepcopy(my_task.data)
child.internal_data['iteration'] = len(self._merged_children(my_task))

def child_completed_action(self, my_task, child):
DeepMerge.merge(my_task.data, child.data)

def loop_complete(self, my_task):
merged = my_task.internal_data.get('merged') or []
merged = self._merged_children(my_task)
if not self.test_before and len(merged) == 0:
# "test before" isn't really compatible our execution model in a transparent way
# This guarantees that the task will run at least once if test_before is False
Expand All @@ -108,6 +123,27 @@ def __init__(self, wf_spec, bpmn_id, task_spec, cardinality=None, data_input=Non
self.output_item = output_item
self.condition = condition

def task_info(self, my_task):
info = super().task_info(my_task)
info.update({
'completed': [],
'running': [],
'future': my_task.internal_data.get('remaining', []),
'instance_map': {},
})
for task in self._instances(my_task):
key_or_index = task.internal_data.get('key_or_index')
value = task.internal_data.get('item') if key_or_index is None else key_or_index
if task._has_state(TaskState.FINISHED_MASK):
info['completed'].append(value)
else:
info['running'].append(value)
try:
info['instance_map'][value] = str(task.id)
except TypeError:
info['instance_map'][str(value)] = str(task.id)
return info

def child_completed_action(self, my_task, child):
"""This merges child data into this task's data."""

Expand Down Expand Up @@ -138,11 +174,13 @@ def create_child(self, my_task, item, key_or_index=None):
child.data[self.input_item.bpmn_id] = deepcopy(item)
if key_or_index is not None:
child.internal_data['key_or_index'] = key_or_index
else:
child.internal_data['item'] = item
child.task_spec._update(child)

def check_completion_condition(self, my_task):

merged = my_task.internal_data.get('merged', [])
merged = self._merged_children(my_task)
if len(merged) > 0:
last_child = [c for c in my_task.children if str(c.id) == merged[-1]][0]
return my_task.workflow.script_engine.evaluate(last_child, self.condition)
Expand Down Expand Up @@ -198,6 +236,13 @@ def _update_hook(self, my_task):
else:
return self.add_next_child(my_task)

def task_info(self, my_task):
info = super().task_info(my_task)
cardinality = my_task.internal_data.get('cardinality')
if cardinality is not None:
info['future'] = [v for v in range(len(info['completed']) + len(info['running']), cardinality)]
return info

def add_next_child(self, my_task):

if self.data_input is not None:
Expand Down Expand Up @@ -307,7 +352,7 @@ def create_children(self, my_task):
else:
# For tasks specifying the cardinality, use the index as the "item"
cardinality = my_task.workflow.script_engine.evaluate(my_task, self.cardinality)
children = ((None, idx) for idx in range(cardinality))
children = ((idx, idx) for idx in range(cardinality))

if not my_task.internal_data.get('started', False):

Expand All @@ -323,4 +368,4 @@ def create_children(self, my_task):

my_task.internal_data['started'] = True
else:
return len(my_task.internal_data.get('merged', [])) == len(children)
return len(self._merged_children(my_task)) == len(children)
17 changes: 16 additions & 1 deletion tests/SpiffWorkflow/bpmn/ParallelMultiInstanceTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,36 @@ def set_io_and_run_workflow(self, data, data_input=None, data_output=None, save_
any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
any_task.task_spec.data_input = TaskDataReference(data_input) if data_input is not None else None
any_task.task_spec.data_output = TaskDataReference(data_output) if data_output is not None else None

self.workflow.do_engine_steps()

task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(len(task_info['completed']), 0)
self.assertEqual(len(task_info['running']), 3)
self.assertEqual(len(task_info['future']), 0)
self.assertEqual(len(task_info['instance_map']), 3)
instance_map = task_info['instance_map']

ready_tasks = self.workflow.get_ready_user_tasks()
self.assertEqual(len(ready_tasks), 3)
while len(ready_tasks) > 0:
task = ready_tasks[0]
task_info = task.task_spec.task_info(task)
self.assertEqual(task.task_spec.name, 'any_task [child]')
self.assertIn('input_item', task.data)
self.assertEqual(instance_map[task_info['instance']], str(task.id))
task.data['output_item'] = task.data['input_item'] * 2
task.run()
if save_restore:
self.save_restore()
ready_tasks = self.workflow.get_ready_user_tasks()
self.workflow.refresh_waiting_tasks()
self.workflow.do_engine_steps()

any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(len(task_info['completed']), 3)
self.assertEqual(len(task_info['running']), 0)
self.assertEqual(len(task_info['future']), 0)
self.assertTrue(self.workflow.is_completed())

def run_workflow_with_condition(self, data):
Expand Down
20 changes: 19 additions & 1 deletion tests/SpiffWorkflow/bpmn/SequentialMultiInstanceTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ def set_io_and_run_workflow(self, data, data_input=None, data_output=None, save_

self.workflow.do_engine_steps()
self.workflow.refresh_waiting_tasks()
ready_tasks = self.workflow.get_ready_user_tasks()

task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(len(task_info['completed']), 0)
self.assertEqual(len(task_info['running']), 1)
self.assertEqual(len(task_info['future']), 2)
self.assertEqual(len(task_info['instance_map']), 1)

ready_tasks = self.workflow.get_ready_user_tasks()
while len(ready_tasks) > 0:
self.assertEqual(len(ready_tasks), 1)
task = ready_tasks[0]
Expand All @@ -32,7 +38,19 @@ def set_io_and_run_workflow(self, data, data_input=None, data_output=None, save_
ready_tasks = self.workflow.get_ready_user_tasks()

self.workflow.do_engine_steps()

any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(len(task_info['completed']), 3)
self.assertEqual(len(task_info['running']), 0)
self.assertEqual(len(task_info['future']), 0)
self.assertEqual(len(task_info['instance_map']), 3)

children = self.workflow.get_tasks_from_spec_name('any_task [child]')
for child in children:
info = child.task_spec.task_info(child)
instance = info['instance']
self.assertEqual(task_info['instance_map'][instance], str(child.id))
self.assertEqual(len(children), 3)
self.assertTrue(self.workflow.is_completed())

Expand Down
15 changes: 15 additions & 0 deletions tests/SpiffWorkflow/bpmn/StandardLoopTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,30 @@ def testLoopMaximum(self):

start = self.workflow.get_tasks_from_spec_name('StartEvent_1')
start[0].data['done'] = False

any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(task_info['iterations_completed'], 0)
self.assertEqual(task_info['iterations_remaining'], 3)
self.assertEqual(len(task_info['instance_map']), 0)

for idx in range(3):
self.workflow.do_engine_steps()
self.workflow.refresh_waiting_tasks()
ready_tasks = self.workflow.get_ready_user_tasks()
self.assertEqual(len(ready_tasks), 1)
ready_tasks[0].data[str(idx)] = True
ready_tasks[0].run()
task_info = ready_tasks[0].task_spec.task_info(ready_tasks[0])
self.assertEqual(task_info['iteration'], idx)

self.workflow.do_engine_steps()
any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
task_info = any_task.task_spec.task_info(any_task)
self.assertEqual(task_info['iterations_completed'], 3)
self.assertEqual(task_info['iterations_remaining'], 0)
self.assertEqual(len(task_info['instance_map']), 3)

self.assertTrue(self.workflow.is_completed())

def testLoopCondition(self):
Expand Down

0 comments on commit c9a584c

Please sign in to comment.