diff --git a/docs/CHANGELOG.rst b/docs/CHANGELOG.rst index c8f82ef23..94f5259d1 100644 --- a/docs/CHANGELOG.rst +++ b/docs/CHANGELOG.rst @@ -10,6 +10,21 @@ This project adheres to `Semantic Versioning `_. Unreleased ========== +Changed +------- +- **BACKWARD INCOMPATIBLE:** Removed ``FLOW_DOCKER_MAPPINGS`` in favor of new + ``FLOW_DOCKER_VOLUME_EXTRA_OPTIONS`` and ``FLOW_DOCKER_EXTRA_VOLUMES`` + +Added +----- +- Support for passing secrets to processes in a controlled way using a newly + defined ``basic:secret`` input type + +Fixed +----- +- Set correct executor in flow manager +- Make executors more robust to unhandled failures + ================== 5.1.0 - 2017-12-12 diff --git a/resolwe/flow/execution_engines/bash/__init__.py b/resolwe/flow/execution_engines/bash/__init__.py index e1f353fde..454c45b59 100644 --- a/resolwe/flow/execution_engines/bash/__init__.py +++ b/resolwe/flow/execution_engines/bash/__init__.py @@ -35,7 +35,7 @@ def evaluate(self, data): # Include special 'proc' variable in the context. inputs['proc'] = { 'data_id': data.id, - 'data_dir': settings.FLOW_EXECUTOR['DATA_DIR'], + 'data_dir': self.manager.get_executor().resolve_data_path(), } # Include special 'requirements' variable in the context. diff --git a/resolwe/flow/executors/docker/constants.py b/resolwe/flow/executors/docker/constants.py new file mode 100644 index 000000000..e5a292ee9 --- /dev/null +++ b/resolwe/flow/executors/docker/constants.py @@ -0,0 +1,5 @@ +"""Constants for the Docker executor.""" +DATA_VOLUME = '/data' +DATA_ALL_VOLUME = '/data_all' +UPLOAD_VOLUME = '/upload' +SECRETS_VOLUME = '/secrets' diff --git a/resolwe/flow/executors/docker/prepare.py b/resolwe/flow/executors/docker/prepare.py index 3f2058df1..5f584b97f 100644 --- a/resolwe/flow/executors/docker/prepare.py +++ b/resolwe/flow/executors/docker/prepare.py @@ -7,9 +7,12 @@ method. """ +import os + from django.conf import settings from django.core.management import call_command +from . import constants from ..prepare import BaseFlowExecutorPreparer @@ -20,3 +23,32 @@ def post_register_hook(self): """Pull Docker images needed by processes after registering.""" if not getattr(settings, 'FLOW_DOCKER_DONT_PULL', False): call_command('list_docker_images', pull=True) + + def resolve_data_path(self, data=None, filename=None): + """Resolve data path for use with the executor. + + :param data: Data object instance + :param filename: Filename to resolve + :return: Resolved filename, which can be used to access the + given data file in programs executed using this executor + """ + if data is None: + return constants.DATA_ALL_VOLUME + + if filename is None: + return os.path.join(constants.DATA_ALL_VOLUME, str(data.id)) + + return os.path.join(constants.DATA_ALL_VOLUME, str(data.id), filename) + + def resolve_upload_path(self, filename=None): + """Resolve upload path for use with the executor. + + :param filename: Filename to resolve + :return: Resolved filename, which can be used to access the + given uploaded file in programs executed using this + executor + """ + if filename is None: + return constants.UPLOAD_VOLUME + + return os.path.join(constants.UPLOAD_VOLUME, filename) diff --git a/resolwe/flow/executors/docker/run.py b/resolwe/flow/executors/docker/run.py index 7876f40e8..1723659f0 100644 --- a/resolwe/flow/executors/docker/run.py +++ b/resolwe/flow/executors/docker/run.py @@ -7,7 +7,9 @@ import subprocess import tempfile +from . import constants from ..local.run import FlowExecutor as LocalFlowExecutor +from ..protocol import ExecutorFiles # pylint: disable=import-error from ..run import PROCESS_META, SETTINGS from .seccomp import SECCOMP_POLICY @@ -22,7 +24,7 @@ def __init__(self, *args, **kwargs): super(FlowExecutor, self).__init__(*args, **kwargs) self.container_name_prefix = None - self.mappings_tools = None + self.tools_volumes = None self.temporary_files = [] self.command = SETTINGS.get('FLOW_DOCKER_COMMAND', 'docker') @@ -94,11 +96,45 @@ def start(self): command_args['security'] = ' '.join(security) - # render Docker mappings in FLOW_DOCKER_MAPPINGS setting - mappings_template = SETTINGS.get('FLOW_DOCKER_MAPPINGS', []) - context = {'data_id': self.data_id} - mappings = [{key.format(**context): value.format(**context) for key, value in template.items()} - for template in mappings_template] + # Setup Docker volumes. + def new_volume(kind, base_dir_name, volume, path=None, read_only=True): + """Generate a new volume entry. + + :param kind: Kind of volume, which is used for getting extra options from + settings (the ``FLOW_DOCKER_VOLUME_EXTRA_OPTIONS`` setting) + :param base_dir_name: Name of base directory setting for volume source path + :param volume: Destination volume mount point + :param path: Optional additional path atoms appended to source path + :param read_only: True to make the volume read-only + """ + if path is None: + path = [] + + path = [str(atom) for atom in path] + + options = set(SETTINGS.get('FLOW_DOCKER_VOLUME_EXTRA_OPTIONS', {}).get(kind, '').split(',')) + options.discard('') + # Do not allow modification of read-only option. + options.discard('ro') + options.discard('rw') + + if read_only: + options.add('ro') + else: + options.add('rw') + + return { + 'src': os.path.join(SETTINGS['FLOW_EXECUTOR'].get(base_dir_name, ''), *path), + 'dest': volume, + 'options': ','.join(options), + } + + volumes = [ + new_volume('data', 'DATA_DIR', constants.DATA_VOLUME, [self.data_id], read_only=False), + new_volume('data_all', 'DATA_DIR', constants.DATA_ALL_VOLUME), + new_volume('upload', 'UPLOAD_DIR', constants.UPLOAD_VOLUME, read_only=False), + new_volume('secrets', 'RUNTIME_DIR', constants.SECRETS_VOLUME, [self.data_id, ExecutorFiles.SECRETS_DIR]), + ] # Generate dummy passwd and create mappings for it. This is required because some tools # inside the container may try to lookup the given UID/GID and will crash if they don't @@ -115,26 +151,33 @@ def start(self): group_file.file.flush() self.temporary_files.append(group_file) - mappings.append({'src': passwd_file.name, 'dest': '/etc/passwd', 'mode': 'ro,Z'}) - mappings.append({'src': group_file.name, 'dest': '/etc/group', 'mode': 'ro,Z'}) + volumes += [ + new_volume('users', None, '/etc/passwd', [passwd_file.name]), + new_volume('users', None, '/etc/group', [group_file.name]), + ] - # create mappings for tools + # Create volumes for tools. # NOTE: To prevent processes tampering with tools, all tools are mounted read-only - # NOTE: Since the tools are shared among all containers they must use the shared SELinux - # label (z option) - self.mappings_tools = [{'src': tool, 'dest': '/usr/local/bin/resolwe/{}'.format(i), 'mode': 'ro,z'} - for i, tool in enumerate(self.get_tools())] - mappings += self.mappings_tools - # create Docker --volume parameters from mappings - command_args['volumes'] = ' '.join(['--volume="{src}":"{dest}":{mode}'.format(**map_) - for map_ in mappings]) - - # set working directory inside the container to the mapped directory of - # the current Data's directory - command_args['workdir'] = '' - for template in mappings_template: - if '{data_id}' in template['src']: - command_args['workdir'] = '--workdir={}'.format(template['dest']) + self.tools_volumes = [] + for index, tool in enumerate(self.get_tools()): + self.tools_volumes.append(new_volume( + 'tools', + None, + os.path.join('/usr/local/bin/resolwe', str(index)), + [tool] + )) + + volumes += self.tools_volumes + + # Add any extra volumes verbatim. + volumes += SETTINGS.get('FLOW_DOCKER_EXTRA_VOLUMES', []) + + # Create Docker --volume parameters from volumes. + command_args['volumes'] = ' '.join(['--volume="{src}":"{dest}":{options}'.format(**volume) + for volume in volumes]) + + # Set working directory to the data volume. + command_args['workdir'] = '--workdir={}'.format(constants.DATA_VOLUME) # Change user inside the container. command_args['user'] = '--user={}:{}'.format(os.getuid(), os.getgid()) @@ -153,11 +196,8 @@ def start(self): def run_script(self, script): """Execute the script and save results.""" - mappings = SETTINGS.get('FLOW_DOCKER_MAPPINGS', {}) - for map_ in mappings: - script = script.replace(map_['src'], map_['dest']) - # create a Bash command to add all the tools to PATH - tools_paths = ':'.join([map_["dest"] for map_ in self.mappings_tools]) + # Create a Bash command to add all the tools to PATH. + tools_paths = ':'.join([map_["dest"] for map_ in self.tools_volumes]) add_tools_path = 'export PATH=$PATH:{}'.format(tools_paths) # Spawn another child bash, to avoid running anything as PID 1, which has special # signal handling (e.g., cannot be SIGKILL-ed from inside). diff --git a/resolwe/flow/executors/local/prepare.py b/resolwe/flow/executors/local/prepare.py index b0885429b..8b53af656 100644 --- a/resolwe/flow/executors/local/prepare.py +++ b/resolwe/flow/executors/local/prepare.py @@ -7,4 +7,22 @@ method. """ -from ..prepare import BaseFlowExecutorPreparer as FlowExecutorPreparer # pylint: disable=unused-import +from django.core.exceptions import PermissionDenied + +from resolwe.flow.models import Data + +from ..prepare import BaseFlowExecutorPreparer + + +class FlowExecutorPreparer(BaseFlowExecutorPreparer): + """Specialized manager assist for the local executor.""" + + def extend_settings(self, data_id, files, secrets): + """Prevent processes requiring access to secrets from being run.""" + process = Data.objects.get(pk=data_id).process + if process.requirements.get('resources', {}).get('secrets', False): + raise PermissionDenied( + "Process which requires access to secrets cannot be run using the local executor" + ) + + return super().extend_settings(data_id, files, secrets) diff --git a/resolwe/flow/executors/prepare.py b/resolwe/flow/executors/prepare.py index b4033acbf..e61c677d7 100644 --- a/resolwe/flow/executors/prepare.py +++ b/resolwe/flow/executors/prepare.py @@ -17,7 +17,7 @@ class BaseFlowExecutorPreparer(object): """Represents the preparation functionality of the executor.""" - def extend_settings(self, data_id, files): + def extend_settings(self, data_id, files, secrets): """Extend the settings the manager will serialize. :param data_id: The :class:`~resolwe.flow.models.Data` object id @@ -26,17 +26,22 @@ def extend_settings(self, data_id, files): filenames, values are the objects that will be serialized into those files. Standard filenames are listed in :class:`resolwe.flow.managers.protocol.ExecutorFiles`. + :param secrets: Secret files dictionary describing additional secret + file content that should be created and made available to + processes with special permissions. Keys are filenames, values + are the raw strings that should be written into those files. """ + data = Data.objects.select_related('process').get(pk=data_id) + files[ExecutorFiles.DJANGO_SETTINGS].update({ 'USE_TZ': settings.USE_TZ, 'FLOW_EXECUTOR_TOOLS_PATHS': self.get_tools(), }) - files[ExecutorFiles.DATA] = model_to_dict(Data.objects.get( - pk=data_id - )) - files[ExecutorFiles.PROCESS] = model_to_dict(Data.objects.get( - pk=data_id - ).process) + files[ExecutorFiles.DATA] = model_to_dict(data) + files[ExecutorFiles.PROCESS] = model_to_dict(data.process) + + # Add secrets if the process has permission to read them. + secrets.update(data.resolve_secrets()) def get_tools(self): """Get tools paths.""" @@ -60,3 +65,32 @@ def post_register_hook(self): this point. By default, it does nothing. """ pass + + def resolve_data_path(self, data=None, filename=None): + """Resolve data path for use with the executor. + + :param data: Data object instance + :param filename: Filename to resolve + :return: Resolved filename, which can be used to access the + given data file in programs executed using this executor + """ + if data is None: + return settings.FLOW_EXECUTOR['DATA_DIR'] + + if filename is None: + return os.path.join(settings.FLOW_EXECUTOR['DATA_DIR'], str(data.id)) + + return os.path.join(settings.FLOW_EXECUTOR['DATA_DIR'], str(data.id), filename) + + def resolve_upload_path(self, filename=None): + """Resolve upload path for use with the executor. + + :param filename: Filename to resolve + :return: Resolved filename, which can be used to access the + given uploaded file in programs executed using this + executor + """ + if filename is None: + return settings.FLOW_EXECUTOR['UPLOAD_DIR'] + + return os.path.join(settings.FLOW_EXECUTOR['UPLOAD_DIR'], filename) diff --git a/resolwe/flow/executors/run.py b/resolwe/flow/executors/run.py index 2035737e9..9f8d93450 100644 --- a/resolwe/flow/executors/run.py +++ b/resolwe/flow/executors/run.py @@ -153,6 +153,26 @@ def update_data_status(self, **kwargs): }) def run(self, data_id, script, verbosity=1): + """Execute the script and save results.""" + try: + finish_fields = self._run(data_id, script, verbosity=verbosity) + except Exception as error: # pylint: disable=broad-except + logger.exception("Unhandled exception in executor") + + # Send error report. + self.update_data_status(process_error=[str(error)], status=DATA_META['STATUS_ERROR']) + + finish_fields = { + ExecutorProtocol.FINISH_PROCESS_RC: 1, + } + + if finish_fields is not None: + self._send_manager_command(ExecutorProtocol.FINISH, extra_fields=finish_fields) + + # The response channel (Redis list) is deleted automatically once the list is drained, so + # there is no need to remove it manually. + + def _run(self, data_id, script, verbosity=1): """Execute the script and save results.""" if verbosity >= 1: print('RUN: {} {}'.format(data_id, script)) @@ -287,8 +307,8 @@ def run(self, data_id, script, verbosity=1): if spawn_processes and process_rc == 0: finish_fields[ExecutorProtocol.FINISH_SPAWN_PROCESSES] = spawn_processes finish_fields[ExecutorProtocol.FINISH_EXPORTED_FILES] = self.exported_files_mapper - self._send_manager_command(ExecutorProtocol.FINISH, extra_fields=finish_fields) - # the feedback key deletes itself once the list is drained + + return finish_fields def terminate(self, data_id): """Terminate a running script.""" diff --git a/resolwe/flow/expression_engines/jinja/filters.py b/resolwe/flow/expression_engines/jinja/filters.py index 1abb63804..f8138be78 100644 --- a/resolwe/flow/expression_engines/jinja/filters.py +++ b/resolwe/flow/expression_engines/jinja/filters.py @@ -63,18 +63,17 @@ def data_by_slug(slug): return Data.objects.get(slug=slug).pk -def get_url(file_path): +def get_url(hydrated_path): """Return file's url based on base url set in settings.""" - # Get only file path if whole file object is given - if isinstance(file_path, dict) and 'file' in file_path: - file_path = file_path['file'] + # Get only file path if whole file object is given. + if isinstance(hydrated_path, dict) and 'file' in hydrated_path: + hydrated_path = hydrated_path['file'] - data_dir = settings.FLOW_EXECUTOR['DATA_DIR'] - file_path = file_path[len(data_dir):] # remove data_dir prefix - file_path = file_path.lstrip('/') - base_url = getattr(settings, 'RESOLWE_HOST_URL', 'localhost') + if not hasattr(hydrated_path, 'file_name'): + raise TypeError("Argument to get_url must be a hydrated path") - return "{}/data/{}".format(base_url, file_path) + base_url = getattr(settings, 'RESOLWE_HOST_URL', 'localhost') + return "{}/data/{}/{}".format(base_url, hydrated_path.data_id, hydrated_path.file_name) def descriptor(obj, path=''): diff --git a/resolwe/flow/managers/__init__.py b/resolwe/flow/managers/__init__.py index 1ac01a4fa..bdd67fa99 100644 --- a/resolwe/flow/managers/__init__.py +++ b/resolwe/flow/managers/__init__.py @@ -38,4 +38,4 @@ def load_manager(manager_name): FLOW_MANAGER = getattr(settings, 'FLOW_MANAGER', {}).get('NAME', 'resolwe.flow.managers.local') -manager = load_manager(FLOW_MANAGER).Manager(static=True) # pylint: disable=invalid-name +manager = load_manager(FLOW_MANAGER).Manager() # pylint: disable=invalid-name diff --git a/resolwe/flow/managers/base.py b/resolwe/flow/managers/base.py index 72b5754f6..947b25b3b 100644 --- a/resolwe/flow/managers/base.py +++ b/resolwe/flow/managers/base.py @@ -16,11 +16,11 @@ from importlib import import_module from channels import Channel -from channels.generic import BaseConsumer from channels.test import Client from django.conf import settings from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import PermissionDenied from django.db import IntegrityError, transaction from resolwe.flow.engine import InvalidEngineError, load_engines @@ -86,7 +86,7 @@ def dependency_status(data): return Data.STATUS_DONE -class BaseManager(BaseConsumer): +class BaseManager(object): """Manager handles process job execution.""" class _SynchronizationManager(object): @@ -159,12 +159,18 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.state.settings_override = self.old_overrides return False - def discover_engines(self): - """Discover configured engines.""" - executor = getattr(settings, 'FLOW_EXECUTOR', {}).get('NAME', 'resolwe.flow.executors.local') + def discover_engines(self, executor=None): + """Discover configured engines. + + :param executor: Optional executor module override + """ + if executor is None: + executor = getattr(settings, 'FLOW_EXECUTOR', {}).get('NAME', 'resolwe.flow.executors.local') self.executor = self.load_executor(executor) + expression_engines = getattr(settings, 'FLOW_EXPRESSION_ENGINES', ['resolwe.flow.expression_engines.jinja']) self.expression_engines = self.load_expression_engines(expression_engines) + execution_engines = getattr(settings, 'FLOW_EXECUTION_ENGINES', ['resolwe.flow.execution_engines.bash']) self.execution_engines = self.load_execution_engines(execution_engines) @@ -210,23 +216,14 @@ def __init__(self, *args, **kwargs): # independently, it must clear the cache on its own. self._drop_ctypes = getattr(settings, 'FLOW_MANAGER_DISABLE_CTYPE_CACHE', False) - if 'static' not in kwargs or not kwargs['static']: - super().__init__(*args, **kwargs) - - def update_routing(self): - """Update naming information in the Channels' routing layer. - - This should not be needed in normal operation. It's mostly here - to support the testing infrastructure, where the manner in which - Django settings are patched and the ordering (in terms of import - order) where they're needed clash and make things awkward. - """ - # This is all a bit kludgy, but we want runtime settings - # patching after the module is already loaded, and Channels - # wants a class variable. - setattr(type(self), 'method_mapping', { - state.MANAGER_CONTROL_CHANNEL: 'handle_control_event', - }) + # Ensure there is only one manager instance per process. This + # is required as other parts of the code access the global + # manager instance. + try: + from resolwe.flow import managers + assert not hasattr(managers, 'manager') + except ImportError: + pass def _marshal_settings(self): """Marshal Django settings into a serializable object. @@ -329,6 +326,7 @@ def _prepare_context(self, data_id, data_dir, runtime_dir, **kwargs): file. """ files = {} + secrets = {} settings_dict = {} settings_dict['DATA_DIR'] = data_dir @@ -340,28 +338,47 @@ def _prepare_context(self, data_id, data_dir, runtime_dir, **kwargs): django_settings.update(kwargs) files[ExecutorFiles.DJANGO_SETTINGS] = django_settings - # add scheduling classes + # Add scheduling classes. files[ExecutorFiles.PROCESS_META] = { k: getattr(Process, k) for k in dir(Process) if k.startswith('SCHEDULING_CLASS_') and isinstance(getattr(Process, k), str) } - # add Data status constants + # Add Data status constants. files[ExecutorFiles.DATA_META] = { k: getattr(Data, k) for k in dir(Data) if k.startswith('STATUS_') and isinstance(getattr(Data, k), str) } - # extend the settings with whatever the executor wants - self.executor.extend_settings(data_id, files) + # Extend the settings with whatever the executor wants. + self.executor.extend_settings(data_id, files, secrets) - # save the settings into the various files in the data dir + # Save the settings into the various files in the runtime dir. settings_dict[ExecutorFiles.FILE_LIST_KEY] = list(files.keys()) for file_name in files: file_path = os.path.join(runtime_dir, file_name) with open(file_path, 'wt') as json_file: json.dump(files[file_name], json_file, cls=SettingsJSONifier) + # Save the secrets in the runtime dir, with permissions to prevent listing the given + # directory. + secrets_dir = os.path.join(runtime_dir, ExecutorFiles.SECRETS_DIR) + os.makedirs(secrets_dir, mode=0o300) + for file_name, value in secrets.items(): + file_path = os.path.join(secrets_dir, file_name) + + # Set umask to 0 to ensure that we set the correct permissions. + old_umask = os.umask(0) + try: + # We need to use os.open in order to correctly enforce file creation. Otherwise, + # there is a race condition which can be used to create the file with different + # ownership/permissions. + file_descriptor = os.open(file_path, os.O_WRONLY | os.O_CREAT, mode=0o600) + with os.fdopen(file_descriptor, 'w') as raw_file: + raw_file.write(value) + finally: + os.umask(old_umask) + def _prepare_executor(self, data_id, executor): """Copy executor sources into the destination directory. @@ -449,7 +466,16 @@ def handle_control_event(self, message): elif cmd == WorkerProtocol.FINISH: data_id = message.content[WorkerProtocol.DATA_ID] if not getattr(settings, 'FLOW_MANAGER_KEEP_DATA', False): - shutil.rmtree(self._get_per_data_dir('RUNTIME_DIR', data_id)) + try: + def handle_error(func, path, exc_info): + """Handle permission errors while removing data directories.""" + if isinstance(exc_info[1], PermissionError): + os.chmod(path, 0o700) + shutil.rmtree(path) + + shutil.rmtree(self._get_per_data_dir('RUNTIME_DIR', data_id), onerror=handle_error) + except OSError: + logger.exception("Manager exception while removing data runtime directory.") if message.content[WorkerProtocol.FINISH_SPAWNED]: new_sema = self.state.sync_semaphore.add(1) @@ -571,6 +597,10 @@ def _communicate(self, run_sync=False, verbosity=1, executor='resolwe.flow.execu state.MANAGER_CONTROL_CHANNEL, executor )) + + # Ensure we have the correct engines loaded. + self.discover_engines(executor=executor) + if self._drop_ctypes: ContentType.objects.clear_cache() try: @@ -645,6 +675,11 @@ def _communicate(self, run_sync=False, verbosity=1, executor='resolwe.flow.execu self.settings_actual.get('FLOW_EXECUTOR', {}).get('PYTHON', '/usr/bin/env python') + ' -m executors ' + executor_module ] + except PermissionDenied as error: + data.status = Data.STATUS_ERROR + data.process_error.append("Permission denied for process: {}".format(error)) + data.save() + continue except OSError as err: logger.error(__( "OSError occurred while preparing data {} (will skip): {}", diff --git a/resolwe/flow/managers/consumer.py b/resolwe/flow/managers/consumer.py new file mode 100644 index 000000000..4734f5ddb --- /dev/null +++ b/resolwe/flow/managers/consumer.py @@ -0,0 +1,24 @@ +"""Manager Channels consumer.""" +from channels.generic import BaseConsumer + +from . import manager, state + + +class ManagerConsumer(BaseConsumer): + """Channels consumer for handling manager events.""" + + @classmethod + def channel_names(cls): + """Names of channels to listen for messages on.""" + return {state.MANAGER_CONTROL_CHANNEL} + + @property + def method_mapping(self): + """Consumer channel to method mapping.""" + return { + state.MANAGER_CONTROL_CHANNEL: 'handle_control_event', + } + + def handle_control_event(self, message): + """Handle a control event message.""" + manager.handle_control_event(message) diff --git a/resolwe/flow/managers/protocol.py b/resolwe/flow/managers/protocol.py index e1082945c..0ad372872 100644 --- a/resolwe/flow/managers/protocol.py +++ b/resolwe/flow/managers/protocol.py @@ -47,3 +47,5 @@ class ExecutorFiles(object): PROCESS_META = 'process_meta.json' PROCESS_SCRIPT = 'process_script.sh' + + SECRETS_DIR = 'secrets' diff --git a/resolwe/flow/migrations/0010_add_secret.py b/resolwe/flow/migrations/0010_add_secret.py new file mode 100644 index 000000000..4e387fa7a --- /dev/null +++ b/resolwe/flow/migrations/0010_add_secret.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.6 on 2017-12-06 05:15 +from __future__ import unicode_literals + +from django.conf import settings +import django.contrib.postgres.fields.jsonb +from django.db import migrations, models +import django.db.models.deletion +import fernet_fields.fields +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('flow', '0009_make_size_mandatory'), + ] + + operations = [ + migrations.CreateModel( + name='Secret', + fields=[ + ('created', models.DateTimeField(auto_now_add=True, db_index=True)), + ('modified', models.DateTimeField(auto_now=True, db_index=True)), + ('handle', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('value', fernet_fields.fields.EncryptedTextField()), + ('metadata', django.contrib.postgres.fields.jsonb.JSONField(default=dict)), + ('expires', models.DateTimeField(db_index=True, null=True)), + ('contributor', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to=settings.AUTH_USER_MODEL)), + ], + ), + ] diff --git a/resolwe/flow/models/__init__.py b/resolwe/flow/models/__init__.py index b819b6f5b..c9a0f6b6e 100644 --- a/resolwe/flow/models/__init__.py +++ b/resolwe/flow/models/__init__.py @@ -58,6 +58,14 @@ .. autoclass:: resolwe.flow.models.Storage :members: +Secret model +============ + +Postgres ORM model for storing secrets. + +.. autoclass:: resolwe.flow.models.Secret + :members: + """ from .collection import Collection @@ -65,4 +73,5 @@ from .descriptor import DescriptorSchema from .entity import Entity, Relation, RelationType from .process import Process +from .secret import Secret from .storage import Storage diff --git a/resolwe/flow/models/data.py b/resolwe/flow/models/data.py index 40f720cc0..9d55fdf1c 100644 --- a/resolwe/flow/models/data.py +++ b/resolwe/flow/models/data.py @@ -9,7 +9,7 @@ from django.conf import settings from django.contrib.postgres.fields import ArrayField, JSONField -from django.core.exceptions import ValidationError +from django.core.exceptions import PermissionDenied, ValidationError from django.core.validators import RegexValidator from django.db import models, transaction @@ -20,6 +20,7 @@ from .base import BaseModel from .descriptor import DescriptorSchema from .entity import Entity +from .secret import Secret from .storage import Storage from .utils import ( DirtyError, hydrate_input_references, hydrate_size, render_descriptor, render_template, validate_schema, @@ -195,6 +196,48 @@ def save_storage(self, instance, schema): # `value` is copied by value, so `fields[name]` must be changed fields[name] = storage.pk + def resolve_secrets(self): + """Retrieve handles for all basic:secret: fields on input. + + The process must have the ``secrets`` resource requirement + specified in order to access any secrets. Otherwise this method + will raise a ``PermissionDenied`` exception. + + :return: A dictionary of secrets where key is the secret handle + and value is the secret value. + """ + secrets = {} + for field_schema, fields in iterate_fields(self.input, self.process.input_schema): # pylint: disable=no-member + if not field_schema.get('type', '').startswith('basic:secret:'): + continue + + name = field_schema['name'] + value = fields[name] + try: + handle = value['handle'] + except KeyError: + continue + + try: + secrets[handle] = Secret.objects.get_secret( + handle, + contributor=self.contributor + ) + except Secret.DoesNotExist: + raise PermissionDenied("Access to secret not allowed or secret does not exist") + + # If the process does not not have the right requirements it is not + # allowed to access any secrets. + allowed = self.process.requirements.get('resources', {}).get('secrets', False) # pylint: disable=no-member + if secrets and not allowed: + raise PermissionDenied( + "Process '{}' has secret inputs, but no permission to see secrets".format( + self.process.slug # pylint: disable=no-member + ) + ) + + return secrets + def save_dependencies(self, instance, schema): """Save data: and list:data: references as parents.""" def add_dependency(value): diff --git a/resolwe/flow/models/secret.py b/resolwe/flow/models/secret.py new file mode 100644 index 000000000..86290a9d7 --- /dev/null +++ b/resolwe/flow/models/secret.py @@ -0,0 +1,75 @@ +"""Resolwe secrets model.""" +import uuid + +from fernet_fields import EncryptedTextField + +from django.conf import settings +from django.contrib.postgres.fields import JSONField +from django.db import models + + +class SecretManager(models.Manager): + """Manager for Secret objects.""" + + def create_secret(self, value, contributor, metadata=None, expires=None): + """Create a new secret, returning its handle. + + :param value: Secret value to store + :param contributor: User owning the secret + :param metadata: Optional metadata dictionary (must be JSON serializable) + :param expires: Optional date/time of expiry (defaults to None, which means that + the secret never expires) + :return: Secret handle + """ + if metadata is None: + metadata = {} + + secret = self.create( + value=value, + contributor=contributor, + metadata=metadata, + expires=expires, + ) + return str(secret.handle) + + def get_secret(self, handle, contributor): + """Retrieve an existing secret's value. + + :param handle: Secret handle + :param contributor: User instance to perform contributor validation, + which means that only secrets for the given contributor will be + looked up. + """ + queryset = self.all() + if contributor is not None: + queryset = queryset.filter(contributor=contributor) + secret = queryset.get(handle=handle) + return secret.value + + +class Secret(models.Model): + """Postgres model for storing secrets.""" + + #: creation date and time + created = models.DateTimeField(auto_now_add=True, db_index=True) + + #: modified date and time + modified = models.DateTimeField(auto_now=True, db_index=True) + + #: user that created the secret + contributor = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT) + + #: secret handle + handle = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + + #: actual secret + value = EncryptedTextField() + + #: secret metadata (not encrypted) + metadata = JSONField(default=dict) + + #: expiry time + expires = models.DateTimeField(null=True, db_index=True) + + #: secret manager + objects = SecretManager() diff --git a/resolwe/flow/models/utils.py b/resolwe/flow/models/utils.py index 997fc1eb7..33bce7953 100644 --- a/resolwe/flow/models/utils.py +++ b/resolwe/flow/models/utils.py @@ -215,11 +215,21 @@ def _hydrate_values(output, output_schema, data): """ def hydrate_path(file_name): """Hydrate file paths.""" - id_ = "{}/".format(data.id) # needs trailing slash - if id_ in file_name: - file_name = file_name[file_name.find(id_) + len(id_):] # remove id from filename + from resolwe.flow.managers import manager - return os.path.join(settings.FLOW_EXECUTOR['DATA_DIR'], id_, file_name) + class HydratedPath(str): + """String wrapper, which also stores the original filename.""" + + __slots__ = ('data_id', 'file_name') + + def __new__(cls, value=''): + """Initialize hydrated path.""" + hydrated = str.__new__(cls, value) + hydrated.data_id = data.id + hydrated.file_name = file_name + return hydrated + + return HydratedPath(manager.get_executor().resolve_data_path(data, file_name)) def hydrate_storage(storage_id): """Hydrate storage fields.""" @@ -314,6 +324,8 @@ def hydrate_input_uploads(input_, input_schema, hydrate_values=True): Add the upload location for relative paths. """ + from resolwe.flow.managers import manager + files = [] for field_schema, fields in iterate_fields(input_, input_schema): name = field_schema['name'] @@ -331,7 +343,7 @@ def hydrate_input_uploads(input_, input_schema, hydrate_values=True): if isinstance(value['file_temp'], six.string_types): # If file_temp not url, nor absolute path: hydrate path if not os.path.isabs(value['file_temp']) and not urlregex.search(value['file_temp']): - value['file_temp'] = os.path.join(settings.FLOW_EXECUTOR['UPLOAD_DIR'], value['file_temp']) + value['file_temp'] = manager.get_executor().resolve_upload_path(value['file_temp']) else: # Something very strange happened value['file_temp'] = 'Invalid value for file_temp in DB' diff --git a/resolwe/flow/routing.py b/resolwe/flow/routing.py index d9dcb8f08..6a357e704 100644 --- a/resolwe/flow/routing.py +++ b/resolwe/flow/routing.py @@ -8,18 +8,8 @@ from channels.routing import route_class -from resolwe.flow.managers import manager +from resolwe.flow.managers.consumer import ManagerConsumer -# All routing information has to be set and finished at the point when the 'route_class' -# instance below is created, as it will scan the handler class' 'method_mapping' during init -# and register itself. This means the manager's 'update_routing' method needs to be called -# before 'channel_routing' is created. It does need to be called, because the manager is -# created too soon for settings overrides (such as for testing channels) to take effect. -# Make sure this file is not imported too early (best: just leave it to Channels, the framework -# imports this at 'runworkers' time); the import prerequisites are: loaded and patched settings, -# and a loaded manager. - -manager.update_routing() channel_routing = [ # pylint: disable=invalid-name - route_class(type(manager)), + route_class(ManagerConsumer), ] diff --git a/resolwe/flow/static/flow/fieldSchema.json b/resolwe/flow/static/flow/fieldSchema.json index 24e15880c..f3edffa33 100644 --- a/resolwe/flow/static/flow/fieldSchema.json +++ b/resolwe/flow/static/flow/fieldSchema.json @@ -63,7 +63,7 @@ }, "type": { "type": "string", - "pattern": "^(basic:(boolean|text|date|datetime|url:(download|view|link)|file|file:html|dir|json):$|data:$|data:[a-z0-9:]+:$)" + "pattern": "^(basic:(boolean|text|date|datetime|url:(download|view|link)|secret|file|file:html|dir|json):$|data:$|data:[a-z0-9:]+:$)" }, "description": { "type": "string" diff --git a/resolwe/flow/static/flow/processorSchema.json b/resolwe/flow/static/flow/processorSchema.json index 236a91388..7debd6741 100644 --- a/resolwe/flow/static/flow/processorSchema.json +++ b/resolwe/flow/static/flow/processorSchema.json @@ -143,6 +143,10 @@ "network": { "description": "External network access", "type": "boolean" + }, + "secrets": { + "description": "Access to secret store", + "type": "boolean" } } } diff --git a/resolwe/flow/static/flow/typeSchema.json b/resolwe/flow/static/flow/typeSchema.json index bf622d46b..e8aaeb5be 100644 --- a/resolwe/flow/static/flow/typeSchema.json +++ b/resolwe/flow/static/flow/typeSchema.json @@ -14,6 +14,7 @@ { "$ref": "#/types/decimal" }, { "$ref": "#/types/date" }, { "$ref": "#/types/datetime" }, + { "$ref": "#/types/secret" }, { "$ref": "#/types/file" }, { "$ref": "#/types/dir" }, { "$ref": "#/types/url" }, @@ -113,6 +114,28 @@ } } }, + "secret": { + "type": "object", + "required": ["type", "value"], + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "pattern": "^basic:secret:$" + }, + "value": { + "type": "object", + "required": ["handle"], + "additionalProperties": false, + "properties": { + "handle": { + "type": "string", + "pattern": "^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" + } + } + } + } + }, "file": { "type": "object", "required": ["type", "value"], diff --git a/resolwe/flow/tests/processes/secrets.yml b/resolwe/flow/tests/processes/secrets.yml new file mode 100644 index 000000000..d7e84609b --- /dev/null +++ b/resolwe/flow/tests/processes/secrets.yml @@ -0,0 +1,33 @@ +- slug: test-secrets-permission-denied + name: Secret Process + version: 1.0.0 + type: "data:test:secret" + input: + - name: token + label: Secret token + type: basic:secret + run: + program: | + # This process should never execute as it lacks the proper permissions + # to access any secrets it is passed on input + +- slug: test-secrets-echo + name: Secret Process + version: 1.0.0 + type: "data:test:secret" + requirements: + expression-engine: jinja + resources: + secrets: true + input: + - name: token + label: Secret token + type: basic:secret + output: + - name: secret + label: Secret value + type: basic:string + run: + program: | + # Echo secret to output. NEVER do this in an actual process! + re-save secret "$(cat /secrets/{{ token.handle }})" diff --git a/resolwe/flow/tests/test_secrets.py b/resolwe/flow/tests/test_secrets.py new file mode 100644 index 000000000..402434804 --- /dev/null +++ b/resolwe/flow/tests/test_secrets.py @@ -0,0 +1,65 @@ +# pylint: disable=missing-docstring +import os +import uuid + +from resolwe.flow.models import Data, Secret +from resolwe.test import ProcessTestCase, TransactionTestCase, tag_process, with_docker_executor + +PROCESSES_DIR = os.path.join(os.path.dirname(__file__), 'processes') + + +class SecretsModelTest(TransactionTestCase): + def test_secret_manager(self): + handle = Secret.objects.create_secret('foo bar', self.user) + value = Secret.objects.get_secret(handle, self.user) + self.assertEqual(value, 'foo bar') + + with self.assertRaises(Secret.DoesNotExist): + value = Secret.objects.get_secret(handle, self.admin) + + with self.assertRaises(Secret.DoesNotExist): + value = Secret.objects.get_secret(uuid.uuid4(), self.user) + + +class SecretsProcessTest(ProcessTestCase): + def setUp(self): + super().setUp() + + self._register_schemas(path=[PROCESSES_DIR]) + + # Create some secrets. + self.secret_value = 'hello secret world' + self.secret_handle = Secret.objects.create_secret(self.secret_value, self.admin) + + @with_docker_executor + @tag_process('test-secrets-permission-denied') + def test_permission_denied(self): + data = self.run_process( + 'test-secrets-permission-denied', + input_={'token': {'handle': self.secret_handle}}, + assert_status=Data.STATUS_ERROR + ) + self.assertIn( + "Permission denied for process: Process 'test-secrets-permission-denied' has secret " + "inputs, but no permission to see secrets", + data.process_error + ) + + @with_docker_executor + @tag_process('test-secrets-echo') + def test_secret_echo(self): + data = self.run_process('test-secrets-echo', input_={'token': {'handle': self.secret_handle}}) + self.assertEqual(data.output['secret'], self.secret_value) + + @tag_process('test-secrets-echo') + def test_secret_local_executor(self): + data = self.run_process( + 'test-secrets-echo', + input_={'token': {'handle': self.secret_handle}}, + assert_status=Data.STATUS_ERROR + ) + self.assertIn( + "Permission denied for process: Process which requires access to secrets cannot be run " + "using the local executor", + data.process_error + ) diff --git a/resolwe/test/testcases/setting_overrides.py b/resolwe/test/testcases/setting_overrides.py index 225f52fa4..55512e2ca 100644 --- a/resolwe/test/testcases/setting_overrides.py +++ b/resolwe/test/testcases/setting_overrides.py @@ -11,39 +11,6 @@ TEST_SETTINGS_OVERRIDES = getattr(settings, 'FLOW_EXECUTOR', {}).get('TEST', {}) FLOW_EXECUTOR_SETTINGS.update(TEST_SETTINGS_OVERRIDES) - -# update FLOW_DOCKER_MAPPINGS setting if necessary -def _get_updated_docker_mappings(previous_settings=getattr(settings, 'FLOW_EXECUTOR', {})): - """Reconstruct the docker mappings setting. - - Any pre-existing patched mappings are needed here because the new - mappings are constructed using a simple search and replace: - occurences of the old directory settings are replaced with the - updated ones. - - :param previous_settings: The original flow executor settings before - update, needed to construct new mappings. - :return: The updated docker mappings dictionary. - :rtype: dict - """ - if 'FLOW_DOCKER_MAPPINGS' in globals(): - previous_mappings = FLOW_DOCKER_MAPPINGS - else: - # Read FLOW_DOCKER_MAPPINGS setting from Django settings upon importing - # the module - previous_mappings = getattr(settings, 'FLOW_DOCKER_MAPPINGS', {}) - mappings_copy = copy.deepcopy(previous_mappings) - for map_ in mappings_copy: - for map_entry in ['src', 'dest']: - for setting in ['DATA_DIR', 'UPLOAD_DIR']: - if previous_settings[setting] in map_[map_entry]: - map_[map_entry] = map_[map_entry].replace( - previous_settings[setting], FLOW_EXECUTOR_SETTINGS[setting], 1) - return mappings_copy - - -FLOW_DOCKER_MAPPINGS = _get_updated_docker_mappings() - # override all FLOW_MANAGER settings that are specified in FLOW_MANAGER['TEST'] FLOW_MANAGER_SETTINGS = copy.deepcopy(getattr(settings, 'FLOW_MANAGER', {})) TEST_SETTINGS_OVERRIDES = getattr(settings, 'FLOW_MANAGER', {}).get('TEST', {}) diff --git a/resolwe/test_helpers/test_runner.py b/resolwe/test_helpers/test_runner.py index e8ebc1a04..23292c622 100644 --- a/resolwe/test_helpers/test_runner.py +++ b/resolwe/test_helpers/test_runner.py @@ -6,7 +6,6 @@ """ import contextlib -import copy import logging import os import re @@ -196,15 +195,10 @@ def _sequence_paths(paths): def _create_test_dirs(): """Create all the testing directories.""" - # Due to the unique way the docker mappings are updated, we need - # the pre-fudge settings copied and sheltered for the updater to work. - previous_settings = copy.deepcopy(resolwe_settings.FLOW_EXECUTOR_SETTINGS) items = ['DATA_DIR', 'UPLOAD_DIR', 'RUNTIME_DIR'] paths = _sequence_paths([resolwe_settings.FLOW_EXECUTOR_SETTINGS[i] for i in items]) for item, path in zip(items, paths): resolwe_settings.FLOW_EXECUTOR_SETTINGS[item] = path - resolwe_settings.FLOW_DOCKER_MAPPINGS =\ - resolwe_settings._get_updated_docker_mappings(previous_settings) # pylint: disable=protected-access def _prepare_settings(): @@ -223,7 +217,6 @@ def _prepare_settings(): FLOW_MANAGER_DISABLE_CTYPE_CACHE=True, FLOW_EXECUTOR=resolwe_settings.FLOW_EXECUTOR_SETTINGS, FLOW_MANAGER=resolwe_settings.FLOW_MANAGER_SETTINGS, - FLOW_DOCKER_MAPPINGS=resolwe_settings.FLOW_DOCKER_MAPPINGS ) diff --git a/setup.py b/setup.py index 6609a0681..83d9ade6e 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,7 @@ 'django-guardian>=1.4.2', 'django-mathfilters>=0.3.0', 'django-versionfield2>=0.5.0', + 'django-fernet-fields>=0.5', 'elasticsearch-dsl~=2.2.0', 'psycopg2>=2.5.0', 'mock>=1.3.0', diff --git a/tests/.gitignore b/tests/.gitignore new file mode 100644 index 000000000..e629e70d0 --- /dev/null +++ b/tests/.gitignore @@ -0,0 +1,4 @@ +# Ignore files created during tests. +.test_data/test_* +.test_runtime/test_* +.test_upload/test_* diff --git a/tests/settings.py b/tests/settings.py index e6ce585a9..ef0b8be78 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -161,17 +161,16 @@ 'resolwe.flow.finders.AppDirectoriesFinder', ) -FLOW_DOCKER_MAPPINGS = [ - {'src': os.path.join(FLOW_EXECUTOR['DATA_DIR'], '{data_id}'), - 'dest': '/data', - 'mode': 'rw,Z'}, - {'src': FLOW_EXECUTOR['DATA_DIR'], - 'dest': '/data_all', - 'mode': 'ro,z'}, - {'src': FLOW_EXECUTOR['UPLOAD_DIR'], - 'dest': '/upload', - 'mode': 'rw,z'}, -] +FLOW_DOCKER_VOLUME_EXTRA_OPTIONS = { + 'data': 'Z', + 'data_all': 'z', + 'upload': 'z', + 'secrets': 'Z', + 'users': 'Z', + 'tools': 'z', +} + +FLOW_DOCKER_EXTRA_VOLUMES = [] REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES': (