Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@ This project adheres to `Semantic Versioning <http://semver.org/>`_.
Unreleased
==========

Changed
-------
- **BACKWARD INCOMPATIBLE:** Removed ``FLOW_DOCKER_MAPPINGS`` in favor of new
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay 🎉!

``FLOW_DOCKER_VOLUME_EXTRA_OPTIONS`` and ``FLOW_DOCKER_EXTRA_VOLUMES``

Added
-----
- Support for passing secrets to processes in a controlled way using a newly
defined ``basic:secret`` input type

Fixed
-----
- Set correct executor in flow manager
- Make executors more robust to unhandled failures


==================
5.1.0 - 2017-12-12
Expand Down
2 changes: 1 addition & 1 deletion resolwe/flow/execution_engines/bash/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def evaluate(self, data):
# Include special 'proc' variable in the context.
inputs['proc'] = {
'data_id': data.id,
'data_dir': settings.FLOW_EXECUTOR['DATA_DIR'],
'data_dir': self.manager.get_executor().resolve_data_path(),
}

# Include special 'requirements' variable in the context.
Expand Down
5 changes: 5 additions & 0 deletions resolwe/flow/executors/docker/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Constants for the Docker executor."""
DATA_VOLUME = '/data'
DATA_ALL_VOLUME = '/data_all'
UPLOAD_VOLUME = '/upload'
SECRETS_VOLUME = '/secrets'
32 changes: 32 additions & 0 deletions resolwe/flow/executors/docker/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
method.
"""

import os

from django.conf import settings
from django.core.management import call_command

from . import constants
from ..prepare import BaseFlowExecutorPreparer


Expand All @@ -20,3 +23,32 @@ def post_register_hook(self):
"""Pull Docker images needed by processes after registering."""
if not getattr(settings, 'FLOW_DOCKER_DONT_PULL', False):
call_command('list_docker_images', pull=True)

def resolve_data_path(self, data=None, filename=None):
    """Resolve data path for use with the Docker executor.

    :param data: Data object instance
    :param filename: Filename to resolve
    :return: Resolved filename, which can be used to access the
        given data file in programs executed using this executor
    """
    # Build the path incrementally: volume root, then the data object's
    # directory, then the specific file -- stopping at the first
    # component that was not supplied.
    atoms = [constants.DATA_ALL_VOLUME]
    if data is not None:
        atoms.append(str(data.id))
        if filename is not None:
            atoms.append(filename)

    return os.path.join(*atoms)

def resolve_upload_path(self, filename=None):
    """Resolve upload path for use with the Docker executor.

    :param filename: Filename to resolve
    :return: Resolved filename, which can be used to access the
        given uploaded file in programs executed using this
        executor
    """
    # Without a filename, the volume mount point itself is the answer.
    if filename is not None:
        return os.path.join(constants.UPLOAD_VOLUME, filename)

    return constants.UPLOAD_VOLUME
98 changes: 69 additions & 29 deletions resolwe/flow/executors/docker/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import subprocess
import tempfile

from . import constants
from ..local.run import FlowExecutor as LocalFlowExecutor
from ..protocol import ExecutorFiles # pylint: disable=import-error
from ..run import PROCESS_META, SETTINGS
from .seccomp import SECCOMP_POLICY

Expand All @@ -22,7 +24,7 @@ def __init__(self, *args, **kwargs):
super(FlowExecutor, self).__init__(*args, **kwargs)

self.container_name_prefix = None
self.mappings_tools = None
self.tools_volumes = None
self.temporary_files = []
self.command = SETTINGS.get('FLOW_DOCKER_COMMAND', 'docker')

Expand Down Expand Up @@ -94,11 +96,45 @@ def start(self):

command_args['security'] = ' '.join(security)

# render Docker mappings in FLOW_DOCKER_MAPPINGS setting
mappings_template = SETTINGS.get('FLOW_DOCKER_MAPPINGS', [])
context = {'data_id': self.data_id}
mappings = [{key.format(**context): value.format(**context) for key, value in template.items()}
for template in mappings_template]
# Setup Docker volumes.
def new_volume(kind, base_dir_name, volume, path=None, read_only=True):
"""Generate a new volume entry.

:param kind: Kind of volume, which is used for getting extra options from
settings (the ``FLOW_DOCKER_VOLUME_EXTRA_OPTIONS`` setting)
:param base_dir_name: Name of base directory setting for volume source path
:param volume: Destination volume mount point
:param path: Optional additional path atoms appended to source path
:param read_only: True to make the volume read-only
"""
if path is None:
path = []

path = [str(atom) for atom in path]

options = set(SETTINGS.get('FLOW_DOCKER_VOLUME_EXTRA_OPTIONS', {}).get(kind, '').split(','))
options.discard('')
# Do not allow modification of read-only option.
options.discard('ro')
options.discard('rw')

if read_only:
options.add('ro')
else:
options.add('rw')

return {
'src': os.path.join(SETTINGS['FLOW_EXECUTOR'].get(base_dir_name, ''), *path),
'dest': volume,
'options': ','.join(options),
}

volumes = [
new_volume('data', 'DATA_DIR', constants.DATA_VOLUME, [self.data_id], read_only=False),
new_volume('data_all', 'DATA_DIR', constants.DATA_ALL_VOLUME),
new_volume('upload', 'UPLOAD_DIR', constants.UPLOAD_VOLUME, read_only=False),
new_volume('secrets', 'RUNTIME_DIR', constants.SECRETS_VOLUME, [self.data_id, ExecutorFiles.SECRETS_DIR]),
]

# Generate dummy passwd and create mappings for it. This is required because some tools
# inside the container may try to lookup the given UID/GID and will crash if they don't
Expand All @@ -115,26 +151,33 @@ def start(self):
group_file.file.flush()
self.temporary_files.append(group_file)

mappings.append({'src': passwd_file.name, 'dest': '/etc/passwd', 'mode': 'ro,Z'})
mappings.append({'src': group_file.name, 'dest': '/etc/group', 'mode': 'ro,Z'})
volumes += [
new_volume('users', None, '/etc/passwd', [passwd_file.name]),
new_volume('users', None, '/etc/group', [group_file.name]),
]

# create mappings for tools
# Create volumes for tools.
# NOTE: To prevent processes tampering with tools, all tools are mounted read-only
# NOTE: Since the tools are shared among all containers they must use the shared SELinux
# label (z option)
self.mappings_tools = [{'src': tool, 'dest': '/usr/local/bin/resolwe/{}'.format(i), 'mode': 'ro,z'}
for i, tool in enumerate(self.get_tools())]
mappings += self.mappings_tools
# create Docker --volume parameters from mappings
command_args['volumes'] = ' '.join(['--volume="{src}":"{dest}":{mode}'.format(**map_)
for map_ in mappings])

# set working directory inside the container to the mapped directory of
# the current Data's directory
command_args['workdir'] = ''
for template in mappings_template:
if '{data_id}' in template['src']:
command_args['workdir'] = '--workdir={}'.format(template['dest'])
self.tools_volumes = []
for index, tool in enumerate(self.get_tools()):
self.tools_volumes.append(new_volume(
'tools',
None,
os.path.join('/usr/local/bin/resolwe', str(index)),
[tool]
))

volumes += self.tools_volumes

# Add any extra volumes verbatim.
volumes += SETTINGS.get('FLOW_DOCKER_EXTRA_VOLUMES', [])

# Create Docker --volume parameters from volumes.
command_args['volumes'] = ' '.join(['--volume="{src}":"{dest}":{options}'.format(**volume)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kostko Will this work if options is empty? Or do you have to omit the colon?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it works if the options are empty (no need to omit the colon).

for volume in volumes])

# Set working directory to the data volume.
command_args['workdir'] = '--workdir={}'.format(constants.DATA_VOLUME)

# Change user inside the container.
command_args['user'] = '--user={}:{}'.format(os.getuid(), os.getgid())
Expand All @@ -153,11 +196,8 @@ def start(self):

def run_script(self, script):
"""Execute the script and save results."""
mappings = SETTINGS.get('FLOW_DOCKER_MAPPINGS', {})
for map_ in mappings:
script = script.replace(map_['src'], map_['dest'])
# create a Bash command to add all the tools to PATH
tools_paths = ':'.join([map_["dest"] for map_ in self.mappings_tools])
# Create a Bash command to add all the tools to PATH.
tools_paths = ':'.join([map_["dest"] for map_ in self.tools_volumes])
add_tools_path = 'export PATH=$PATH:{}'.format(tools_paths)
# Spawn another child bash, to avoid running anything as PID 1, which has special
# signal handling (e.g., cannot be SIGKILL-ed from inside).
Expand Down
20 changes: 19 additions & 1 deletion resolwe/flow/executors/local/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,22 @@
method.
"""

from ..prepare import BaseFlowExecutorPreparer as FlowExecutorPreparer # pylint: disable=unused-import
from django.core.exceptions import PermissionDenied

from resolwe.flow.models import Data

from ..prepare import BaseFlowExecutorPreparer


class FlowExecutorPreparer(BaseFlowExecutorPreparer):
    """Specialized manager assist for the local executor."""

    def extend_settings(self, data_id, files, secrets):
        """Prevent processes requiring access to secrets from being run."""
        # The local executor has no isolation mechanism for secret files,
        # so refuse to run any process that declares the secrets resource.
        requirements = Data.objects.get(pk=data_id).process.requirements
        if requirements.get('resources', {}).get('secrets', False):
            raise PermissionDenied(
                "Process which requires access to secrets cannot be run using the local executor"
            )

        return super().extend_settings(data_id, files, secrets)
48 changes: 41 additions & 7 deletions resolwe/flow/executors/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
class BaseFlowExecutorPreparer(object):
"""Represents the preparation functionality of the executor."""

def extend_settings(self, data_id, files, secrets):
    """Extend the settings the manager will serialize.

    :param data_id: The :class:`~resolwe.flow.models.Data` object id
        being prepared for.
    :param files: The settings dictionary to be serialized. Keys are
        filenames, values are the objects that will be serialized
        into those files. Standard filenames are listed in
        :class:`resolwe.flow.managers.protocol.ExecutorFiles`.
    :param secrets: Secret files dictionary describing additional secret
        file content that should be created and made available to
        processes with special permissions. Keys are filenames, values
        are the raw strings that should be written into those files.
    """
    # Fetch the data object and its process in a single query.
    data = Data.objects.select_related('process').get(pk=data_id)

    files[ExecutorFiles.DATA] = model_to_dict(data)
    files[ExecutorFiles.PROCESS] = model_to_dict(data.process)
    files[ExecutorFiles.DJANGO_SETTINGS].update({
        'USE_TZ': settings.USE_TZ,
        'FLOW_EXECUTOR_TOOLS_PATHS': self.get_tools(),
    })

    # Add secrets if the process has permission to read them.
    secrets.update(data.resolve_secrets())

def get_tools(self):
"""Get tools paths."""
Expand All @@ -60,3 +65,32 @@ def post_register_hook(self):
this point. By default, it does nothing.
"""
pass

def resolve_data_path(self, data=None, filename=None):
    """Resolve data path for use with the executor.

    :param data: Data object instance
    :param filename: Filename to resolve
    :return: Resolved filename, which can be used to access the
        given data file in programs executed using this executor
    """
    # The base preparer resolves against the host data directory directly.
    data_dir = settings.FLOW_EXECUTOR['DATA_DIR']

    if data is None:
        return data_dir
    if filename is None:
        return os.path.join(data_dir, str(data.id))

    return os.path.join(data_dir, str(data.id), filename)

def resolve_upload_path(self, filename=None):
    """Resolve upload path for use with the executor.

    :param filename: Filename to resolve
    :return: Resolved filename, which can be used to access the
        given uploaded file in programs executed using this
        executor
    """
    # The base preparer resolves against the host upload directory directly.
    upload_dir = settings.FLOW_EXECUTOR['UPLOAD_DIR']
    if filename is not None:
        return os.path.join(upload_dir, filename)

    return upload_dir
24 changes: 22 additions & 2 deletions resolwe/flow/executors/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,26 @@ def update_data_status(self, **kwargs):
})

def run(self, data_id, script, verbosity=1):
    """Execute the script and save results."""
    finish_fields = None
    try:
        finish_fields = self._run(data_id, script, verbosity=verbosity)
    except Exception as error:  # pylint: disable=broad-except
        # Top-level executor boundary: any unhandled failure is logged,
        # reported as a data error, and turned into a non-zero return code.
        logger.exception("Unhandled exception in executor")

        # Send error report.
        self.update_data_status(process_error=[str(error)], status=DATA_META['STATUS_ERROR'])

        finish_fields = {ExecutorProtocol.FINISH_PROCESS_RC: 1}

    if finish_fields is not None:
        self._send_manager_command(ExecutorProtocol.FINISH, extra_fields=finish_fields)

    # The response channel (Redis list) is deleted automatically once the
    # list is drained, so there is no need to remove it manually.

def _run(self, data_id, script, verbosity=1):
"""Execute the script and save results."""
if verbosity >= 1:
print('RUN: {} {}'.format(data_id, script))
Expand Down Expand Up @@ -287,8 +307,8 @@ def run(self, data_id, script, verbosity=1):
if spawn_processes and process_rc == 0:
finish_fields[ExecutorProtocol.FINISH_SPAWN_PROCESSES] = spawn_processes
finish_fields[ExecutorProtocol.FINISH_EXPORTED_FILES] = self.exported_files_mapper
self._send_manager_command(ExecutorProtocol.FINISH, extra_fields=finish_fields)
# the feedback key deletes itself once the list is drained

return finish_fields

def terminate(self, data_id):
"""Terminate a running script."""
Expand Down
17 changes: 8 additions & 9 deletions resolwe/flow/expression_engines/jinja/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,17 @@ def data_by_slug(slug):
return Data.objects.get(slug=slug).pk


def get_url(hydrated_path):
    """Return file's url based on base url set in settings."""
    # Accept a whole file object and reduce it to its path component.
    if isinstance(hydrated_path, dict) and 'file' in hydrated_path:
        hydrated_path = hydrated_path['file']

    if not hasattr(hydrated_path, 'file_name'):
        raise TypeError("Argument to get_url must be a hydrated path")

    return "{}/data/{}/{}".format(
        getattr(settings, 'RESOLWE_HOST_URL', 'localhost'),
        hydrated_path.data_id,
        hydrated_path.file_name,
    )


def descriptor(obj, path=''):
Expand Down
2 changes: 1 addition & 1 deletion resolwe/flow/managers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ def load_manager(manager_name):


FLOW_MANAGER = getattr(settings, 'FLOW_MANAGER', {}).get('NAME', 'resolwe.flow.managers.local')
manager = load_manager(FLOW_MANAGER).Manager(static=True) # pylint: disable=invalid-name
manager = load_manager(FLOW_MANAGER).Manager() # pylint: disable=invalid-name
Loading