Skip to content

Commit

Permalink
Merge pull request #718 from djarecka/env
Browse files Browse the repository at this point in the history
[WIP] cleaning, fixing tests
  • Loading branch information
djarecka committed Nov 3, 2023
2 parents 9cc0904 + 8d60dd1 commit 58038f5
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 214 deletions.
111 changes: 74 additions & 37 deletions pydra/engine/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,42 @@


class Environment:
"""
Base class for environments that are used to execute tasks.
Right now it is asssumed that the environment, including container images,
are available and are not removed at the end
TODO: add setup and teardown methods
"""

def setup(self):
pass

Check warning on line 15 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L15

Added line #L15 was not covered by tests

def execute(self, task):
"""
Execute the task in the environment.
Parameters
----------
task : TaskBase
the task to execute
Returns
-------
output
Output of the task.
"""
raise NotImplementedError

Check warning on line 31 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L31

Added line #L31 was not covered by tests

def teardown(self):
pass

Check warning on line 34 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L34

Added line #L34 was not covered by tests


class Native(Environment):
"""
Native environment, i.e. the tasks are executed in the current python environment.
"""

def execute(self, task):
# breakpoint()
# args = task.render_arguments_in_root()
keys = ["return_code", "stdout", "stderr"]
values = execute(task.command_args(), strip=task.strip)
output = dict(zip(keys, values))
Expand All @@ -31,43 +53,65 @@ def execute(self, task):
return output


class Docker(Environment):
def __init__(self, image, tag="latest", output_cpath="/output_pydra", xargs=None):
class Container(Environment):
"""
Base class for container environments used by Docker and Singularity.
Parameters
----------
image : str
Name of the container image
tag : str
Tag of the container image
output_cpath : str
Path to the output directory in the container
xargs : Union[str, List[str]]
Extra arguments to be passed to the container
"""

def __init__(self, image, tag="latest", root="/mnt/pydra", xargs=None):
self.image = image
self.tag = tag
if xargs is None:
xargs = []
elif isinstance(xargs, str):
xargs = xargs.split()
self.xargs = xargs
self.output_cpath = output_cpath
self.root = root

@staticmethod
def bind(loc, mode="ro", root="/mnt/pydra"): # TODO
# XXX Failure mode: {loc} overwrites a critical directory in image
# To fix, we'll need to update any args within loc to a new location
# such as /mnt/pydra/loc
def bind(loc, mode="ro", root="/mnt/pydra"):
loc_abs = Path(loc).absolute()
return f"{loc_abs}:{root}{loc_abs}:{mode}" # TODO: moving entire path?
return f"{loc_abs}:{root}{loc_abs}:{mode}"


def execute(self, task, root="/mnt/pydra"):
# XXX Need to mount all input locations
class Docker(Container):
"""Docker environment."""

def execute(self, task):
docker_img = f"{self.image}:{self.tag}"
# TODO ?
# Skips over any inputs in task.cache_dir
# Needs to include `out_file`s when not relative to working dir
# Possibly a `TargetFile` type to distinguish between `File` and `str`?
mounts = task.get_bindings(root=root)
# mounting all input locations
mounts = task.get_bindings(root=self.root)

# todo adding xargsy etc
docker_args = ["docker", "run", "-v", self.bind(task.cache_dir, "rw")]
docker_args = [
"docker",
"run",
"-v",
self.bind(task.cache_dir, "rw", self.root),
]
docker_args.extend(self.xargs)
docker_args.extend(
" ".join(
[f"-v {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
).split()
)
docker_args.extend(["-w", f"{root}{task.output_dir}"])
docker_args.extend(["-w", f"{self.root}{task.output_dir}"])
keys = ["return_code", "stdout", "stderr"]
# print("\n Docker args", docker_args)

values = execute(
docker_args + [docker_img] + task.command_args(root="/mnt/pydra"),
docker_args + [docker_img] + task.command_args(root=self.root),
strip=task.strip,
)
output = dict(zip(keys, values))
Expand All @@ -76,39 +120,35 @@ def execute(self, task, root="/mnt/pydra"):
raise RuntimeError(output["stderr"])

Check warning on line 120 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L120

Added line #L120 was not covered by tests
else:
raise RuntimeError(output["stdout"])

Check warning on line 122 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L122

Added line #L122 was not covered by tests
# Any outputs that have been created with a re-rooted path need
# to be de-rooted
# task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
return output


class Singularity(Docker):
def execute(self, task, root="/mnt/pydra"):
# XXX Need to mount all input locations
class Singularity(Container):
"""Singularity environment."""

def execute(self, task):
singularity_img = f"{self.image}:{self.tag}"
# TODO ?
# Skips over any inputs in task.cache_dir
# Needs to include `out_file`s when not relative to working dir
# Possibly a `TargetFile` type to distinguish between `File` and `str`?
mounts = task.get_bindings(root=root)
# mounting all input locations
mounts = task.get_bindings(root=self.root)

# todo adding xargsy etc
singularity_args = [
"singularity",
"exec",
"-B",
self.bind(task.cache_dir, "rw"),
self.bind(task.cache_dir, "rw", self.root),
]
singularity_args.extend(self.xargs)
singularity_args.extend(
" ".join(
[f"-B {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
).split()
)
singularity_args.extend(["--pwd", f"{root}{task.output_dir}"])
singularity_args.extend(["--pwd", f"{self.root}{task.output_dir}"])
keys = ["return_code", "stdout", "stderr"]

values = execute(
singularity_args + [singularity_img] + task.command_args(root="/mnt/pydra"),
singularity_args + [singularity_img] + task.command_args(root=self.root),
strip=task.strip,
)
output = dict(zip(keys, values))
Expand All @@ -117,7 +157,4 @@ def execute(self, task, root="/mnt/pydra"):
raise RuntimeError(output["stderr"])

Check warning on line 157 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L157

Added line #L157 was not covered by tests
else:
raise RuntimeError(output["stdout"])

Check warning on line 159 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L159

Added line #L159 was not covered by tests
# Any outputs that have been created with a re-rooted path need
# to be de-rooted
# task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
return output
17 changes: 0 additions & 17 deletions pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,23 +676,6 @@ def _check_requires(self, fld, inputs):
return False


@attr.s(auto_attribs=True, kw_only=True)
class ContainerSpec(ShellSpec):
"""Refine the generic command-line specification to container execution."""

image: ty.Union[File, str] = attr.ib(
metadata={"help_string": "image", "mandatory": True}
)
"""The image to be containerized."""
container: ty.Union[File, str, None] = attr.ib(
metadata={"help_string": "container"}
)
"""The container."""
container_xargs: ty.Optional[ty.List[str]] = attr.ib(
default=None, metadata={"help_string": "todo"}
)


@attr.s
class LazyInterface:
_task: "core.TaskBase" = attr.ib()
Expand Down
Loading

0 comments on commit 58038f5

Please sign in to comment.