Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding BoshTask #231

Merged
merged 20 commits into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8293c71
Adding BoshTask - Shell Command Task that uses boutiques descriptor …
djarecka Apr 9, 2020
7576fdc
fixing bindings for input; temp. default for output fields
djarecka Apr 16, 2020
00c668d
fixing output spec for bosh; changing output_file_template so it work…
djarecka Apr 22, 2020
8f2e841
small changes in bosh input/output specs; adding tests with workflows
djarecka Apr 22, 2020
0073e15
Merge branch 'master' into enh/bosh
djarecka Apr 22, 2020
bd7b312
adding shell task to the bosh wf
djarecka Apr 22, 2020
f9bf7ac
skipping bosh for windows
djarecka Apr 22, 2020
7b88604
adding a simple retry for pulling zenodo file
djarecka Apr 22, 2020
07d61fc
testing json decoder error
djarecka Apr 27, 2020
2549f61
testing json decoder error
djarecka Apr 27, 2020
73fa5ca
adding retry logic to the json.load of zenodo file (randomly one test…
djarecka Apr 27, 2020
1fa629d
adding retries number
djarecka Apr 27, 2020
2a2c3d8
fix
djarecka Apr 27, 2020
8bdeed1
changing the way how the zenodo file is downloaded
djarecka Apr 28, 2020
46ad553
removing run_task from boshtask, small edit to run_task from shellcom…
djarecka Apr 28, 2020
f171828
using input/output_spec_names instead of input/output_spec for BoshTa…
djarecka Apr 28, 2020
53bd585
checking if retry logic still needed with travis
djarecka Apr 30, 2020
d1c386d
removing retry block, looks like travis is fine after changing downlo…
djarecka Apr 30, 2020
7a7b1b3
adding pytest-rerunfailures and reruning the first boutique test (fai…
djarecka May 3, 2020
65cdd56
Merge branch 'master' into enh/bosh
djarecka May 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 215 additions & 0 deletions pydra/engine/boutiques.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import typing as ty
import json
import attr
from urllib.request import urlretrieve
from pathlib import Path
from functools import reduce

from ..utils.messenger import AuditFlag
from ..engine import ShellCommandTask
from ..engine.specs import SpecInfo, ShellSpec, ShellOutSpec, File, attr_fields
from .helpers_file import is_local_file


class BoshTask(ShellCommandTask):
"""Shell Command Task based on the Boutiques descriptor"""

def __init__(
self,
zenodo_id=None,
bosh_file=None,
audit_flags: AuditFlag = AuditFlag.NONE,
cache_dir=None,
input_spec_names: ty.Optional[ty.List] = None,
messenger_args=None,
messengers=None,
name=None,
output_spec_names: ty.Optional[ty.List] = None,
rerun=False,
strip=False,
**kwargs,
):
"""
Initialize this task.

Parameters
----------
zenodo_id: :obj: str
Zenodo ID
bosh_file : : str
json file with the boutiques descriptors
audit_flags : :obj:`pydra.utils.messenger.AuditFlag`
Auditing configuration
cache_dir : :obj:`os.pathlike`
Cache directory
input_spec_names : :obj: list
Input names for input_spec.
messenger_args :
TODO
messengers :
TODO
name : :obj:`str`
Name of this task.
output_spec_names : :obj: list
Output names for output_spec.
strip : :obj:`bool`
TODO

"""
self.cache_dir = cache_dir
if (bosh_file and zenodo_id) or not (bosh_file or zenodo_id):
raise Exception("either bosh or zenodo_id has to be specified")
elif zenodo_id:
self.bosh_file = self._download_spec(zenodo_id)
else: # bosh_file
self.bosh_file = bosh_file

with self.bosh_file.open() as f:
self.bosh_spec = json.load(f)

self.input_spec = self._prepare_input_spec(names_subset=input_spec_names)
self.output_spec = self._prepare_output_spec(names_subset=output_spec_names)
self.bindings = ["-v", f"{self.bosh_file.parent}:{self.bosh_file.parent}:ro"]

super(BoshTask, self).__init__(
name=name,
input_spec=self.input_spec,
output_spec=self.output_spec,
executable=["bosh", "exec", "launch"],
args=["-s"],
audit_flags=audit_flags,
messengers=messengers,
messenger_args=messenger_args,
cache_dir=self.cache_dir,
strip=strip,
rerun=rerun,
**kwargs,
)
self.strip = strip

def _download_spec(self, zenodo_id):
"""
usind boutiques Searcher to find url of zenodo file for a specific id,
and download the file to self.cache_dir
"""
from boutiques.searcher import Searcher

searcher = Searcher(zenodo_id, exact_match=True)
hits = searcher.zenodo_search().json()["hits"]["hits"]
if len(hits) == 0:
raise Exception(f"can't find zenodo spec for {zenodo_id}")
elif len(hits) > 1:
raise Exception(f"too many hits for {zenodo_id}")
else:
zenodo_url = hits[0]["files"][0]["links"]["self"]
zenodo_file = self.cache_dir / f"zenodo.{zenodo_id}.json"
urlretrieve(zenodo_url, zenodo_file)
return zenodo_file

def _prepare_input_spec(self, names_subset=None):
""" creating input spec from the zenodo file
if name_subset provided, only names from the subset will be used in the spec
"""
binputs = self.bosh_spec["inputs"]
self._input_spec_keys = {}
fields = []
for input in binputs:
name = input["id"]
if names_subset is None:
pass
elif name not in names_subset:
continue
else:
names_subset.remove(name)
if input["type"] == "File":
tp = File
elif input["type"] == "String":
tp = str
elif input["type"] == "Number":
tp = float
elif input["type"] == "Flag":
tp = bool
else:
tp = None
# adding list
if tp and "list" in input and input["list"]:
tp = ty.List[tp]

mdata = {
"help_string": input.get("description", None) or input["name"],
"mandatory": not input["optional"],
"argstr": input.get("command-line-flag", None),
}
fields.append((name, tp, mdata))
self._input_spec_keys[input["value-key"]] = "{" + f"{name}" + "}"
if names_subset:
raise RuntimeError(f"{names_subset} are not in the zenodo input spec")
spec = SpecInfo(name="Inputs", fields=fields, bases=(ShellSpec,))
return spec

def _prepare_output_spec(self, names_subset=None):
""" creating output spec from the zenodo file
if name_subset provided, only names from the subset will be used in the spec
"""
boutputs = self.bosh_spec["output-files"]
fields = []
for output in boutputs:
name = output["id"]
if names_subset is None:
pass
elif name not in names_subset:
continue
else:
names_subset.remove(name)
path_template = reduce(
lambda s, r: s.replace(*r),
self._input_spec_keys.items(),
output["path-template"],
)
mdata = {
"help_string": output.get("description", None) or output["name"],
"mandatory": not output["optional"],
"output_file_template": path_template,
}
fields.append((name, attr.ib(type=File, metadata=mdata)))

if names_subset:
raise RuntimeError(f"{names_subset} are not in the zenodo output spec")
spec = SpecInfo(name="Outputs", fields=fields, bases=(ShellOutSpec,))
return spec

def _command_args_single(self, state_ind, ind=None):
"""Get command line arguments for a single state"""
input_filepath = self._bosh_invocation_file(state_ind=state_ind, ind=ind)
cmd_list = (
self.inputs.executable
+ [str(self.bosh_file), input_filepath]
+ self.inputs.args
+ self.bindings
)
return cmd_list

def _bosh_invocation_file(self, state_ind, ind=None):
"""creating bosh invocation file - json file with inputs values"""
input_json = {}
for f in attr_fields(self.inputs):
if f.name in ["executable", "args"]:
continue
if self.state and f"{self.name}.{f.name}" in state_ind:
value = getattr(self.inputs, f.name)[state_ind[f"{self.name}.{f.name}"]]
else:
value = getattr(self.inputs, f.name)
# adding to the json file if specified by the user
if value is not attr.NOTHING and value != "NOTHING":
if is_local_file(f):
value = Path(value)
self.bindings.extend(["-v", f"{value.parent}:{value.parent}:ro"])
value = str(value)

input_json[f.name] = value

filename = self.cache_dir / f"{self.name}-{ind}.json"
with open(filename, "w") as jsonfile:
json.dump(input_json, jsonfile)

return str(filename)
23 changes: 18 additions & 5 deletions pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,8 @@ def collect_additional_outputs(self, input_spec, inputs, output_dir):
if fld.type is File:
# assuming that field should have either default or metadata, but not both
if (
not (fld.default is None or fld.default == attr.NOTHING)
and fld.metadata
):
fld.default is None or fld.default == attr.NOTHING
) and not fld.metadata: # TODO: is it right?
raise Exception("File has to have default value or metadata")
elif not fld.default == attr.NOTHING:
additional_out[fld.name] = self._field_defaultvalue(
Expand Down Expand Up @@ -360,9 +359,23 @@ def _field_metadata(self, fld, inputs, output_dir):
if "value" in fld.metadata:
return output_dir / fld.metadata["value"]
elif "output_file_template" in fld.metadata:
return output_dir / fld.metadata["output_file_template"].format(
**inputs.__dict__
sfx_tmpl = (output_dir / fld.metadata["output_file_template"]).suffixes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you detect suffixes. for imaging files or other modalities this may need to be quite custom? for example nii.gz, BRIK/HEAD, etc.,.

how will pydra generalize this across domains?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my main problem was that input_1 could be filename, filename.nii or filename.nii.gz and output should be always filename.nii.gz, and the templaete is [input_1].nii.gz...
so I decided to remove all suffixes if template has it's own, happy to hear better suggestions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that specific to bosh or boutiques that output is always compressed nifti? also does bosh/boutiques enforce nifti as output? i don't think anything in boutiques enforces it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you see example for bet - you will see that "output-files" has "path-template": "[MASK].nii.gz", but maskfile (that has "value-key": "[MASK]"), could be either filename or filename.nii.gz. For both cases output-files is filename.nii.gz

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that may be true of bet, but boutiques can support other tools as well. so there needs to be a general way of deciding what the outputs would be.

this is the part where knowing what outputs should be created given the inputs plays a role, but i don't think that's captured in boutiques yet. so as a first pass, you can simply leave inputs/outputs as separate things.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understood that it's not always a case, but I thought this is a good start. Not sure how do you want me to keep it separately. From the zenodo file I have only a template that uses inputs values

if sfx_tmpl:
# removing suffix from input field if template has it's own suffix
inputs_templ = {
k: v.split(".")[0]
for k, v in inputs.__dict__.items()
if isinstance(v, str)
}
else:
inputs_templ = {
k: v for k, v in inputs.__dict__.items() if isinstance(v, str)
}
out_path = output_dir / fld.metadata["output_file_template"].format(
**inputs_templ
)
return out_path

elif "callable" in fld.metadata:
return fld.metadata["callable"](fld.name, output_dir)
else:
Expand Down
7 changes: 5 additions & 2 deletions pydra/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,16 @@ def _run_task(self):
else:
args = self.command_args
if args:
# removing emty strings
# removing empty strings
args = [str(el) for el in args if el not in ["", " "]]
keys = ["return_code", "stdout", "stderr"]
values = execute(args, strip=self.strip)
self.output_ = dict(zip(keys, values))
if self.output_["return_code"]:
raise RuntimeError(self.output_["stderr"])
if self.output_["stderr"]:
raise RuntimeError(self.output_["stderr"])
else:
raise RuntimeError(self.output_["stdout"])


class ContainerTask(ShellCommandTask):
Expand Down
Binary file added pydra/engine/tests/data_tests/test.nii.gz
Binary file not shown.
Loading