diff --git a/SpiffWorkflow/bpmn/specs/bpmn_task_spec.py b/SpiffWorkflow/bpmn/specs/bpmn_task_spec.py
index 108abbe2..22e805ef 100644
--- a/SpiffWorkflow/bpmn/specs/bpmn_task_spec.py
+++ b/SpiffWorkflow/bpmn/specs/bpmn_task_spec.py
@@ -93,3 +93,15 @@ def _on_complete_hook(self, my_task):
                 my_task.data.pop(obj.bpmn_id, None)
 
         super()._on_complete_hook(my_task)
+
+    def task_info(self, my_task):
+        # This method can be extended to provide task specific info for different spec types
+        # Since almost all spec types can be MI, add instance info here if present
+        info = {}
+        if 'key_or_index' in my_task.internal_data:
+            info['instance'] = my_task.internal_data.get('key_or_index')
+        if 'item' in my_task.internal_data:
+            info['instance'] = my_task.internal_data.get('item')
+        if 'iteration' in my_task.internal_data:
+            info['iteration'] = my_task.internal_data.get('iteration')
+        return info
diff --git a/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py b/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py
index 2589c26c..d3bad8a3 100644
--- a/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py
+++ b/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py
@@ -21,21 +21,21 @@
 from collections.abc import Iterable, Sequence, Mapping, MutableSequence, MutableMapping
 
 from SpiffWorkflow.task import TaskState
-from SpiffWorkflow.specs.base import TaskSpec
 from SpiffWorkflow.util.deep_merge import DeepMerge
+from SpiffWorkflow.bpmn.specs.bpmn_task_spec import BpmnTaskSpec
 from SpiffWorkflow.bpmn.exceptions import WorkflowDataException
 
 
-class LoopTask(TaskSpec):
+class LoopTask(BpmnTaskSpec):
 
     def process_children(self, my_task):
         """
         Handle any newly completed children and update merged tasks.
         Returns a boolean indicating whether there is a child currently running
         """
-        merged = my_task.internal_data.get('merged') or []
+        merged = self._merged_children(my_task)
         child_running = False
-        for child in filter(lambda c: c.task_spec.name == self.task_spec, my_task.children):
+        for child in self._instances(my_task):
             if child._has_state(TaskState.FINISHED_MASK) and str(child.id) not in merged:
                 self.child_completed_action(my_task, child)
                 merged.append(str(child.id))
@@ -47,6 +47,12 @@ def process_children(self, my_task):
     def child_completed_action(self, my_task, child):
         raise NotImplementedError
 
+    def _merged_children(self, my_task):
+        return my_task.internal_data.get('merged', [])
+
+    def _instances(self, my_task):
+        return filter(lambda c: c.task_spec.name == self.task_spec, my_task.children)
+
 
 class StandardLoopTask(LoopTask):
@@ -57,6 +63,14 @@ def __init__(self, wf_spec, bpmn_id, task_spec, maximum, condition, test_before,
         self.condition = condition
         self.test_before = test_before
+
+    def task_info(self, my_task):
+        info = super().task_info(my_task)
+        info['iterations_completed'] = len(self._merged_children(my_task))
+        if self.maximum:
+            info['iterations_remaining'] = self.maximum - info['iterations_completed']
+        info['instance_map'] = dict((idx, str(t.id)) for idx, t in enumerate(self._instances(my_task)))
+        return info
+
     def _update_hook(self, my_task):
 
         if my_task.state != TaskState.WAITING:
@@ -77,12 +91,13 @@ def _update_hook(self, my_task):
             child = my_task._add_child(task_spec, TaskState.WAITING)
             child.triggered = True
             child.data = deepcopy(my_task.data)
+            child.internal_data['iteration'] = len(self._merged_children(my_task))
 
     def child_completed_action(self, my_task, child):
         DeepMerge.merge(my_task.data, child.data)
 
     def loop_complete(self, my_task):
-        merged = my_task.internal_data.get('merged') or []
+        merged = self._merged_children(my_task)
         if not self.test_before and len(merged) == 0:
             # "test before" isn't really compatible our execution model in a transparent way
             # This guarantees that the task will run at least once if test_before is False
@@ -108,6 +123,27 @@ def __init__(self, wf_spec, bpmn_id, task_spec, cardinality=None, data_input=Non
         self.output_item = output_item
         self.condition = condition
+
+    def task_info(self, my_task):
+        info = super().task_info(my_task)
+        info.update({
+            'completed': [],
+            'running': [],
+            'future': my_task.internal_data.get('remaining', []),
+            'instance_map': {},
+        })
+        for task in self._instances(my_task):
+            key_or_index = task.internal_data.get('key_or_index')
+            value = task.internal_data.get('item') if key_or_index is None else key_or_index
+            if task._has_state(TaskState.FINISHED_MASK):
+                info['completed'].append(value)
+            else:
+                info['running'].append(value)
+            try:
+                info['instance_map'][value] = str(task.id)
+            except TypeError:
+                info['instance_map'][str(value)] = str(task.id)
+        return info
+
     def child_completed_action(self, my_task, child):
         """This merges child data into this task's data."""
@@ -138,11 +174,13 @@ def create_child(self, my_task, item, key_or_index=None):
             child.data[self.input_item.bpmn_id] = deepcopy(item)
         if key_or_index is not None:
             child.internal_data['key_or_index'] = key_or_index
+        else:
+            child.internal_data['item'] = item
         child.task_spec._update(child)
 
     def check_completion_condition(self, my_task):
 
-        merged = my_task.internal_data.get('merged', [])
+        merged = self._merged_children(my_task)
         if len(merged) > 0:
             last_child = [c for c in my_task.children if str(c.id) == merged[-1]][0]
             return my_task.workflow.script_engine.evaluate(last_child, self.condition)
@@ -198,6 +236,13 @@ def _update_hook(self, my_task):
         else:
             return self.add_next_child(my_task)
 
+    def task_info(self, my_task):
+        info = super().task_info(my_task)
+        cardinality = my_task.internal_data.get('cardinality')
+        if cardinality is not None:
+            info['future'] = [v for v in range(len(info['completed']) + len(info['running']), cardinality)]
+        return info
+
     def add_next_child(self, my_task):
 
         if self.data_input is not None:
@@ -307,7 +352,7 @@ def create_children(self, my_task):
         else:
             # For tasks specifying the cardinality, use the index as the "item"
             cardinality = my_task.workflow.script_engine.evaluate(my_task, self.cardinality)
-            children = ((None, idx) for idx in range(cardinality))
+            children = ((idx, idx) for idx in range(cardinality))
 
         if not my_task.internal_data.get('started', False):
@@ -323,4 +368,4 @@
             my_task.internal_data['started'] = True
         else:
-            return len(my_task.internal_data.get('merged', [])) == len(children)
+            return len(self._merged_children(my_task)) == len(children)
diff --git a/tests/SpiffWorkflow/bpmn/ParallelMultiInstanceTest.py b/tests/SpiffWorkflow/bpmn/ParallelMultiInstanceTest.py
index 6ebc1955..ab50da67 100644
--- a/tests/SpiffWorkflow/bpmn/ParallelMultiInstanceTest.py
+++ b/tests/SpiffWorkflow/bpmn/ParallelMultiInstanceTest.py
@@ -17,14 +17,23 @@ def set_io_and_run_workflow(self, data, data_input=None, data_output=None, save_
         any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
         any_task.task_spec.data_input = TaskDataReference(data_input) if data_input is not None else None
         any_task.task_spec.data_output = TaskDataReference(data_output) if data_output is not None else None
-
         self.workflow.do_engine_steps()
+
+        task_info = any_task.task_spec.task_info(any_task)
+        self.assertEqual(len(task_info['completed']), 0)
+        self.assertEqual(len(task_info['running']), 3)
+        self.assertEqual(len(task_info['future']), 0)
+        self.assertEqual(len(task_info['instance_map']), 3)
+        instance_map = task_info['instance_map']
+
         ready_tasks = self.workflow.get_ready_user_tasks()
         self.assertEqual(len(ready_tasks), 3)
         while len(ready_tasks) > 0:
             task = ready_tasks[0]
+            task_info = task.task_spec.task_info(task)
             self.assertEqual(task.task_spec.name, 'any_task [child]')
             self.assertIn('input_item', task.data)
+            self.assertEqual(instance_map[task_info['instance']], str(task.id))
             task.data['output_item'] = task.data['input_item'] * 2
             task.run()
             if save_restore:
@@ -32,6 +41,12 @@ def set_io_and_run_workflow(self, data, data_input=None, data_output=None, save_
             ready_tasks = self.workflow.get_ready_user_tasks()
         self.workflow.refresh_waiting_tasks()
         self.workflow.do_engine_steps()
+
+        any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
+        task_info = any_task.task_spec.task_info(any_task)
+        self.assertEqual(len(task_info['completed']), 3)
+        self.assertEqual(len(task_info['running']), 0)
+        self.assertEqual(len(task_info['future']), 0)
         self.assertTrue(self.workflow.is_completed())
 
     def run_workflow_with_condition(self, data):
diff --git a/tests/SpiffWorkflow/bpmn/SequentialMultiInstanceTest.py b/tests/SpiffWorkflow/bpmn/SequentialMultiInstanceTest.py
index 9a91d449..9928eba9 100644
--- a/tests/SpiffWorkflow/bpmn/SequentialMultiInstanceTest.py
+++ b/tests/SpiffWorkflow/bpmn/SequentialMultiInstanceTest.py
@@ -18,8 +18,14 @@ def set_io_and_run_workflow(self, data, data_input=None, data_output=None, save_
         self.workflow.do_engine_steps()
         self.workflow.refresh_waiting_tasks()
 
-        ready_tasks = self.workflow.get_ready_user_tasks()
+        task_info = any_task.task_spec.task_info(any_task)
+        self.assertEqual(len(task_info['completed']), 0)
+        self.assertEqual(len(task_info['running']), 1)
+        self.assertEqual(len(task_info['future']), 2)
+        self.assertEqual(len(task_info['instance_map']), 1)
+
+        ready_tasks = self.workflow.get_ready_user_tasks()
         while len(ready_tasks) > 0:
             self.assertEqual(len(ready_tasks), 1)
             task = ready_tasks[0]
@@ -32,7 +38,19 @@ def set_io_and_run_workflow(self, data, data_input=None, data_output=None, save_
             ready_tasks = self.workflow.get_ready_user_tasks()
 
         self.workflow.do_engine_steps()
+
+        any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
+        task_info = any_task.task_spec.task_info(any_task)
+        self.assertEqual(len(task_info['completed']), 3)
+        self.assertEqual(len(task_info['running']), 0)
+        self.assertEqual(len(task_info['future']), 0)
+        self.assertEqual(len(task_info['instance_map']), 3)
+        children = self.workflow.get_tasks_from_spec_name('any_task [child]')
+        for child in children:
+            info = child.task_spec.task_info(child)
+            instance = info['instance']
+            self.assertEqual(task_info['instance_map'][instance], str(child.id))
         self.assertEqual(len(children), 3)
         self.assertTrue(self.workflow.is_completed())
diff --git a/tests/SpiffWorkflow/bpmn/StandardLoopTest.py b/tests/SpiffWorkflow/bpmn/StandardLoopTest.py
index 18a81976..f1587cc2 100644
--- a/tests/SpiffWorkflow/bpmn/StandardLoopTest.py
+++ b/tests/SpiffWorkflow/bpmn/StandardLoopTest.py
@@ -15,6 +15,13 @@ def testLoopMaximum(self):
         start = self.workflow.get_tasks_from_spec_name('StartEvent_1')
         start[0].data['done'] = False
+
+        any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
+        task_info = any_task.task_spec.task_info(any_task)
+        self.assertEqual(task_info['iterations_completed'], 0)
+        self.assertEqual(task_info['iterations_remaining'], 3)
+        self.assertEqual(len(task_info['instance_map']), 0)
+
         for idx in range(3):
             self.workflow.do_engine_steps()
             self.workflow.refresh_waiting_tasks()
@@ -22,8 +29,16 @@ def testLoopMaximum(self):
             self.assertEqual(len(ready_tasks), 1)
             ready_tasks[0].data[str(idx)] = True
             ready_tasks[0].run()
+            task_info = ready_tasks[0].task_spec.task_info(ready_tasks[0])
+            self.assertEqual(task_info['iteration'], idx)
 
         self.workflow.do_engine_steps()
+        any_task = self.workflow.get_tasks_from_spec_name('any_task')[0]
+        task_info = any_task.task_spec.task_info(any_task)
+        self.assertEqual(task_info['iterations_completed'], 3)
+        self.assertEqual(task_info['iterations_remaining'], 0)
+        self.assertEqual(len(task_info['instance_map']), 3)
+
        self.assertTrue(self.workflow.is_completed())
 
     def testLoopCondition(self):
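
For reference, a minimal sketch (not part of this change set) of how the new task_info() hook might be consumed from application code. It assumes `workflow` is an already-running BpmnWorkflow whose 'any_task' spec is a parallel multi-instance task, as in the tests above; the names are placeholders borrowed from those tests.

    # Illustrative sketch only; 'workflow' and the 'any_task' spec name are assumptions,
    # not part of this diff.
    mi_task = workflow.get_tasks_from_spec_name('any_task')[0]
    info = mi_task.task_spec.task_info(mi_task)

    # 'completed', 'running', and 'future' describe the state of the child instances
    print(len(info['completed']), 'instances finished')
    print(len(info['running']), 'instances in progress')
    print(len(info['future']), 'instances not yet created')

    # 'instance_map' maps each instance's item (or key/index) to its child task id
    for instance, task_id in info['instance_map'].items():
        print(f'{instance!r} -> task {task_id}')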