Skip to content

Commit

Permalink
expand TaskBase docstring, remove unused attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolocin committed Jul 23, 2020
1 parent 7194314 commit 7daa7af
Showing 1 changed file with 85 additions and 39 deletions.
124 changes: 85 additions & 39 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,39 +49,69 @@

class TaskBase:
"""
A base structure for the nodes in the processing graph.
The class that acts as a base structure for all tasks in a graph.
Tasks are a generic compute step from which both elementary tasks and
A task is a generic compute step from which both :class:`FunctionTask` and
:class:`Workflow` instances inherit.
TODO: Which functions are in TaskBase that are not overridden in subclasses?
Attributes
----------
name : :obj:`str`
Unique name of this node.
inputs : : TODO
XXXX
input_names : :obj:`List` of :obj:`str`
Name(s) of input(s) to this node.
input_spec : : TODO
XXXX
cache_dir : :obj:`os.pathlike` or None
Path to directory to store cache. Save new cache here if prior cache couldn't be
found here or in `cache_locations`.
cache_locations : :obj:`os.pathlike` or :obj:`list` of :obj:`os.pathlike` or None
Path or list of paths to search for cached results. Reuse cache if found.
allow_cache_override : :obj:`bool`
TODO
rerun : :obj:`bool`
Whether results should be checked (does not propagate to nodes)
audit : :class:`Audit`
Configure provenance tracking. Default is no provenance tracking.
TODO: Finish this
"""

_api_version: str = "0.0.1" # Should generally not be touched by subclasses
_etelemetry_version_data = None # class variable to store etelemetry information
_version: str # Version of tool being wrapped
_task_version: ty.Optional[str] = None
# Task writers encouraged to define and increment when implementation changes sufficiently
_input_sets = None # Dictionaries of predefined input settings

audit_flags: AuditFlag = AuditFlag.NONE
"""What to audit -- available flags: :class:`~pydra.utils.messenger.AuditFlag`."""

_can_resume = False # Does the task allow resuming from previous state
_redirect_x = False # Whether an X session should be created/directed

_runtime_requirements = RuntimeSpec()
_runtime_hints = None

# Task writers encouraged to define and increment when implementation changes sufficiently
_cache_dir = None # Working directory in which to operate
_references = None # List of references for a task
_input_sets = None # Dictionaries of predefined input settings

audit_flags: AuditFlag = AuditFlag.NONE
"""What to audit -- available flags: :class:`~pydra.utils.messenger.AuditFlag`."""

def __init__(
self,
name: str,
audit_flags: AuditFlag = AuditFlag.NONE,
inputs: ty.Optional[ty.Union[ty.Text, File, ty.Dict]] = None,
cache_dir=None,
cache_locations=None,
inputs: ty.Optional[ty.Union[ty.Text, File, ty.Dict]] = None,
rerun=False,
messengers=None,
messenger_args=None,
Expand Down Expand Up @@ -109,38 +139,40 @@ def __init__(
audit_flags : :class:`AuditFlag`, optional
Configure provenance tracking. Default is no provenance tracking.
See available flags at :class:`~pydra.utils.messenger.AuditFlag`.
inputs : :obj:`typing.Text`, or :class:`File`, or :obj:`dict`, or `None`.
Name(s) of input(s) to this node.
cache_dir : :obj:`os.pathlike` or None
Path to directory to store cache. Save new cache here if prior cache couldn't be
found here or in `cache_locations`.
cache_locations : :obj:`os.pathlike` or :obj:`list` of :obj:`os.pathlike` or None
Path or list of paths to search for cached results. Reuse cache if found.
inputs : :obj:`typing.Text`, or :class:`File`, or :obj:`dict`, or `None`.
Name(s) of input(s) to this node.
File path(s) are given if inputs are files.
Use in attributes `inputs` and `input_names`.
rerun : :obj:`bool`
TODO
Whether results should be checked (does not propagate to nodes)
Other Parameters
----------
messengers : :class:`Messenger` or :obj:`list` of :class:`Messenger` or None
Messenger(s) used by Audit.
Messenger(s) used by Audit. Saved in the `audit` attribute.
See available flags at :class:`~pydra.utils.messenger.Messenger`.
messenger_args : TODO what type?
Argument(s) used by `messenger`
Argument(s) used by `messengers`. Saved in the `audit` attribute.
"""
from .. import check_latest_version

if TaskBase._etelemetry_version_data is None:
TaskBase._etelemetry_version_data = check_latest_version()

# raise error if name is same as of attributes
if name in dir(self):
if name in dir(self): # raise error if name is same as of attributes
raise ValueError("Cannot use names of attributes or methods as task name")
self.name = name
if not self.input_spec:

# set inputs: TODO should we type check in spec?
if not self.input_spec: # `input_spec` saves information in inputs/outputs
raise Exception("No input_spec in class: %s" % self.__class__.__name__)
klass = make_klass(self.input_spec)
# todo should be used to input_check in spec??
self.inputs = klass(
**{
(f.name[1:] if f.name.startswith("_") else f.name): f.default
Expand All @@ -152,44 +184,46 @@ def __init__(
for field in attr.fields(klass)
if field.name not in ["_func", "_graph_checksums"]
]
# dictionary to save the connections with lazy fields
self.inp_lf = {}
self.state = None
self._output = {}
self._result = {}
# flag that says if node finished all jobs
self._done = False

# load inputs
if self._input_sets is None:
self._input_sets = {}
if inputs:
if isinstance(inputs, dict):
inputs = {k: v for k, v in inputs.items() if k in self.input_names}
elif Path(inputs).is_file():
inputs = json.loads(Path(inputs).read_text())
self.inputs = attr.evolve(self.inputs, **inputs)
self.inputs.check_metadata()
elif isinstance(inputs, str):
if self._input_sets is None or inputs not in self._input_sets:
raise ValueError(f"Unknown input set {inputs!r}")
inputs = self._input_sets[inputs]
self.inputs = attr.evolve(self.inputs, **inputs)
self.inputs.check_metadata()
self.state_inputs = inputs

self.cache_dir = cache_dir
self.cache_locations = cache_locations
self.allow_cache_override = True

self.inp_lf = {} # dict to save the connections with lazy fields
self._output = {}
self._result = {}

self.plugin = None
self.state = None
self.task_rerun = rerun

self._checksum = None
self._done = False # if node finished all jobs
self._errored = False

# messenger and hooks
self.audit = Audit(
audit_flags=audit_flags,
messengers=messengers,
messenger_args=messenger_args,
develop=develop,
)
self.cache_dir = cache_dir
self.cache_locations = cache_locations
self.allow_cache_override = True
self._checksum = None
# if True the results are not checked (does not propagate to nodes)
self.task_rerun = rerun

self.plugin = None
self.hooks = TaskHook()
self._errored = False

def __str__(self):
return self.name
Expand Down Expand Up @@ -469,7 +503,6 @@ def split(self, splitter, overwrite=False, **kwargs):
)
if kwargs:
self.inputs = attr.evolve(self.inputs, **kwargs)
self.state_inputs = kwargs
if not self.state or splitter != self.state.splitter:
self.set_state(splitter)
return self
Expand Down Expand Up @@ -577,6 +610,18 @@ def done(self):
return False

def _combined_output(self, return_inputs=False):
"""
Combine outputs of a split task.
Parameters
----------
Returns
-------
combined_results : TODO
"""
combined_results = []
for (gr, ind_l) in self.state.final_combined_ind_mapping.items():
combined_results_gr = []
Expand Down Expand Up @@ -604,13 +649,14 @@ def result(self, state_index=None, return_inputs=False):
----------
state_index : :obj:`int`
index of the element for task with splitter and multiple states
return_inputs : :obj: `bool`, :obj:`str`
return_inputs : :obj:`bool` or :obj:`str`
if True or "val" result is returned together with values of the input fields,
if "ind" result is returned together with indices of the input fields
Returns
-------
result :
result
TODO
"""
# TODO: check if result is available in load_result and
Expand Down

0 comments on commit 7daa7af

Please sign in to comment.