Skip to content

Commit

Permalink
Merge pull request #3347 from oesteban/enh/interface-cleanup
Browse files Browse the repository at this point in the history
RF: Clean-up the BaseInterface ``run()`` function using context
  • Loading branch information
effigies authored Jul 23, 2021
2 parents 72aac96 + 24f2cbc commit 7080ef9
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 130 deletions.
178 changes: 51 additions & 127 deletions nipype/interfaces/base/core.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
Nipype interfaces core
......................
Defines the ``Interface`` API and the body of the
most basic interfaces.
The I/O specifications corresponding to these base
interfaces are found in the ``specs`` module.
"""
from copy import deepcopy
from datetime import datetime as dt
import os
import platform
import subprocess as sp
import shlex
import sys
import simplejson as json
from dateutil.parser import parse as parseutc
from traits.trait_errors import TraitError

from ... import config, logging, LooseVersion
from ...utils.provenance import write_provenance
from ...utils.misc import str2bool, rgetcwd
from ...utils.filemanip import split_filename, which, get_dependencies, canonicalize_env
from ...utils.misc import str2bool
from ...utils.filemanip import (
canonicalize_env,
get_dependencies,
indirectory,
split_filename,
which,
)
from ...utils.subprocess import run_command

from ...external.due import due
Expand All @@ -39,7 +38,12 @@
MpiCommandLineInputSpec,
get_filecopy_info,
)
from .support import Bunch, InterfaceResult, NipypeInterfaceError, format_help
from .support import (
RuntimeContext,
InterfaceResult,
NipypeInterfaceError,
format_help,
)

iflogger = logging.getLogger("nipype.interface")

Expand All @@ -63,8 +67,15 @@ class Interface(object):
"""

input_spec = None # A traited input specification
output_spec = None # A traited output specification
input_spec = None
"""
The specification of the input, defined by a :py:class:`~traits.has_traits.HasTraits` class.
"""
output_spec = None
"""
The specification of the output, defined by a :py:class:`~traits.has_traits.HasTraits` class.
"""

_can_resume = False # See property below
_always_run = False # See property below

Expand Down Expand Up @@ -365,131 +376,44 @@ def run(self, cwd=None, ignore_exception=None, **inputs):
if successful, results
"""
from ...utils.profiler import ResourceMonitor

# if ignore_exception is not provided, taking self.ignore_exception
if ignore_exception is None:
ignore_exception = self.ignore_exception

# Tear-up: get current and prev directories
syscwd = rgetcwd(error=False) # Recover when wd does not exist
if cwd is None:
cwd = syscwd

os.chdir(cwd) # Change to the interface wd
rtc = RuntimeContext(
resource_monitor=config.resource_monitor and self.resource_monitor,
ignore_exception=ignore_exception
if ignore_exception is not None
else self.ignore_exception,
)

enable_rm = config.resource_monitor and self.resource_monitor
self.inputs.trait_set(**inputs)
with indirectory(cwd or os.getcwd()):
self.inputs.trait_set(**inputs)
self._check_mandatory_inputs()
self._check_version_requirements(self.inputs)
interface = self.__class__
self._duecredit_cite()

# initialize provenance tracking
store_provenance = str2bool(
config.get("execution", "write_provenance", "false")
)
env = deepcopy(dict(os.environ))
if self._redirect_x:
env["DISPLAY"] = config.get_display()

runtime = Bunch(
cwd=cwd,
prevcwd=syscwd,
returncode=None,
duration=None,
environ=env,
startTime=dt.isoformat(dt.utcnow()),
endTime=None,
platform=platform.platform(),
hostname=platform.node(),
version=self.version,
)
runtime_attrs = set(runtime.dictcopy())

mon_sp = None
if enable_rm:
mon_freq = float(config.get("execution", "resource_monitor_frequency", 1))
proc_pid = os.getpid()
iflogger.debug(
"Creating a ResourceMonitor on a %s interface, PID=%d.",
self.__class__.__name__,
proc_pid,
)
mon_sp = ResourceMonitor(proc_pid, freq=mon_freq)
mon_sp.start()
with rtc(self, cwd=cwd, redirect_x=self._redirect_x) as runtime:

# Grab inputs now, as they should not change during execution
inputs = self.inputs.get_traitsfree()
outputs = None

try:
# Grab inputs now, as they should not change during execution
inputs = self.inputs.get_traitsfree()
outputs = None
# Run interface
runtime = self._pre_run_hook(runtime)
runtime = self._run_interface(runtime)
runtime = self._post_run_hook(runtime)
# Collect outputs
outputs = self.aggregate_outputs(runtime)
except Exception as e:
import traceback

# Retrieve the maximum info fast
runtime.traceback = traceback.format_exc()
# Gather up the exception arguments and append nipype info.
exc_args = e.args if getattr(e, "args") else tuple()
exc_args += (
"An exception of type %s occurred while running interface %s."
% (type(e).__name__, self.__class__.__name__),
)
if config.get("logging", "interface_level", "info").lower() == "debug":
exc_args += ("Inputs: %s" % str(self.inputs),)

runtime.traceback_args = ("\n".join(["%s" % arg for arg in exc_args]),)

if not ignore_exception:
raise
finally:
if runtime is None or runtime_attrs - set(runtime.dictcopy()):
raise RuntimeError(
"{} interface failed to return valid "
"runtime object".format(interface.__class__.__name__)
)
# This needs to be done always
runtime.endTime = dt.isoformat(dt.utcnow())
timediff = parseutc(runtime.endTime) - parseutc(runtime.startTime)
runtime.duration = (
timediff.days * 86400 + timediff.seconds + timediff.microseconds / 1e6
)
results = InterfaceResult(
interface, runtime, inputs=inputs, outputs=outputs, provenance=None
)

# Add provenance (if required)
if store_provenance:
# Provenance will only throw a warning if something went wrong
results.provenance = write_provenance(results)

# Make sure runtime profiler is shut down
if enable_rm:
import numpy as np

mon_sp.stop()

runtime.mem_peak_gb = None
runtime.cpu_percent = None

# Read .prof file in and set runtime values
vals = np.loadtxt(mon_sp.fname, delimiter=",")
if vals.size:
vals = np.atleast_2d(vals)
runtime.mem_peak_gb = vals[:, 2].max() / 1024
runtime.cpu_percent = vals[:, 1].max()

runtime.prof_dict = {
"time": vals[:, 0].tolist(),
"cpus": vals[:, 1].tolist(),
"rss_GiB": (vals[:, 2] / 1024).tolist(),
"vms_GiB": (vals[:, 3] / 1024).tolist(),
}
os.chdir(syscwd)
results = InterfaceResult(
self.__class__,
rtc.runtime,
inputs=inputs,
outputs=outputs,
provenance=None,
)

# Add provenance (if required)
if str2bool(config.get("execution", "write_provenance", "false")):
# Provenance will only throw a warning if something went wrong
results.provenance = write_provenance(results)

self._duecredit_cite()

return results

Expand Down
98 changes: 96 additions & 2 deletions nipype/interfaces/base/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,113 @@
"""
import os
from contextlib import AbstractContextManager
from copy import deepcopy
from textwrap import wrap
import re
from datetime import datetime as dt
from dateutil.parser import parse as parseutc
import platform

from ... import logging
from ...utils.misc import is_container
from ... import logging, config
from ...utils.misc import is_container, rgetcwd
from ...utils.filemanip import md5, hash_infile

iflogger = logging.getLogger("nipype.interface")

HELP_LINEWIDTH = 70


class RuntimeContext(AbstractContextManager):
"""A context manager to run NiPype interfaces."""

__slots__ = ("_runtime", "_resmon", "_ignore_exc")

def __init__(self, resource_monitor=False, ignore_exception=False):
"""Initialize the context manager object."""
self._ignore_exc = ignore_exception
_proc_pid = os.getpid()
if resource_monitor:
from ...utils.profiler import ResourceMonitor
else:
from ...utils.profiler import ResourceMonitorMock as ResourceMonitor

self._resmon = ResourceMonitor(
_proc_pid,
freq=float(config.get("execution", "resource_monitor_frequency", 1)),
)

def __call__(self, interface, cwd=None, redirect_x=False):
"""Generate a new runtime object."""
# Tear-up: get current and prev directories
_syscwd = rgetcwd(error=False) # Recover when wd does not exist
if cwd is None:
cwd = _syscwd

self._runtime = Bunch(
cwd=str(cwd),
duration=None,
endTime=None,
environ=deepcopy(dict(os.environ)),
hostname=platform.node(),
interface=interface.__class__.__name__,
platform=platform.platform(),
prevcwd=str(_syscwd),
redirect_x=redirect_x,
resmon=self._resmon.fname or "off",
returncode=None,
startTime=None,
version=interface.version,
)
return self

def __enter__(self):
"""Tear-up the execution of an interface."""
if self._runtime.redirect_x:
self._runtime.environ["DISPLAY"] = config.get_display()

self._runtime.startTime = dt.isoformat(dt.utcnow())
self._resmon.start()
# TODO: Perhaps clean-up path and ensure it exists?
os.chdir(self._runtime.cwd)
return self._runtime

def __exit__(self, exc_type, exc_value, exc_tb):
"""Tear-down interface execution."""
self._runtime.endTime = dt.isoformat(dt.utcnow())
timediff = parseutc(self._runtime.endTime) - parseutc(self._runtime.startTime)
self._runtime.duration = (
timediff.days * 86400 + timediff.seconds + timediff.microseconds / 1e6
)
# Collect monitored data
for k, v in self._resmon.stop():
setattr(self._runtime, k, v)

os.chdir(self._runtime.prevcwd)

if exc_type is not None or exc_value is not None or exc_tb is not None:
import traceback

# Retrieve the maximum info fast
self._runtime.traceback = "".join(
traceback.format_exception(exc_type, exc_value, exc_tb)
)
# Gather up the exception arguments and append nipype info.
exc_args = exc_value.args if getattr(exc_value, "args") else tuple()
exc_args += (
f"An exception of type {exc_type.__name__} occurred while "
f"running interface {self._runtime.interface}.",
)
self._runtime.traceback_args = ("\n".join([f"{arg}" for arg in exc_args]),)

if self._ignore_exc:
return True

@property
def runtime(self):
return self._runtime


class NipypeInterfaceError(Exception):
"""Custom error for interfaces"""

Expand Down
Loading

0 comments on commit 7080ef9

Please sign in to comment.