Skip to content

Commit

Permalink
split workflow & task logs, add state and update time to task logs
Browse files Browse the repository at this point in the history
  • Loading branch information
essweine committed Jul 16, 2024
1 parent cd9c3eb commit e959112
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 38 deletions.
6 changes: 3 additions & 3 deletions SpiffWorkflow/bpmn/specs/data_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from SpiffWorkflow.bpmn.exceptions import WorkflowDataException

data_log = logging.getLogger('spiff.data')
logger = logging.getLogger('spiff.data')


class BpmnDataSpecification:
Expand Down Expand Up @@ -73,7 +73,7 @@ def get(self, my_task):
raise WorkflowDataException(message, my_task, data_input=self)

my_task.data[self.bpmn_id] = deepcopy(wf.data_objects[self.bpmn_id])
data_log.info(f'Read workflow variable {self.bpmn_id}', extra=my_task.log_info())
logger.info(f'Read workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id}))

def set(self, my_task):
"""Copy a value from the task data to the workflow data"""
Expand All @@ -88,7 +88,7 @@ def set(self, my_task):

wf.data_objects[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id])
del my_task.data[self.bpmn_id]
data_log.info(f'Set workflow variable {self.bpmn_id}', extra=my_task.log_info())
logger.info(f'Set workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id}))

def delete(self, my_task):
my_task.data.pop(self.bpmn_id, None)
Expand Down
1 change: 0 additions & 1 deletion SpiffWorkflow/bpmn/util/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def _aligned(self, original, candidates):
all(first is not None and first.name == second.name for first, second in zip(subs, candidates))

def _compare_task_specs(self, spec, candidate):

s1 = self._registry.convert(spec)
s2 = self._registry.convert(candidate)
if s1.get('typename') != s2.get('typename'):
Expand Down
6 changes: 3 additions & 3 deletions SpiffWorkflow/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import logging
import re

logger = logging.getLogger('spiff')
logger = logging.getLogger('spiff.task')


class Term(object):
Expand Down Expand Up @@ -182,7 +182,7 @@ def valueof(scope, op, default=None):
return default
elif isinstance(op, Attrib):
if op.name not in scope.data:
logger.debug("Attrib('{op.name}') not present in task data", extra=scope.log_info({'data': scope.data}))
logger.debug("Attrib('{op.name}') not present in task data", extra=scope.collect_log_extras({'data': scope.data}))
return scope.get_data(op.name, default)
elif isinstance(op, PathAttrib):
if not op.path:
Expand All @@ -191,7 +191,7 @@ def valueof(scope, op, default=None):
data = scope.data
for part in parts:
if part not in data:
logger.debug(f"PathAttrib('{op.name}') not present in task data", extra=scope.log_info({'data': scope.data}))
logger.debug(f"PathAttrib('{op.name}') not present in task data", extra=scope.collect_log_extras({'data': scope.data}))
return default
data = data[part] # move down the path
return data
Expand Down
4 changes: 2 additions & 2 deletions SpiffWorkflow/specs/Transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from .base import TaskSpec

logger = logging.getLogger('spiff')
logger = logging.getLogger('spiff.task')


class Transform(TaskSpec):
Expand Down Expand Up @@ -56,7 +56,7 @@ def _update_hook(self, my_task):

if self.transforms:
for transform in self.transforms:
logger.debug('Execute transform', extra=my_task.log_info({'transform': transform}))
logger.debug('Execute transform', extra=my_task.collect_log_extras({'transform': transform}))
exec(transform)
return True

Expand Down
26 changes: 12 additions & 14 deletions SpiffWorkflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
from .util.deep_merge import DeepMerge
from .exceptions import WorkflowException

logger = logging.getLogger('spiff')
metrics = logging.getLogger('spiff.metrics')
data_log = logging.getLogger('spiff.data')
logger = logging.getLogger('spiff.task')


class Task(object):
Expand Down Expand Up @@ -100,7 +98,6 @@ def state(self):

@state.setter
def state(self, value):

if value < self._state:
raise WorkflowException(
'state went from %s to %s!' % (TaskState.get_name(self._state), TaskState.get_name(value)),
Expand Down Expand Up @@ -173,6 +170,7 @@ def reset_branch(self, data):
Returns:
list(`Task`): tasks removed from the tree
"""
logger.info(f'Branch reset', extra=self.collect_log_extras())
self.internal_data = {}
self.data = deepcopy(self.parent.data) if data is None else data
descendants = [t for t in self]
Expand Down Expand Up @@ -294,11 +292,11 @@ def _set_state(self, value):
"""Force set the state on a task"""

if value != self.state:
logger.info(f'State change to {TaskState.get_name(value)}', extra=self.log_info())
self.last_state_change = time.time()
self._state = value
logger.info(f'State changed to {TaskState.get_name(value)}', extra=self.collect_log_extras())
else:
logger.debug(f'State set to {TaskState.get_name(value)}', extra=self.log_info())
logger.debug(f'State set to {TaskState.get_name(value)}', extra=self.collect_log_extras())

def _assign_new_thread_id(self, recursive=True):
"""Assigns a new thread id to the task."""
Expand Down Expand Up @@ -347,11 +345,6 @@ def run(self):
"""
start = time.time()
retval = self.task_spec._run(self)
extra = self.log_info({
'action': 'Complete',
'elapsed': time.time() - start
})
metrics.debug('', extra=extra)
if retval is None:
self._set_state(TaskState.STARTED)
elif retval is False:
Expand Down Expand Up @@ -388,17 +381,22 @@ def trigger(self, *args):
"""
self.task_spec._on_trigger(self, *args)

def log_info(self, dct=None):
def collect_log_extras(self, dct=None):
    """Return a dict of logging context for this task.

    Args:
        dct (dict): optional extra fields to merge into the result
            (the dict is mutated in place when provided)

    Returns:
        dict: identifying fields for this task (workflow spec name, task
        spec name, task id, task spec class name, state name, and last
        state change timestamp); ``data`` and ``internal_data`` are
        attached only when verbose logging is enabled.
    """
    extra = dct or {}
    extra.update({
        'workflow_spec': self.workflow.spec.name,
        'task_spec': self.task_spec.name,
        'task_id': self.id,
        'task_type': self.task_spec.__class__.__name__,
        'state': TaskState.get_name(self._state),
        'last_state_change': self.last_state_change,
    })
    # Only attach the (potentially large) data payloads below INFO level.
    # NOTE(review): logger.level is the level explicitly configured on this
    # logger (0/NOTSET unless set), not the effective level inherited from
    # parents; getEffectiveLevel() may be what is intended -- confirm.
    if logger.level < 20:
        # We are already below INFO here, so the per-key conditionals
        # ('... if logger.level < 20 else None') were redundant -- removed.
        extra.update({
            'data': self.data,
            'internal_data': self.internal_data,
        })
    return extra

def __iter__(self):
Expand Down
30 changes: 15 additions & 15 deletions SpiffWorkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .util.event import Event
from .exceptions import TaskNotFoundException, WorkflowException

logger = logging.getLogger('spiff')
logger = logging.getLogger('spiff.workflow')


class Workflow(object):
Expand Down Expand Up @@ -64,8 +64,8 @@ def __init__(self, workflow_spec, deserializing=False):
if not deserializing:
self.task_tree = Task(self, self.spec.start, state=TaskState.FUTURE)
self.task_tree.task_spec._predict(self.task_tree, mask=TaskState.NOT_FINISHED_MASK)
logger.info('Initialized workflow', extra=self.collect_log_extras())
self.task_tree._ready()
logger.info('Initialize', extra=self.log_info())

def is_completed(self):
"""Checks whether the workflow is complete.
Expand Down Expand Up @@ -215,13 +215,13 @@ def cancel(self, success=False):
list(`Task`): the cancelled tasks
"""
self.success = success
cancel = []
logger.info(f'Workflow cancelled', extra=self.collect_log_extras())
cancelled = []
for task in TaskIterator(self.task_tree, state=TaskState.NOT_FINISHED_MASK):
cancel.append(task)
for task in cancel:
cancelled.append(task)
for task in cancelled:
task.cancel()
logger.info(f'Cancel with {len(cancel)} remaining', extra=self.log_info())
return cancel
return cancelled

def set_data(self, **kwargs):
"""Defines the given attribute/value pairs."""
Expand Down Expand Up @@ -254,16 +254,16 @@ def reset_from_task_id(self, task_id, data=None):
self.last_task = task.parent
return task.reset_branch(data)

def log_info(self, dct=None):
def collect_log_extras(self, dct=None):
    """Return a dict of logging context for this workflow.

    Args:
        dct (dict): optional extra fields to merge into the result
            (the dict is mutated in place when provided)

    Returns:
        dict: the workflow spec name, plus task-state counts
        (``finished``, ``definite``, ``predicted``) when verbose
        logging is enabled.
    """
    extra = dct or {}
    extra.update({'workflow_spec': self.spec.name})
    # NOTE(review): logger.level is the level explicitly configured on this
    # logger (0/NOTSET unless set); getEffectiveLevel() may be intended -- confirm.
    if logger.level < 20:
        # Count task states in a single pass over the task map rather than
        # building three throwaway lists with separate traversals.
        finished = definite = predicted = 0
        for task in self.tasks.values():
            if task.has_state(TaskState.FINISHED_MASK):
                finished += 1
            if task.has_state(TaskState.DEFINITE_MASK):
                definite += 1
            if task.has_state(TaskState.PREDICTED_MASK):
                predicted += 1
        extra.update({
            'finished': finished,
            'definite': definite,
            'predicted': predicted,
        })
    return extra

def _predict(self, mask=TaskState.NOT_FINISHED_MASK):
Expand Down

0 comments on commit e959112

Please sign in to comment.