From 322f52ae0356949cdf9578ce8f591820d15d2414 Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Mon, 27 Mar 2017 11:49:21 +0100 Subject: [PATCH 1/7] Add options.py --- lib/iris/__init__.py | 1 + lib/iris/options.py | 93 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 lib/iris/options.py diff --git a/lib/iris/__init__.py b/lib/iris/__init__.py index 3019a0adcb..1b1b91dafc 100644 --- a/lib/iris/__init__.py +++ b/lib/iris/__init__.py @@ -113,6 +113,7 @@ def callback(cube, field, filename): from iris._deprecation import IrisDeprecation, warn_deprecated import iris.fileformats import iris.io +import iris.options try: diff --git a/lib/iris/options.py b/lib/iris/options.py new file mode 100644 index 0000000000..9008fe6ecb --- /dev/null +++ b/lib/iris/options.py @@ -0,0 +1,93 @@ +# (C) British Crown Copyright 2017, Met Office +# +# This file is part of Iris. +# +# Iris is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Iris is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Iris. If not, see . +""" +Control runtime options of Iris. + +""" +from __future__ import (absolute_import, division, print_function) +from six.moves import (filter, input, map, range, zip) # noqa + +from multiprocessing import cpu_count +import re +import warnings + +import dask +import distributed + + +class Parallel(object): + """ + Control dask parallel-processing options for Iris. + + """ + _default_scheduler = 'threaded' + + def __init__(self, num_workers=1, scheduler=_default_scheduler, pool=None): + self._num_workers = num_workers + self._scheduler = scheduler + self.pool = pool + + self._dask_scheduler = None + + @property + def num_workers(self): + return self._num_workers + + @num_workers.setter + def num_workers(self, value): + if value >= cpu_count(): + # Limit maximum CPUs used to 1 fewer than all available CPUs. + wmsg = ('Requested more CPUs ({}) than total available ({}). ' + 'Limiting number of used CPUs to {}.') + warnings.warn(wmsg.format(value, cpu_count(), cpu_count()-1)) + value = cpu_count() - 1 + self._num_workers = value + + @property + def scheduler(self): + return self._scheduler + + @scheduler.setter + def scheduler(self, value): + if value == 'threaded': + self._scheduler = value + self.dask_scheduler = dask.threaded.get + elif value == 'multiprocessing': + self._scheduler = value + self.dask_scheduler = dask.multiprocessing.get + elif value == 'async': + self._scheduler = value + self.dask_scheduler = dask.async.get_sync + elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value): + self._scheduler = value + self.dask_scheduler = distributed.Client.get + else: + # Invalid value for `scheduler`. + wmsg = 'Invalid value for scheduler: {!r}. Defaulting to {}.' 
+ warnings.warn(wmsg.format(value, self._default_scheduler)) + self.scheduler = self._default_scheduler + + @property + def dask_scheduler(self): + return self._dask_scheduler + + @dask_scheduler.setter + def dask_scheduler(self, value): + self._dask_scheduler = value + + +parallel = Parallel From d49d90bed703c5259c717ea9beafc4ad053f74ee Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Tue, 28 Mar 2017 13:24:50 +0100 Subject: [PATCH 2/7] Finished draft of options.parallel --- lib/iris/_lazy_data.py | 4 ++ lib/iris/options.py | 156 +++++++++++++++++++++++++++++++++++------ 2 files changed, 140 insertions(+), 20 deletions(-) diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 7d525477d6..83c41a7d8b 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -95,6 +95,10 @@ def as_concrete_data(data, **kwargs): Returns: A NumPy `ndarray` or masked array. + .. note:: + Specific dask options for computation are controlled by + :class:`iris.options.Parallel`. + """ if is_lazy_data(data): # Realise dask array, ensuring the data result is always a NumPy array. diff --git a/lib/iris/options.py b/lib/iris/options.py index 9008fe6ecb..7110ebc2c5 100644 --- a/lib/iris/options.py +++ b/lib/iris/options.py @@ -22,6 +22,7 @@ from six.moves import (filter, input, map, range, zip) # noqa from multiprocessing import cpu_count +from multiprocessing.pool import ThreadPool import re import warnings @@ -31,31 +32,88 @@ class Parallel(object): """ - Control dask parallel-processing options for Iris. + Control dask parallel processing options for Iris. """ - _default_scheduler = 'threaded' + def __init__(self, scheduler=None, num_workers=None): + """ + Set up options for dask parallel processing. + + Currently accepted kwargs: + + * scheduler: + The scheduler used to run a dask graph. Must be set to one of: + + * 'threaded': (default) + The scheduler processes the graph in parallel using a + thread pool. Good for processing dask arrays and dataframes. + * 'multiprocessing': + The scheduler processes the graph in parallel using a + process pool. Good for processing dask bags. + * 'async': + The scheduler runs synchronously (not in parallel). Good for + debugging. + * The IP address and port of a distributed scheduler: + Specifies the location of a distributed scheduler that has + already been set up. The distributed scheduler will process the + graph. + + For more information see + http://dask.pydata.org/en/latest/scheduler-overview.html. + + * num_workers: + The number of worker threads or processess to use to run the dask + graph in parallel. Defaults to 1 (that is, processed serially). + + .. note:: + The value for `num_workers` cannot be set to greater than the + number of CPUs available on the host system. If such a value is + requested, `num_workers` is automatically set to 1 less than + the number of CPUs available on the host system. + + .. note:: + Only the 'threaded' and 'multiprocessing' schedulers support + the `num_workers` kwarg. If it is specified with the `async` or + `distributed` scheduler, the kwarg is ignored: + + * The 'async' scheduler runs serially so will only use a single + worker. + * The number of workers for the 'distributed' scheduler must be + defined when setting up the distributed scheduler. For more + information on setting up distributed schedulers, see + https://distributed.readthedocs.io/en/latest/index.html. 
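+
+        .. note::
+            A quick way to check the ceiling that applies to `num_workers`
+            (one fewer than the number of CPUs available on the host) is::
+
+                from multiprocessing import cpu_count
+                print(cpu_count() - 1)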
+ + Example usages: + + * Specify that we want to load a cube with dask parallel processing + using multiprocessing with six worker processes:: + + >>> iris.options.parallel(scheduler='multiprocessing', num_workers=6) + >>> iris.load('my_dataset.nc') + + * Specify, with a context manager, that we want to load a cube with + dask parallel processing using four worker threads:: + + >>> with iris.options.parallel(scheduler='threaded', num_workers=4): + ... iris.load('my_dataset.nc') + + * Run dask parallel processing using a distributed scheduler that has + been set up at the IP address and port at ``192.168.0.219:8786``:: + + >>> iris.options.parallel(scheduler='192.168.0.219:8786') + + """ + # Set some defaults first of all. + self._default_scheduler = 'threaded' + self._default_num_workers = 1 - def __init__(self, num_workers=1, scheduler=_default_scheduler, pool=None): - self._num_workers = num_workers self._scheduler = scheduler - self.pool = pool + self._num_workers = num_workers self._dask_scheduler = None - @property - def num_workers(self): - return self._num_workers - - @num_workers.setter - def num_workers(self, value): - if value >= cpu_count(): - # Limit maximum CPUs used to 1 fewer than all available CPUs. - wmsg = ('Requested more CPUs ({}) than total available ({}). ' - 'Limiting number of used CPUs to {}.') - warnings.warn(wmsg.format(value, cpu_count(), cpu_count()-1)) - value = cpu_count() - 1 - self._num_workers = value + # Activate the specified dask options. + self._set_dask_options() @property def scheduler(self): @@ -63,6 +121,8 @@ def scheduler(self): @scheduler.setter def scheduler(self, value): + if value is None: + value = self._default_scheduler if value == 'threaded': self._scheduler = value self.dask_scheduler = dask.threaded.get @@ -73,14 +133,43 @@ def scheduler(self, value): self._scheduler = value self.dask_scheduler = dask.async.get_sync elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value): - self._scheduler = value - self.dask_scheduler = distributed.Client.get + self._scheduler = 'distributed' + self.dask_scheduler = value else: # Invalid value for `scheduler`. wmsg = 'Invalid value for scheduler: {!r}. Defaulting to {}.' warnings.warn(wmsg.format(value, self._default_scheduler)) self.scheduler = self._default_scheduler + @property + def num_workers(self): + return self._num_workers + + @num_workers.setter + def num_workers(self, value): + if self.scheduler == 'async': + wmsg = 'Cannot set `num_workers` for the serial scheduler {!r}.' + warnings.warn(wmsg.format(self.scheduler)) + value = None + elif self.scheduler == 'distributed': + wmsg = ('Attempting to set `num_workers` with the {!r} scheduler ' + 'requested. Please instead specify number of workers when ' + 'setting up the distributed scheduler. See ' + 'https://distributed.readthedocs.io/en/latest/index.html ' + 'for more details.') + warnings.warn(wmsg) + value = None + else: + if value is None: + value = self._default_num_workers + if value >= cpu_count(): + # Limit maximum CPUs used to 1 fewer than all available CPUs. + wmsg = ('Requested more CPUs ({}) than total available ({}). 
' + 'Limiting number of used CPUs to {}.') + warnings.warn(wmsg.format(value, cpu_count(), cpu_count()-1)) + value = cpu_count() - 1 + self._num_workers = value + @property def dask_scheduler(self): return self._dask_scheduler @@ -89,5 +178,32 @@ def dask_scheduler(self): def dask_scheduler(self, value): self._dask_scheduler = value + def _set_dask_options(self): + """ + Use `dask.set_options` to globally apply the options specified at + instantiation, either for the lifetime of the session or + context manager. + + """ + get = self.dask_scheduler + pool = None + if self.scheduler in ['threaded', 'multiprocessing']: + pool = ThreadPool(self.num_workers) + if self.scheduler == 'distributed': + get = distributed.Client(self.dask_scheduler).get + + dask.set_options(get=get, pool=pool) + + def get(self, item): + return getattr(self, item) + + def __enter__(self): + return + + def __exit__(self, exception_type, exception_value, exception_traceback): + self.num_workers = self._default_num_workers + self.scheduler = self._default_scheduler + self._set_dask_options() + parallel = Parallel From f2ac954c0ff2dc43091d03b55b237acedb11b14e Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Wed, 29 Mar 2017 09:47:24 +0100 Subject: [PATCH 3/7] Add tests --- lib/iris/options.py | 14 +- lib/iris/tests/unit/options/__init__.py | 20 ++ lib/iris/tests/unit/options/test_Parallel.py | 214 +++++++++++++++++++ 3 files changed, 242 insertions(+), 6 deletions(-) create mode 100644 lib/iris/tests/unit/options/__init__.py create mode 100644 lib/iris/tests/unit/options/test_Parallel.py diff --git a/lib/iris/options.py b/lib/iris/options.py index 7110ebc2c5..91e7b29eba 100644 --- a/lib/iris/options.py +++ b/lib/iris/options.py @@ -27,6 +27,7 @@ import warnings import dask +import dask.multiprocessing import distributed @@ -35,7 +36,7 @@ class Parallel(object): Control dask parallel processing options for Iris. """ - def __init__(self, scheduler=None, num_workers=None): + def __init__(self, scheduler='threaded', num_workers=1): """ Set up options for dask parallel processing. @@ -107,8 +108,8 @@ def __init__(self, scheduler=None, num_workers=None): self._default_scheduler = 'threaded' self._default_num_workers = 1 - self._scheduler = scheduler - self._num_workers = num_workers + self.scheduler = scheduler + self.num_workers = num_workers self._dask_scheduler = None @@ -147,17 +148,18 @@ def num_workers(self): @num_workers.setter def num_workers(self, value): - if self.scheduler == 'async': + if self.scheduler == 'async' and value != self._default_num_workers: wmsg = 'Cannot set `num_workers` for the serial scheduler {!r}.' warnings.warn(wmsg.format(self.scheduler)) value = None - elif self.scheduler == 'distributed': + elif (self.scheduler == 'distributed' and + value != self._default_num_workers): wmsg = ('Attempting to set `num_workers` with the {!r} scheduler ' 'requested. Please instead specify number of workers when ' 'setting up the distributed scheduler. See ' 'https://distributed.readthedocs.io/en/latest/index.html ' 'for more details.') - warnings.warn(wmsg) + warnings.warn(wmsg.format(self.scheduler)) value = None else: if value is None: diff --git a/lib/iris/tests/unit/options/__init__.py b/lib/iris/tests/unit/options/__init__.py new file mode 100644 index 0000000000..93397c38fa --- /dev/null +++ b/lib/iris/tests/unit/options/__init__.py @@ -0,0 +1,20 @@ +# (C) British Crown Copyright 2017, Met Office +# +# This file is part of Iris. 
+# +# Iris is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Iris is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Iris. If not, see . +"""Unit tests for the :mod:`iris.options` module.""" + +from __future__ import (absolute_import, division, print_function) +from six.moves import (filter, input, map, range, zip) # noqa diff --git a/lib/iris/tests/unit/options/test_Parallel.py b/lib/iris/tests/unit/options/test_Parallel.py new file mode 100644 index 0000000000..df31b4102f --- /dev/null +++ b/lib/iris/tests/unit/options/test_Parallel.py @@ -0,0 +1,214 @@ +# (C) British Crown Copyright 2017, Met Office +# +# This file is part of Iris. +# +# Iris is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Iris is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Iris. If not, see . +"""Unit tests for the `iris.options.Paralle` class.""" + +from __future__ import (absolute_import, division, print_function) +from six.moves import (filter, input, map, range, zip) # noqa +import six + +# Import iris.tests first so that some things can be initialised before +# importing anything else. +import iris.tests as tests + +import warnings + +from dask import set_options, async + +from iris.options import Parallel +from iris.tests import mock + + +class Test_operation(tests.IrisTest): + # Check that the options are passed through to 'real' code. + # NOTE: tests that call the option class directly and as a contextmgr. + pass + + +class Test__set_dask_options(tests.IrisTest): + # Check the correct dask options are set given the inputs. + # NOTE: tests that check the correct dask options are set + # (will require mock :scream:). + # def setUp(self): + # patcher = mock.patch('dask.set_options') + # self.addCleanup(patcher.stop) + # self.mock_dask_opts = patcher.start() + + def test_default(self): + pass + + def test_threaded(self): + pass + + def test_threaded_num_workers(self): + pass + + def test_async(self): + scheduler = 'async' + # dask_options = mock.Mock(spec=set_options) + # dask_scheduler = mock.Mock(spec=async.get_sync) + with mock.patch('dask.set_options') as mock_dask_opts: + Parallel(scheduler=scheduler) + # dask_options.assert_called_once_with(get=dask_scheduler) + mock_dask_opts.assert_any_call() + + +class Test_set_schedulers(tests.IrisTest): + # Check that the correct scheduler and dask scheduler are chosen given the + # inputs. 
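+    # Each test below constructs a `Parallel` with a given `scheduler`
+    # kwarg and checks the value the `scheduler` property resolves to.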
+ def test_default(self): + opts = Parallel() + result = opts.scheduler + expected = opts._default_scheduler + self.assertEqual(result, expected) + + def test_threaded(self): + scheduler = 'threaded' + opts = Parallel(scheduler=scheduler) + result = opts.scheduler + self.assertEqual(result, scheduler) + + def test_multiprocessing(self): + scheduler = 'multiprocessing' + opts = Parallel(scheduler=scheduler) + result = opts.scheduler + self.assertEqual(result, scheduler) + + def test_async(self): + scheduler = 'async' + opts = Parallel(scheduler=scheduler) + result = opts.scheduler + self.assertEqual(result, scheduler) + + def test_distributed(self): + scheduler = '192.168.0.128:8786' + opts = Parallel(scheduler=scheduler) + result = opts.scheduler + self.assertEqual(result, 'distributed') + + def test_bad(self): + scheduler = 'wibble' + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always') + opts = Parallel(scheduler=scheduler) + result = opts.scheduler + expected = opts._default_scheduler + self.assertEqual(result, expected) + exp_wmsg = 'Invalid value for scheduler: {!r}' + six.assertRegex(self, str(w[0].message), exp_wmsg.format(scheduler)) + + +class Test_set_dask_scheduler(tests.IrisTest): + # Check that the correct scheduler and dask scheduler are chosen given the + # inputs. + def test_default(self): + opts = Parallel() + result = opts.scheduler + expected = opts._default_scheduler + self.assertEqual(result, expected) + + def test_threaded(self): + scheduler = 'threaded' + opts = Parallel(scheduler=scheduler) + result = opts.scheduler + self.assertEqual(result, scheduler) + + def test_multiprocessing(self): + scheduler = 'multiprocessing' + opts = Parallel(scheduler=scheduler) + result = opts.scheduler + self.assertEqual(result, scheduler) + + def test_async(self): + scheduler = 'async' + opts = Parallel(scheduler=scheduler) + result = opts.scheduler + self.assertEqual(result, scheduler) + + def test_distributed(self): + scheduler = '192.168.0.128:8786' + opts = Parallel(scheduler=scheduler) + result = opts.scheduler + self.assertEqual(result, 'distributed') + + def test_bad(self): + scheduler = 'wibble' + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always') + opts = Parallel(scheduler=scheduler) + result = opts.scheduler + expected = opts._default_scheduler + self.assertEqual(result, expected) + exp_wmsg = 'Invalid value for scheduler: {!r}' + six.assertRegex(self, str(w[0].message), exp_wmsg.format(scheduler)) + + +class Test_set_num_workers(tests.IrisTest): + # Check that the correct `num_workers` are chosen given the inputs. + def test_default(self): + opts = Parallel() + result = opts.num_workers + expected = opts._default_num_workers + self.assertEqual(result, expected) + + def test_basic(self): + n_workers = 5 + opts = Parallel(num_workers=n_workers) + result = opts.num_workers + self.assertEqual(result, n_workers) + + def test_too_many_workers(self): + max_cpus = 8 + n_workers = 12 + with mock.patch('multiprocessing.cpu_count', return_value=max_cpus): + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always') + opts = Parallel(num_workers=n_workers) + result = opts.num_workers + self.assertEqual(result, max_cpus-1) + exp_wmsg = ('Requested more CPUs ({}) than total available ({}). 
' + 'Limiting number of used CPUs to {}.') + self.assertEqual(str(w[0].message), + exp_wmsg.format(n_workers, max_cpus, max_cpus-1)) + + def test_negative_workers(self): + n_workers = -2 + exp_emsg = "Number of processes must be at least 1" + with self.assertRaisesRegexp(ValueError, exp_emsg): + Parallel(num_workers=n_workers) + + def test_async(self): + scheduler = 'async' + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always') + opts = Parallel(scheduler=scheduler, num_workers=5) + self.assertIsNone(opts.num_workers) + exp_wmsg = 'Cannot set `num_workers` for the serial scheduler {!r}' + six.assertRegex(self, str(w[0].message), exp_wmsg.format(scheduler)) + + def test_distributed(self): + scheduler = '192.168.0.128:8786' + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always') + opts = Parallel(scheduler=scheduler, num_workers=5) + self.assertIsNone(opts.num_workers) + exp_wmsg = 'Attempting to set `num_workers` with the {!r} scheduler' + six.assertRegex(self, str(w[0].message), + exp_wmsg.format('distributed')) + + +if __name__ == '__main__': + tests.main() From afef8ab50b0e91a2107b26a3a0407077d8c64b5a Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Thu, 30 Mar 2017 15:58:13 +0100 Subject: [PATCH 4/7] Rewrite, and some tests --- conda-requirements.txt | 1 + lib/iris/options.py | 200 ++++++++++++------- lib/iris/tests/unit/options/test_Parallel.py | 100 +++++----- minimal-conda-requirements.txt | 1 + 4 files changed, 188 insertions(+), 114 deletions(-) diff --git a/conda-requirements.txt b/conda-requirements.txt index cd89693e57..06e957b608 100644 --- a/conda-requirements.txt +++ b/conda-requirements.txt @@ -11,6 +11,7 @@ pyke udunits2 cf_units dask +distributed # Iris build dependencies setuptools diff --git a/lib/iris/options.py b/lib/iris/options.py index 91e7b29eba..780ad4e92a 100644 --- a/lib/iris/options.py +++ b/lib/iris/options.py @@ -20,7 +20,9 @@ """ from __future__ import (absolute_import, division, print_function) from six.moves import (filter, input, map, range, zip) # noqa +import six +import contextlib from multiprocessing import cpu_count from multiprocessing.pool import ThreadPool import re @@ -31,12 +33,45 @@ import distributed -class Parallel(object): +class Option(object): + """ + An abstract superclass to enforce certain key behaviours for all `Option` + classes. + + """ + @property + def _defaults_dict(self): + raise NotImplementedError + + def __setattr__(self, name, value): + if name not in self.__dict__: + # Can't add new names. + msg = "'Option' object has no attribute {!r}".format(name) + raise AttributeError(msg) + if value is None: + # Set an explicitly unset value to the default value for the name. + value = self._defaults_dict[name]['default'] + if self._defaults_dict[name]['options'] is not None: + # Replace a bad value with the default if there is a defined set of + # specified good values. + if value not in self._defaults_dict[name]['options']: + good_value = self._defaults_dict[name]['default'] + wmsg = ('Attempting to set bad value {!r} for attribute {!r}. ' + 'Defaulting to {!r}.') + warnings.warn(wmsg.format(value, name, good_value)) + value = good_value + self.__dict__[name] = value + + def context(self): + raise NotImplementedError + + +class Parallel(Option): """ Control dask parallel processing options for Iris. """ - def __init__(self, scheduler='threaded', num_workers=1): + def __init__(self, scheduler=None, num_workers=None): """ Set up options for dask parallel processing. 
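The `Option` base class added in this hunk centralises attribute handling: names must already exist in `__dict__` (so typos cannot create new attributes), `None` means "use the default from `_defaults_dict`", and any value outside a declared `options` list is replaced by the default with a warning. A minimal, self-contained sketch of that pattern — the `Colour` subclass and its values are invented here purely for illustration:

    import warnings

    class Option(object):
        @property
        def _defaults_dict(self):
            raise NotImplementedError

        def __setattr__(self, name, value):
            if name not in self.__dict__:
                # Can't add new names.
                msg = "'Option' object has no attribute {!r}".format(name)
                raise AttributeError(msg)
            if value is None:
                # Explicitly unset values take the default for the name.
                value = self._defaults_dict[name]['default']
            options = self._defaults_dict[name]['options']
            if options is not None and value not in options:
                good = self._defaults_dict[name]['default']
                warnings.warn('Attempting to set bad value {!r} for '
                              'attribute {!r}. Defaulting to {!r}.'.format(
                                  value, name, good))
                value = good
            self.__dict__[name] = value

    class Colour(Option):
        def __init__(self, shade=None):
            self.__dict__['shade'] = None   # Register the name first ...
            setattr(self, 'shade', shade)   # ... then route via __setattr__.

        @property
        def _defaults_dict(self):
            return {'shade': {'default': 'red',
                              'options': ['red', 'green', 'blue']}}

    c = Colour()
    print(c.shade)      # -> 'red' (None replaced by the default).
    c.shade = 'mauve'   # Warns, then falls back to 'red'.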
@@ -89,96 +124,115 @@ def __init__(self, scheduler='threaded', num_workers=1): * Specify that we want to load a cube with dask parallel processing using multiprocessing with six worker processes:: - >>> iris.options.parallel(scheduler='multiprocessing', num_workers=6) - >>> iris.load('my_dataset.nc') + iris.options.parallel(scheduler='multiprocessing', num_workers=6) + iris.load('my_dataset.nc') * Specify, with a context manager, that we want to load a cube with dask parallel processing using four worker threads:: - >>> with iris.options.parallel(scheduler='threaded', num_workers=4): - ... iris.load('my_dataset.nc') + with iris.options.parallel(scheduler='threaded', num_workers=4): + iris.load('my_dataset.nc') * Run dask parallel processing using a distributed scheduler that has been set up at the IP address and port at ``192.168.0.219:8786``:: - >>> iris.options.parallel(scheduler='192.168.0.219:8786') + iris.options.parallel(scheduler='192.168.0.219:8786') """ - # Set some defaults first of all. - self._default_scheduler = 'threaded' - self._default_num_workers = 1 + # Set `__dict__` keys first. + self.__dict__['_scheduler'] = scheduler + self.__dict__['scheduler'] = None + self.__dict__['num_workers'] = None + self.__dict__['dask_scheduler'] = None - self.scheduler = scheduler - self.num_workers = num_workers - - self._dask_scheduler = None + # Set `__dict__` values for each kwarg. + setattr(self, 'scheduler', scheduler) + setattr(self, 'num_workers', num_workers) + setattr(self, 'dask_scheduler', self.get('scheduler')) # Activate the specified dask options. self._set_dask_options() + def __setattr__(self, name, value): + if value is None: + value = self._defaults_dict[name]['default'] + attr_setter = getattr(self, 'set_{}'.format(name)) + value = attr_setter(value) + super(Parallel, self).__setattr__(name, value) + @property - def scheduler(self): - return self._scheduler + def _defaults_dict(self): + """ + Define the default value and available options for each settable + `kwarg` of this `Option`. + + Note: `'options'` can be set to `None` if it is not reasonable to + specify all possible options. For example, this may be reasonable if + the `'options'` were a range of numbers. - @scheduler.setter - def scheduler(self, value): + """ + return {'_scheduler': {'default': None, 'options': None}, + 'scheduler': {'default': 'threaded', + 'options': ['threaded', + 'multiprocessing', + 'async', + 'distributed']}, + 'num_workers': {'default': 1, 'options': None}, + 'dask_scheduler': {'default': None, 'options': None}, + } + + def set__scheduler(self, value): + return value + + def set_scheduler(self, value): + default = self._defaults_dict['scheduler']['default'] if value is None: - value = self._default_scheduler - if value == 'threaded': - self._scheduler = value - self.dask_scheduler = dask.threaded.get - elif value == 'multiprocessing': - self._scheduler = value - self.dask_scheduler = dask.multiprocessing.get - elif value == 'async': - self._scheduler = value - self.dask_scheduler = dask.async.get_sync + value = default elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value): - self._scheduler = 'distributed' - self.dask_scheduler = value - else: + value = 'distributed' + elif value not in self._defaults_dict['scheduler']['options']: # Invalid value for `scheduler`. wmsg = 'Invalid value for scheduler: {!r}. Defaulting to {}.' 
- warnings.warn(wmsg.format(value, self._default_scheduler)) - self.scheduler = self._default_scheduler - - @property - def num_workers(self): - return self._num_workers - - @num_workers.setter - def num_workers(self, value): - if self.scheduler == 'async' and value != self._default_num_workers: + warnings.warn(wmsg.format(value, default)) + self.set_scheduler(default) + return value + + def set_num_workers(self, value): + default = self._defaults_dict['num_workers']['default'] + scheduler = self.get('scheduler') + if scheduler == 'async' and value != default: wmsg = 'Cannot set `num_workers` for the serial scheduler {!r}.' - warnings.warn(wmsg.format(self.scheduler)) + warnings.warn(wmsg.format(scheduler)) value = None - elif (self.scheduler == 'distributed' and - value != self._default_num_workers): + elif scheduler == 'distributed' and value != default: wmsg = ('Attempting to set `num_workers` with the {!r} scheduler ' 'requested. Please instead specify number of workers when ' 'setting up the distributed scheduler. See ' 'https://distributed.readthedocs.io/en/latest/index.html ' 'for more details.') - warnings.warn(wmsg.format(self.scheduler)) + warnings.warn(wmsg.format(scheduler)) value = None else: if value is None: - value = self._default_num_workers + value = default if value >= cpu_count(): # Limit maximum CPUs used to 1 fewer than all available CPUs. wmsg = ('Requested more CPUs ({}) than total available ({}). ' 'Limiting number of used CPUs to {}.') warnings.warn(wmsg.format(value, cpu_count(), cpu_count()-1)) value = cpu_count() - 1 - self._num_workers = value - - @property - def dask_scheduler(self): - return self._dask_scheduler - - @dask_scheduler.setter - def dask_scheduler(self, value): - self._dask_scheduler = value + return value + + def set_dask_scheduler(self, scheduler): + if scheduler == 'threaded': + value = dask.threaded.get + elif scheduler == 'multiprocessing': + value = dask.multiprocessing.get + elif scheduler == 'async': + value = dask.async.get_sync + elif scheduler == 'distributed': + value = self.get('_scheduler') + return value def _set_dask_options(self): """ @@ -187,25 +241,37 @@ def _set_dask_options(self): context manager. """ - get = self.dask_scheduler + scheduler = self.get('scheduler') + num_workers = self.get('num_workers') + get = self.get('dask_scheduler') pool = None - if self.scheduler in ['threaded', 'multiprocessing']: - pool = ThreadPool(self.num_workers) - if self.scheduler == 'distributed': - get = distributed.Client(self.dask_scheduler).get + + if scheduler in ['threaded', 'multiprocessing']: + pool = ThreadPool(num_workers) + if scheduler == 'distributed': + get = distributed.Client(get).get dask.set_options(get=get, pool=pool) def get(self, item): return getattr(self, item) - def __enter__(self): - return - - def __exit__(self, exception_type, exception_value, exception_traceback): - self.num_workers = self._default_num_workers - self.scheduler = self._default_scheduler + @contextlib.contextmanager + def context(self, **kwargs): + # Snapshot the starting state for restoration at the end of the + # contextmanager block. + starting_state = self.__dict__.copy() + # Update the state to reflect the requested changes. + for name, value in six.iteritems(kwargs): + setattr(self, name, value) self._set_dask_options() + try: + yield + finally: + # Return the state to the starting state. 
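+            # Both the attributes on this object and, via the trailing
+            # `_set_dask_options()` call, the global dask configuration.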
+ self.__dict__.clear() + self.__dict__.update(starting_state) + self._set_dask_options() -parallel = Parallel +parallel = Parallel() diff --git a/lib/iris/tests/unit/options/test_Parallel.py b/lib/iris/tests/unit/options/test_Parallel.py index df31b4102f..8b63995e6f 100644 --- a/lib/iris/tests/unit/options/test_Parallel.py +++ b/lib/iris/tests/unit/options/test_Parallel.py @@ -24,9 +24,11 @@ # importing anything else. import iris.tests as tests +import multiprocessing import warnings -from dask import set_options, async +import dask +import distributed from iris.options import Parallel from iris.tests import mock @@ -35,7 +37,10 @@ class Test_operation(tests.IrisTest): # Check that the options are passed through to 'real' code. # NOTE: tests that call the option class directly and as a contextmgr. - pass + + def test_bad_name__contextmgr(self): + # Check we can't do `with iris.options.parallel.context('foo'='bar')`. + pass class Test__set_dask_options(tests.IrisTest): @@ -47,23 +52,34 @@ class Test__set_dask_options(tests.IrisTest): # self.addCleanup(patcher.stop) # self.mock_dask_opts = patcher.start() + def setUp(self): + self.mock_dask = dask + self.mock_dask.threaded.get = mock.MagicMock() + self.mock_dask.set_options = mock.MagicMock() + self.mock_mul = multiprocessing + self.mock_mul.pool.ThreadPool = mock.MagicMock() + def test_default(self): pass def test_threaded(self): - pass + scheduler = 'threaded' + Parallel(scheduler=scheduler) + # self.mock_mul.pool.ThreadPool.assert_called_once_with(1) + self.mock_dask.set_options.assert_called_once_with(get=self.mock_dask.threaded.get, + pool=self.mock_mul.pool.ThreadPool) def test_threaded_num_workers(self): pass - def test_async(self): - scheduler = 'async' - # dask_options = mock.Mock(spec=set_options) - # dask_scheduler = mock.Mock(spec=async.get_sync) - with mock.patch('dask.set_options') as mock_dask_opts: - Parallel(scheduler=scheduler) - # dask_options.assert_called_once_with(get=dask_scheduler) - mock_dask_opts.assert_any_call() + # def test_async(self): + # scheduler = 'async' + # # dask_options = mock.Mock(spec=set_options) + # # dask_scheduler = mock.Mock(spec=async.get_sync) + # with mock.patch('dask.set_options') as mock_dask_opts: + # Parallel(scheduler=scheduler) + # # dask_options.assert_called_once_with(get=dask_scheduler) + # mock_dask_opts.assert_any_call() class Test_set_schedulers(tests.IrisTest): @@ -71,32 +87,32 @@ class Test_set_schedulers(tests.IrisTest): # inputs. 
def test_default(self): opts = Parallel() - result = opts.scheduler - expected = opts._default_scheduler + result = opts.get('scheduler') + expected = opts._defaults_dict['scheduler']['default'] self.assertEqual(result, expected) def test_threaded(self): scheduler = 'threaded' opts = Parallel(scheduler=scheduler) - result = opts.scheduler + result = opts.get('scheduler') self.assertEqual(result, scheduler) def test_multiprocessing(self): scheduler = 'multiprocessing' opts = Parallel(scheduler=scheduler) - result = opts.scheduler + result = opts.get('scheduler') self.assertEqual(result, scheduler) def test_async(self): scheduler = 'async' opts = Parallel(scheduler=scheduler) - result = opts.scheduler + result = opts.get('scheduler') self.assertEqual(result, scheduler) def test_distributed(self): scheduler = '192.168.0.128:8786' opts = Parallel(scheduler=scheduler) - result = opts.scheduler + result = opts.get('scheduler') self.assertEqual(result, 'distributed') def test_bad(self): @@ -104,8 +120,8 @@ def test_bad(self): with warnings.catch_warnings(record=True) as w: warnings.simplefilter('always') opts = Parallel(scheduler=scheduler) - result = opts.scheduler - expected = opts._default_scheduler + result = opts.get('scheduler') + expected = opts._defaults_dict['scheduler']['default'] self.assertEqual(result, expected) exp_wmsg = 'Invalid value for scheduler: {!r}' six.assertRegex(self, str(w[0].message), exp_wmsg.format(scheduler)) @@ -116,58 +132,46 @@ class Test_set_dask_scheduler(tests.IrisTest): # inputs. def test_default(self): opts = Parallel() - result = opts.scheduler - expected = opts._default_scheduler - self.assertEqual(result, expected) + result = opts.get('dask_scheduler') + self.assertIs(result, dask.threaded.get) def test_threaded(self): scheduler = 'threaded' opts = Parallel(scheduler=scheduler) - result = opts.scheduler - self.assertEqual(result, scheduler) + result = opts.get('dask_scheduler') + self.assertIs(result, dask.threaded.get) def test_multiprocessing(self): scheduler = 'multiprocessing' opts = Parallel(scheduler=scheduler) - result = opts.scheduler - self.assertEqual(result, scheduler) + result = opts.get('dask_scheduler') + self.assertIs(result, dask.multiprocessing.get) def test_async(self): scheduler = 'async' opts = Parallel(scheduler=scheduler) - result = opts.scheduler - self.assertEqual(result, scheduler) + result = opts.get('dask_scheduler') + self.assertIs(result, dask.async.get_sync) def test_distributed(self): scheduler = '192.168.0.128:8786' - opts = Parallel(scheduler=scheduler) - result = opts.scheduler - self.assertEqual(result, 'distributed') - - def test_bad(self): - scheduler = 'wibble' - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always') + with mock.patch('distributed.Client.get') as mock_get: opts = Parallel(scheduler=scheduler) - result = opts.scheduler - expected = opts._default_scheduler - self.assertEqual(result, expected) - exp_wmsg = 'Invalid value for scheduler: {!r}' - six.assertRegex(self, str(w[0].message), exp_wmsg.format(scheduler)) + mock_get.assert_called_once_with(scheduler) class Test_set_num_workers(tests.IrisTest): # Check that the correct `num_workers` are chosen given the inputs. 
def test_default(self): opts = Parallel() - result = opts.num_workers - expected = opts._default_num_workers + result = opts.get('num_workers') + expected = opts._defaults_dict['num_workers']['default'] self.assertEqual(result, expected) def test_basic(self): n_workers = 5 opts = Parallel(num_workers=n_workers) - result = opts.num_workers + result = opts.get('num_workers') self.assertEqual(result, n_workers) def test_too_many_workers(self): @@ -177,7 +181,7 @@ def test_too_many_workers(self): with warnings.catch_warnings(record=True) as w: warnings.simplefilter('always') opts = Parallel(num_workers=n_workers) - result = opts.num_workers + result = opts.get('num_workers') self.assertEqual(result, max_cpus-1) exp_wmsg = ('Requested more CPUs ({}) than total available ({}). ' 'Limiting number of used CPUs to {}.') @@ -195,7 +199,8 @@ def test_async(self): with warnings.catch_warnings(record=True) as w: warnings.simplefilter('always') opts = Parallel(scheduler=scheduler, num_workers=5) - self.assertIsNone(opts.num_workers) + expected = opts._defaults_dict['num_workers']['default'] + self.assertEqual(opts.get('num_workers'), expected) exp_wmsg = 'Cannot set `num_workers` for the serial scheduler {!r}' six.assertRegex(self, str(w[0].message), exp_wmsg.format(scheduler)) @@ -204,7 +209,8 @@ def test_distributed(self): with warnings.catch_warnings(record=True) as w: warnings.simplefilter('always') opts = Parallel(scheduler=scheduler, num_workers=5) - self.assertIsNone(opts.num_workers) + expected = opts._defaults_dict['num_workers']['default'] + self.assertEqual(opts.get('num_workers'), expected) exp_wmsg = 'Attempting to set `num_workers` with the {!r} scheduler' six.assertRegex(self, str(w[0].message), exp_wmsg.format('distributed')) diff --git a/minimal-conda-requirements.txt b/minimal-conda-requirements.txt index a87c787ec9..a9d6313630 100644 --- a/minimal-conda-requirements.txt +++ b/minimal-conda-requirements.txt @@ -11,6 +11,7 @@ pyke udunits2 cf_units dask +distributed # Iris build dependencies setuptools From d685203b48d169ee23624edee263c2f30b388f4f Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Mon, 10 Apr 2017 11:54:23 +0100 Subject: [PATCH 5/7] Move to config.py --- lib/iris/config.py | 271 ++++++++++++++++- lib/iris/options.py | 277 ------------------ .../unit/{options => config}/__init__.py | 2 +- .../unit/{options => config}/test_Parallel.py | 4 +- 4 files changed, 272 insertions(+), 282 deletions(-) delete mode 100644 lib/iris/options.py rename lib/iris/tests/unit/{options => config}/__init__.py (94%) rename lib/iris/tests/unit/{options => config}/test_Parallel.py (98%) diff --git a/lib/iris/config.py b/lib/iris/config.py index e14ac058a4..486ff35ec7 100644 --- a/lib/iris/config.py +++ b/lib/iris/config.py @@ -1,4 +1,4 @@ -# (C) British Crown Copyright 2010 - 2016, Met Office +# (C) British Crown Copyright 2010 - 2017, Met Office # # This file is part of Iris. 
# @@ -66,11 +66,25 @@ from __future__ import (absolute_import, division, print_function) from six.moves import (filter, input, map, range, zip) # noqa - +import six from six.moves import configparser + +import contextlib +from multiprocessing import cpu_count +from multiprocessing.pool import ThreadPool +import re import os.path import warnings +import dask +import dask.multiprocessing + +DISTRIBUTED_AVAILABLE = True +try: + import distributed +except ImportError: + DISTRIBUTED_AVAILABLE = False + # Returns simple string options def get_option(section, option, default=None): @@ -154,3 +168,256 @@ def get_dir_option(section, option, default=None): IMPORT_LOGGER = get_option(_LOGGING_SECTION, 'import_logger') + + +################# +# Runtime options + +class Option(object): + """ + An abstract superclass to enforce certain key behaviours for all `Option` + classes. + + """ + @property + def _defaults_dict(self): + raise NotImplementedError + + def __setattr__(self, name, value): + if name not in self.__dict__: + # Can't add new names. + msg = "'Option' object has no attribute {!r}".format(name) + raise AttributeError(msg) + if value is None: + # Set an explicitly unset value to the default value for the name. + value = self._defaults_dict[name]['default'] + if self._defaults_dict[name]['options'] is not None: + # Replace a bad value with the default if there is a defined set of + # specified good values. + if value not in self._defaults_dict[name]['options']: + good_value = self._defaults_dict[name]['default'] + wmsg = ('Attempting to set bad value {!r} for attribute {!r}. ' + 'Defaulting to {!r}.') + warnings.warn(wmsg.format(value, name, good_value)) + value = good_value + self.__dict__[name] = value + + def context(self): + raise NotImplementedError + + +class Parallel(Option): + """ + Control dask parallel processing options for Iris. + + """ + def __init__(self, scheduler=None, num_workers=None): + """ + Set up options for dask parallel processing. + + Currently accepted kwargs: + + * scheduler: + The scheduler used to run a dask graph. Must be set to one of: + + * 'threaded': (default) + The scheduler processes the graph in parallel using a + thread pool. Good for processing dask arrays and dataframes. + * 'multiprocessing': + The scheduler processes the graph in parallel using a + process pool. Good for processing dask bags. + * 'async': + The scheduler runs synchronously (not in parallel). Good for + debugging. + * The IP address and port of a distributed scheduler: + Specifies the location of a distributed scheduler that has + already been set up. The distributed scheduler will process the + graph. + + For more information see + http://dask.pydata.org/en/latest/scheduler-overview.html. + + * num_workers: + The number of worker threads or processess to use to run the dask + graph in parallel. Defaults to 1 (that is, processed serially). + + .. note:: + The value for `num_workers` cannot be set to greater than the + number of CPUs available on the host system. If such a value is + requested, `num_workers` is automatically set to 1 less than + the number of CPUs available on the host system. + + .. note:: + Only the 'threaded' and 'multiprocessing' schedulers support + the `num_workers` kwarg. If it is specified with the `async` or + `distributed` scheduler, the kwarg is ignored: + + * The 'async' scheduler runs serially so will only use a single + worker. + * The number of workers for the 'distributed' scheduler must be + defined when setting up the distributed scheduler. 
For more + information on setting up distributed schedulers, see + https://distributed.readthedocs.io/en/latest/index.html. + + Example usages: + + * Specify that we want to load a cube with dask parallel processing + using multiprocessing with six worker processes:: + + iris.options.parallel(scheduler='multiprocessing', num_workers=6) + iris.load('my_dataset.nc') + + * Specify, with a context manager, that we want to load a cube with + dask parallel processing using four worker threads:: + + with iris.options.parallel(scheduler='threaded', num_workers=4): + iris.load('my_dataset.nc') + + * Run dask parallel processing using a distributed scheduler that has + been set up at the IP address and port at ``192.168.0.219:8786``:: + + iris.options.parallel(scheduler='192.168.0.219:8786') + + """ + # Set `__dict__` keys first. + self.__dict__['_scheduler'] = scheduler + self.__dict__['scheduler'] = None + self.__dict__['num_workers'] = None + self.__dict__['dask_scheduler'] = None + + # Set `__dict__` values for each kwarg. + setattr(self, 'scheduler', scheduler) + setattr(self, 'num_workers', num_workers) + setattr(self, 'dask_scheduler', self.get('scheduler')) + + # Activate the specified dask options. + self._set_dask_options() + + def __setattr__(self, name, value): + if value is None: + value = self._defaults_dict[name]['default'] + attr_setter = getattr(self, 'set_{}'.format(name)) + value = attr_setter(value) + super(Parallel, self).__setattr__(name, value) + + @property + def _defaults_dict(self): + """ + Define the default value and available options for each settable + `kwarg` of this `Option`. + + Note: `'options'` can be set to `None` if it is not reasonable to + specify all possible options. For example, this may be reasonable if + the `'options'` were a range of numbers. + + """ + return {'_scheduler': {'default': None, 'options': None}, + 'scheduler': {'default': 'threaded', + 'options': ['threaded', + 'multiprocessing', + 'async', + 'distributed']}, + 'num_workers': {'default': 1, 'options': None}, + 'dask_scheduler': {'default': None, 'options': None}, + } + + def set__scheduler(self, value): + return value + + def set_scheduler(self, value): + default = self._defaults_dict['scheduler']['default'] + if value is None: + value = default + elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value): + if DISTRIBUTED_AVAILABLE: + value = 'distributed' + else: + # Distributed not available. + wmsg = 'Cannot import distributed. Defaulting to {}.' + warnings.warn(wmsg.format(default)) + self.set_scheduler(default) + elif value not in self._defaults_dict['scheduler']['options']: + # Invalid value for `scheduler`. + wmsg = 'Invalid value for scheduler: {!r}. Defaulting to {}.' + warnings.warn(wmsg.format(value, default)) + self.set_scheduler(default) + return value + + def set_num_workers(self, value): + default = self._defaults_dict['num_workers']['default'] + scheduler = self.get('scheduler') + if scheduler == 'async' and value != default: + wmsg = 'Cannot set `num_workers` for the serial scheduler {!r}.' + warnings.warn(wmsg.format(scheduler)) + value = None + elif scheduler == 'distributed' and value != default: + wmsg = ('Attempting to set `num_workers` with the {!r} scheduler ' + 'requested. Please instead specify number of workers when ' + 'setting up the distributed scheduler. 
See ' + 'https://distributed.readthedocs.io/en/latest/index.html ' + 'for more details.') + warnings.warn(wmsg.format(scheduler)) + value = None + else: + if value is None: + value = default + if value >= cpu_count(): + # Limit maximum CPUs used to 1 fewer than all available CPUs. + wmsg = ('Requested more CPUs ({}) than total available ({}). ' + 'Limiting number of used CPUs to {}.') + warnings.warn(wmsg.format(value, cpu_count(), cpu_count()-1)) + value = cpu_count() - 1 + return value + + def set_dask_scheduler(self, scheduler): + if scheduler == 'threaded': + value = dask.threaded.get + elif scheduler == 'multiprocessing': + value = dask.multiprocessing.get + elif scheduler == 'async': + value = dask.async.get_sync + elif scheduler == 'distributed': + value = self.get('_scheduler') + return value + + def _set_dask_options(self): + """ + Use `dask.set_options` to globally apply the options specified at + instantiation, either for the lifetime of the session or + context manager. + + """ + scheduler = self.get('scheduler') + num_workers = self.get('num_workers') + get = self.get('dask_scheduler') + pool = None + + if scheduler in ['threaded', 'multiprocessing']: + pool = ThreadPool(num_workers) + if scheduler == 'distributed': + get = distributed.Client(get).get + + dask.set_options(get=get, pool=pool) + + def get(self, item): + return getattr(self, item) + + @contextlib.contextmanager + def context(self, **kwargs): + # Snapshot the starting state for restoration at the end of the + # contextmanager block. + starting_state = self.__dict__.copy() + # Update the state to reflect the requested changes. + for name, value in six.iteritems(kwargs): + setattr(self, name, value) + self._set_dask_options() + try: + yield + finally: + # Return the state to the starting state. + self.__dict__.clear() + self.__dict__.update(starting_state) + self._set_dask_options() + + +parallel = Parallel() \ No newline at end of file diff --git a/lib/iris/options.py b/lib/iris/options.py deleted file mode 100644 index 780ad4e92a..0000000000 --- a/lib/iris/options.py +++ /dev/null @@ -1,277 +0,0 @@ -# (C) British Crown Copyright 2017, Met Office -# -# This file is part of Iris. -# -# Iris is free software: you can redistribute it and/or modify it under -# the terms of the GNU Lesser General Public License as published by the -# Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Iris is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with Iris. If not, see . -""" -Control runtime options of Iris. - -""" -from __future__ import (absolute_import, division, print_function) -from six.moves import (filter, input, map, range, zip) # noqa -import six - -import contextlib -from multiprocessing import cpu_count -from multiprocessing.pool import ThreadPool -import re -import warnings - -import dask -import dask.multiprocessing -import distributed - - -class Option(object): - """ - An abstract superclass to enforce certain key behaviours for all `Option` - classes. - - """ - @property - def _defaults_dict(self): - raise NotImplementedError - - def __setattr__(self, name, value): - if name not in self.__dict__: - # Can't add new names. 
- msg = "'Option' object has no attribute {!r}".format(name) - raise AttributeError(msg) - if value is None: - # Set an explicitly unset value to the default value for the name. - value = self._defaults_dict[name]['default'] - if self._defaults_dict[name]['options'] is not None: - # Replace a bad value with the default if there is a defined set of - # specified good values. - if value not in self._defaults_dict[name]['options']: - good_value = self._defaults_dict[name]['default'] - wmsg = ('Attempting to set bad value {!r} for attribute {!r}. ' - 'Defaulting to {!r}.') - warnings.warn(wmsg.format(value, name, good_value)) - value = good_value - self.__dict__[name] = value - - def context(self): - raise NotImplementedError - - -class Parallel(Option): - """ - Control dask parallel processing options for Iris. - - """ - def __init__(self, scheduler=None, num_workers=None): - """ - Set up options for dask parallel processing. - - Currently accepted kwargs: - - * scheduler: - The scheduler used to run a dask graph. Must be set to one of: - - * 'threaded': (default) - The scheduler processes the graph in parallel using a - thread pool. Good for processing dask arrays and dataframes. - * 'multiprocessing': - The scheduler processes the graph in parallel using a - process pool. Good for processing dask bags. - * 'async': - The scheduler runs synchronously (not in parallel). Good for - debugging. - * The IP address and port of a distributed scheduler: - Specifies the location of a distributed scheduler that has - already been set up. The distributed scheduler will process the - graph. - - For more information see - http://dask.pydata.org/en/latest/scheduler-overview.html. - - * num_workers: - The number of worker threads or processess to use to run the dask - graph in parallel. Defaults to 1 (that is, processed serially). - - .. note:: - The value for `num_workers` cannot be set to greater than the - number of CPUs available on the host system. If such a value is - requested, `num_workers` is automatically set to 1 less than - the number of CPUs available on the host system. - - .. note:: - Only the 'threaded' and 'multiprocessing' schedulers support - the `num_workers` kwarg. If it is specified with the `async` or - `distributed` scheduler, the kwarg is ignored: - - * The 'async' scheduler runs serially so will only use a single - worker. - * The number of workers for the 'distributed' scheduler must be - defined when setting up the distributed scheduler. For more - information on setting up distributed schedulers, see - https://distributed.readthedocs.io/en/latest/index.html. - - Example usages: - - * Specify that we want to load a cube with dask parallel processing - using multiprocessing with six worker processes:: - - iris.options.parallel(scheduler='multiprocessing', num_workers=6) - iris.load('my_dataset.nc') - - * Specify, with a context manager, that we want to load a cube with - dask parallel processing using four worker threads:: - - with iris.options.parallel(scheduler='threaded', num_workers=4): - iris.load('my_dataset.nc') - - * Run dask parallel processing using a distributed scheduler that has - been set up at the IP address and port at ``192.168.0.219:8786``:: - - iris.options.parallel(scheduler='192.168.0.219:8786') - - """ - # Set `__dict__` keys first. - self.__dict__['_scheduler'] = scheduler - self.__dict__['scheduler'] = None - self.__dict__['num_workers'] = None - self.__dict__['dask_scheduler'] = None - - # Set `__dict__` values for each kwarg. 
- setattr(self, 'scheduler', scheduler) - setattr(self, 'num_workers', num_workers) - setattr(self, 'dask_scheduler', self.get('scheduler')) - - # Activate the specified dask options. - self._set_dask_options() - - def __setattr__(self, name, value): - if value is None: - value = self._defaults_dict[name]['default'] - attr_setter = getattr(self, 'set_{}'.format(name)) - value = attr_setter(value) - super(Parallel, self).__setattr__(name, value) - - @property - def _defaults_dict(self): - """ - Define the default value and available options for each settable - `kwarg` of this `Option`. - - Note: `'options'` can be set to `None` if it is not reasonable to - specify all possible options. For example, this may be reasonable if - the `'options'` were a range of numbers. - - """ - return {'_scheduler': {'default': None, 'options': None}, - 'scheduler': {'default': 'threaded', - 'options': ['threaded', - 'multiprocessing', - 'async', - 'distributed']}, - 'num_workers': {'default': 1, 'options': None}, - 'dask_scheduler': {'default': None, 'options': None}, - } - - def set__scheduler(self, value): - return value - - def set_scheduler(self, value): - default = self._defaults_dict['scheduler']['default'] - if value is None: - value = default - elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value): - value = 'distributed' - elif value not in self._defaults_dict['scheduler']['options']: - # Invalid value for `scheduler`. - wmsg = 'Invalid value for scheduler: {!r}. Defaulting to {}.' - warnings.warn(wmsg.format(value, default)) - self.set_scheduler(default) - return value - - def set_num_workers(self, value): - default = self._defaults_dict['num_workers']['default'] - scheduler = self.get('scheduler') - if scheduler == 'async' and value != default: - wmsg = 'Cannot set `num_workers` for the serial scheduler {!r}.' - warnings.warn(wmsg.format(scheduler)) - value = None - elif scheduler == 'distributed' and value != default: - wmsg = ('Attempting to set `num_workers` with the {!r} scheduler ' - 'requested. Please instead specify number of workers when ' - 'setting up the distributed scheduler. See ' - 'https://distributed.readthedocs.io/en/latest/index.html ' - 'for more details.') - warnings.warn(wmsg.format(scheduler)) - value = None - else: - if value is None: - value = default - if value >= cpu_count(): - # Limit maximum CPUs used to 1 fewer than all available CPUs. - wmsg = ('Requested more CPUs ({}) than total available ({}). ' - 'Limiting number of used CPUs to {}.') - warnings.warn(wmsg.format(value, cpu_count(), cpu_count()-1)) - value = cpu_count() - 1 - return value - - def set_dask_scheduler(self, scheduler): - if scheduler == 'threaded': - value = dask.threaded.get - elif scheduler == 'multiprocessing': - value = dask.multiprocessing.get - elif scheduler == 'async': - value = dask.async.get_sync - elif scheduler == 'distributed': - value = self.get('_scheduler') - return value - - def _set_dask_options(self): - """ - Use `dask.set_options` to globally apply the options specified at - instantiation, either for the lifetime of the session or - context manager. 
- - """ - scheduler = self.get('scheduler') - num_workers = self.get('num_workers') - get = self.get('dask_scheduler') - pool = None - - if scheduler in ['threaded', 'multiprocessing']: - pool = ThreadPool(num_workers) - if scheduler == 'distributed': - get = distributed.Client(get).get - - dask.set_options(get=get, pool=pool) - - def get(self, item): - return getattr(self, item) - - @contextlib.contextmanager - def context(self, **kwargs): - # Snapshot the starting state for restoration at the end of the - # contextmanager block. - starting_state = self.__dict__.copy() - # Update the state to reflect the requested changes. - for name, value in six.iteritems(kwargs): - setattr(self, name, value) - self._set_dask_options() - try: - yield - finally: - # Return the state to the starting state. - self.__dict__.clear() - self.__dict__.update(starting_state) - self._set_dask_options() - - -parallel = Parallel() diff --git a/lib/iris/tests/unit/options/__init__.py b/lib/iris/tests/unit/config/__init__.py similarity index 94% rename from lib/iris/tests/unit/options/__init__.py rename to lib/iris/tests/unit/config/__init__.py index 93397c38fa..dd625e1e91 100644 --- a/lib/iris/tests/unit/options/__init__.py +++ b/lib/iris/tests/unit/config/__init__.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Iris. If not, see . -"""Unit tests for the :mod:`iris.options` module.""" +"""Unit tests for the :mod:`iris.config` module.""" from __future__ import (absolute_import, division, print_function) from six.moves import (filter, input, map, range, zip) # noqa diff --git a/lib/iris/tests/unit/options/test_Parallel.py b/lib/iris/tests/unit/config/test_Parallel.py similarity index 98% rename from lib/iris/tests/unit/options/test_Parallel.py rename to lib/iris/tests/unit/config/test_Parallel.py index 8b63995e6f..5193557a79 100644 --- a/lib/iris/tests/unit/options/test_Parallel.py +++ b/lib/iris/tests/unit/config/test_Parallel.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Iris. If not, see . -"""Unit tests for the `iris.options.Paralle` class.""" +"""Unit tests for the :class:`iris.config.Parallel` class.""" from __future__ import (absolute_import, division, print_function) from six.moves import (filter, input, map, range, zip) # noqa @@ -30,7 +30,7 @@ import dask import distributed -from iris.options import Parallel +from iris.config import Parallel from iris.tests import mock From 60aebe8e96e84068cb757680bdf6f4166e712e17 Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Wed, 12 Apr 2017 16:36:50 +0100 Subject: [PATCH 6/7] Add tests and repr --- conda-requirements.txt | 1 - lib/iris/config.py | 15 +- lib/iris/tests/unit/config/test_Parallel.py | 207 +++++++++++++------- minimal-conda-requirements.txt | 1 - 4 files changed, 150 insertions(+), 74 deletions(-) diff --git a/conda-requirements.txt b/conda-requirements.txt index 06e957b608..cd89693e57 100644 --- a/conda-requirements.txt +++ b/conda-requirements.txt @@ -11,7 +11,6 @@ pyke udunits2 cf_units dask -distributed # Iris build dependencies setuptools diff --git a/lib/iris/config.py b/lib/iris/config.py index 486ff35ec7..c4ab53da1e 100644 --- a/lib/iris/config.py +++ b/lib/iris/config.py @@ -293,6 +293,16 @@ def __init__(self, scheduler=None, num_workers=None): # Activate the specified dask options. self._set_dask_options() + def __repr__(self): + msg = 'Dask parallel options: {}.' 
+ + # Automatically populate with all currently accepted kwargs. + options = ['{}={}'.format(k, v) + for k, v in six.iteritems(self.__dict__) + if not k.startswith('_')] + joined = ', '.join(options) + return msg.format(joined) + def __setattr__(self, name, value): if value is None: value = self._defaults_dict[name]['default'] @@ -388,11 +398,11 @@ def _set_dask_options(self): """ scheduler = self.get('scheduler') - num_workers = self.get('num_workers') get = self.get('dask_scheduler') pool = None if scheduler in ['threaded', 'multiprocessing']: + num_workers = self.get('num_workers') pool = ThreadPool(num_workers) if scheduler == 'distributed': get = distributed.Client(get).get @@ -410,6 +420,7 @@ def context(self, **kwargs): # Update the state to reflect the requested changes. for name, value in six.iteritems(kwargs): setattr(self, name, value) + setattr(self, 'dask_scheduler', self.get('scheduler')) self._set_dask_options() try: yield @@ -420,4 +431,4 @@ def context(self, **kwargs): self._set_dask_options() -parallel = Parallel() \ No newline at end of file +parallel = Parallel() diff --git a/lib/iris/tests/unit/config/test_Parallel.py b/lib/iris/tests/unit/config/test_Parallel.py index 5193557a79..1f3c33de06 100644 --- a/lib/iris/tests/unit/config/test_Parallel.py +++ b/lib/iris/tests/unit/config/test_Parallel.py @@ -24,11 +24,9 @@ # importing anything else. import iris.tests as tests -import multiprocessing import warnings import dask -import distributed from iris.config import Parallel from iris.tests import mock @@ -44,47 +42,116 @@ def test_bad_name__contextmgr(self): class Test__set_dask_options(tests.IrisTest): - # Check the correct dask options are set given the inputs. - # NOTE: tests that check the correct dask options are set - # (will require mock :scream:). 
- # def setUp(self): - # patcher = mock.patch('dask.set_options') - # self.addCleanup(patcher.stop) - # self.mock_dask_opts = patcher.start() - def setUp(self): - self.mock_dask = dask - self.mock_dask.threaded.get = mock.MagicMock() - self.mock_dask.set_options = mock.MagicMock() - self.mock_mul = multiprocessing - self.mock_mul.pool.ThreadPool = mock.MagicMock() + ThreadPool = 'iris.config.ThreadPool' + self.pool = mock.sentinel.pool + self.patch_ThreadPool = self.patch(ThreadPool, return_value=self.pool) + self.default_num_workers = 1 + + Client = 'distributed.Client' + self.address = '192.168.0.128:8786' + mocker = mock.Mock(get=self.address) + self.patch_Client = self.patch(Client, return_value=mocker) + + set_options = 'dask.set_options' + self.patch_set_options = self.patch(set_options) def test_default(self): - pass + Parallel() + self.assertEqual(self.patch_Client.call_count, 0) + self.patch_ThreadPool.assert_called_once_with(self.default_num_workers) + + pool = self.pool + get = dask.threaded.get + self.patch_set_options.assert_called_once_with(pool=pool, get=get) + + def test__five_workers(self): + n_workers = 5 + Parallel(num_workers=n_workers) + self.assertEqual(self.patch_Client.call_count, 0) + self.patch_ThreadPool.assert_called_once_with(n_workers) + + pool = self.pool + get = dask.threaded.get + self.patch_set_options.assert_called_once_with(pool=pool, get=get) + + def test__five_workers__contextmgr(self): + n_workers = 5 + options = Parallel() + pool = self.pool + get = dask.threaded.get + + with options.context(num_workers=n_workers): + self.assertEqual(self.patch_Client.call_count, 0) + self.patch_ThreadPool.assert_called_with(n_workers) + + self.patch_set_options.assert_called_with(pool=pool, get=get) + + self.patch_ThreadPool.assert_called_with(self.default_num_workers) + self.patch_set_options.assert_called_with(pool=pool, get=get) def test_threaded(self): scheduler = 'threaded' Parallel(scheduler=scheduler) - # self.mock_mul.pool.ThreadPool.assert_called_once_with(1) - self.mock_dask.set_options.assert_called_once_with(get=self.mock_dask.threaded.get, - pool=self.mock_mul.pool.ThreadPool) + self.assertEqual(self.patch_Client.call_count, 0) + self.patch_ThreadPool.assert_called_once_with(self.default_num_workers) - def test_threaded_num_workers(self): - pass + pool = self.pool + get = dask.threaded.get + self.patch_set_options.assert_called_once_with(pool=pool, get=get) - # def test_async(self): - # scheduler = 'async' - # # dask_options = mock.Mock(spec=set_options) - # # dask_scheduler = mock.Mock(spec=async.get_sync) - # with mock.patch('dask.set_options') as mock_dask_opts: - # Parallel(scheduler=scheduler) - # # dask_options.assert_called_once_with(get=dask_scheduler) - # mock_dask_opts.assert_any_call() + def test_multiprocessing(self): + scheduler = 'multiprocessing' + Parallel(scheduler=scheduler) + self.assertEqual(self.patch_Client.call_count, 0) + self.patch_ThreadPool.assert_called_once_with(self.default_num_workers) + + pool = self.pool + get = dask.multiprocessing.get + self.patch_set_options.assert_called_once_with(pool=pool, get=get) + + def test_multiprocessing__contextmgr(self): + scheduler = 'multiprocessing' + options = Parallel() + with options.context(scheduler=scheduler): + self.assertEqual(self.patch_Client.call_count, 0) + self.patch_ThreadPool.assert_called_with(self.default_num_workers) + + pool = self.pool + get = dask.multiprocessing.get + self.patch_set_options.assert_called_with(pool=pool, get=get) + + default_get = dask.threaded.get + 
self.patch_ThreadPool.assert_called_with(self.default_num_workers) + self.patch_set_options.assert_called_with(pool=pool, + get=default_get) + + def test_async(self): + scheduler = 'async' + Parallel(scheduler=scheduler) + self.assertEqual(self.patch_Client.call_count, 0) + self.assertEqual(self.patch_ThreadPool.call_count, 0) + + pool = self.pool + get = dask.async.get_sync + self.patch_set_options.assert_called_once_with(pool=None, get=get) + + def test_distributed(self): + scheduler = self.address + Parallel(scheduler=scheduler) + self.assertEqual(self.patch_ThreadPool.call_count, 0) + + get = scheduler + self.patch_Client.assert_called_once_with(get) + + self.patch_set_options.assert_called_once_with(pool=None, get=get) class Test_set_schedulers(tests.IrisTest): - # Check that the correct scheduler and dask scheduler are chosen given the - # inputs. + # Check that the correct scheduler is chosen given the inputs. + def setUp(self): + self.patch('iris.config.Parallel._set_dask_options') + def test_default(self): opts = Parallel() result = opts.get('scheduler') @@ -127,41 +194,11 @@ def test_bad(self): six.assertRegex(self, str(w[0].message), exp_wmsg.format(scheduler)) -class Test_set_dask_scheduler(tests.IrisTest): - # Check that the correct scheduler and dask scheduler are chosen given the - # inputs. - def test_default(self): - opts = Parallel() - result = opts.get('dask_scheduler') - self.assertIs(result, dask.threaded.get) - - def test_threaded(self): - scheduler = 'threaded' - opts = Parallel(scheduler=scheduler) - result = opts.get('dask_scheduler') - self.assertIs(result, dask.threaded.get) - - def test_multiprocessing(self): - scheduler = 'multiprocessing' - opts = Parallel(scheduler=scheduler) - result = opts.get('dask_scheduler') - self.assertIs(result, dask.multiprocessing.get) - - def test_async(self): - scheduler = 'async' - opts = Parallel(scheduler=scheduler) - result = opts.get('dask_scheduler') - self.assertIs(result, dask.async.get_sync) - - def test_distributed(self): - scheduler = '192.168.0.128:8786' - with mock.patch('distributed.Client.get') as mock_get: - opts = Parallel(scheduler=scheduler) - mock_get.assert_called_once_with(scheduler) - - class Test_set_num_workers(tests.IrisTest): # Check that the correct `num_workers` are chosen given the inputs. + def setUp(self): + self.patch('iris.config.Parallel._set_dask_options') + def test_default(self): opts = Parallel() result = opts.get('num_workers') @@ -188,12 +225,6 @@ def test_too_many_workers(self): self.assertEqual(str(w[0].message), exp_wmsg.format(n_workers, max_cpus, max_cpus-1)) - def test_negative_workers(self): - n_workers = -2 - exp_emsg = "Number of processes must be at least 1" - with self.assertRaisesRegexp(ValueError, exp_emsg): - Parallel(num_workers=n_workers) - def test_async(self): scheduler = 'async' with warnings.catch_warnings(record=True) as w: @@ -216,5 +247,41 @@ def test_distributed(self): exp_wmsg.format('distributed')) +class Test_set_dask_scheduler(tests.IrisTest): + # Check that the correct dask scheduler is chosen given the inputs. 
+ def setUp(self): + self.patch('iris.config.Parallel._set_dask_options') + + def test_default(self): + opts = Parallel() + result = opts.get('dask_scheduler') + expected = dask.threaded.get + self.assertIs(result, expected) + + def test_threaded(self): + opts = Parallel(scheduler='threaded') + result = opts.get('dask_scheduler') + expected = dask.threaded.get + self.assertIs(result, expected) + + def test_multiprocessing(self): + opts = Parallel(scheduler='multiprocessing') + result = opts.get('dask_scheduler') + expected = dask.multiprocessing.get + self.assertIs(result, expected) + + def test_async(self): + opts = Parallel(scheduler='async') + result = opts.get('dask_scheduler') + expected = dask.async.get_sync + self.assertIs(result, expected) + + def test_distributed(self): + scheduler = '192.168.0.128:8786' + opts = Parallel(scheduler=scheduler) + result = opts.get('dask_scheduler') + self.assertEqual(result, scheduler) + + if __name__ == '__main__': tests.main() diff --git a/minimal-conda-requirements.txt b/minimal-conda-requirements.txt index a9d6313630..a87c787ec9 100644 --- a/minimal-conda-requirements.txt +++ b/minimal-conda-requirements.txt @@ -11,7 +11,6 @@ pyke udunits2 cf_units dask -distributed # Iris build dependencies setuptools From 8d05008e2a76cef334e1fdfcd8a820e7470a75d7 Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Thu, 20 Apr 2017 10:58:52 +0100 Subject: [PATCH 7/7] Most review comments actioned --- lib/iris/__init__.py | 1 - lib/iris/_lazy_data.py | 2 +- lib/iris/config.py | 37 +++++++++++---------- lib/iris/tests/unit/config/test_Parallel.py | 19 ++++++++--- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/lib/iris/__init__.py b/lib/iris/__init__.py index 1b1b91dafc..3019a0adcb 100644 --- a/lib/iris/__init__.py +++ b/lib/iris/__init__.py @@ -113,7 +113,6 @@ def callback(cube, field, filename): from iris._deprecation import IrisDeprecation, warn_deprecated import iris.fileformats import iris.io -import iris.options try: diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 83c41a7d8b..d2cf9b2ac9 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -97,7 +97,7 @@ def as_concrete_data(data, **kwargs): .. note:: Specific dask options for computation are controlled by - :class:`iris.options.Parallel`. + :class:`iris.config.Parallel`. """ if is_lazy_data(data): diff --git a/lib/iris/config.py b/lib/iris/config.py index c4ab53da1e..76cbf9410d 100644 --- a/lib/iris/config.py +++ b/lib/iris/config.py @@ -72,18 +72,17 @@ import contextlib from multiprocessing import cpu_count from multiprocessing.pool import ThreadPool -import re import os.path +import re import warnings import dask import dask.multiprocessing -DISTRIBUTED_AVAILABLE = True try: import distributed except ImportError: - DISTRIBUTED_AVAILABLE = False + distributed = None # Returns simple string options @@ -184,10 +183,6 @@ def _defaults_dict(self): raise NotImplementedError def __setattr__(self, name, value): - if name not in self.__dict__: - # Can't add new names. - msg = "'Option' object has no attribute {!r}".format(name) - raise AttributeError(msg) if value is None: # Set an explicitly unset value to the default value for the name. 
             value = self._defaults_dict[name]['default']
@@ -264,19 +259,19 @@ def __init__(self, scheduler=None, num_workers=None):
         * Specify that we want to load a cube with dask parallel processing
           using multiprocessing with six worker processes::
 
-            iris.options.parallel(scheduler='multiprocessing', num_workers=6)
+            iris.config.parallel(scheduler='multiprocessing', num_workers=6)
             iris.load('my_dataset.nc')
 
         * Specify, with a context manager, that we want to load a cube with
           dask parallel processing using four worker threads::
 
-            with iris.options.parallel(scheduler='threaded', num_workers=4):
+            with iris.config.parallel(scheduler='threaded', num_workers=4):
                 iris.load('my_dataset.nc')
 
         * Run dask parallel processing using a distributed scheduler that
           has been set up at the IP address and port at ``192.168.0.219:8786``::
 
-            iris.options.parallel(scheduler='192.168.0.219:8786')
+            iris.config.parallel(scheduler='192.168.0.219:8786')
 
         """
         # Set `__dict__` keys first.
@@ -304,9 +299,13 @@ def __repr__(self):
         return msg.format(joined)
 
     def __setattr__(self, name, value):
+        if name not in self.__dict__:
+            # Can't add new names.
+            msg = "{!r} object has no attribute {!r}"
+            raise AttributeError(msg.format(self.__class__.__name__, name))
         if value is None:
             value = self._defaults_dict[name]['default']
-        attr_setter = getattr(self, 'set_{}'.format(name))
+        attr_setter = self._defaults_dict[name]['setter']
         value = attr_setter(value)
         super(Parallel, self).__setattr__(name, value)
@@ -321,14 +320,18 @@ def _defaults_dict(self):
         the `'options'` were a range of numbers.
 
         """
-        return {'_scheduler': {'default': None, 'options': None},
+        return {'_scheduler': {'default': None, 'options': None,
+                               'setter': self.set__scheduler},
                 'scheduler': {'default': 'threaded',
                               'options': ['threaded',
                                           'multiprocessing',
                                           'async',
-                                          'distributed']},
-                'num_workers': {'default': 1, 'options': None},
-                'dask_scheduler': {'default': None, 'options': None},
+                                          'distributed'],
+                              'setter': self.set_scheduler},
+                'num_workers': {'default': 1, 'options': None,
+                                'setter': self.set_num_workers},
+                'dask_scheduler': {'default': None, 'options': None,
+                                   'setter': self.set_dask_scheduler},
                 }
 
     def set__scheduler(self, value):
@@ -339,7 +342,7 @@ def set_scheduler(self, value):
         if value is None:
             value = default
         elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value):
-            if DISTRIBUTED_AVAILABLE:
+            if distributed is not None:
                 value = 'distributed'
             else:
                 # Distributed not available.
@@ -404,7 +407,7 @@ def _set_dask_options(self):
         if scheduler in ['threaded', 'multiprocessing']:
             num_workers = self.get('num_workers')
             pool = ThreadPool(num_workers)
-        if scheduler == 'distributed':
+        elif scheduler == 'distributed':
             get = distributed.Client(get).get
 
         dask.set_options(get=get, pool=pool)
diff --git a/lib/iris/tests/unit/config/test_Parallel.py b/lib/iris/tests/unit/config/test_Parallel.py
index 1f3c33de06..5dc77993b0 100644
--- a/lib/iris/tests/unit/config/test_Parallel.py
+++ b/lib/iris/tests/unit/config/test_Parallel.py
@@ -32,13 +32,22 @@
 from iris.tests import mock
 
 
-class Test_operation(tests.IrisTest):
-    # Check that the options are passed through to 'real' code.
-    # NOTE: tests that call the option class directly and as a contextmgr.
+class Test__operation(tests.IrisTest):
+    def setUp(self):
+        self.parallel = Parallel()
+
+    def test_bad_name(self):
+        # Check we can't do `iris.config.parallel.foo = 'bar'`.
+        exp_emsg = "'Parallel' object has no attribute 'foo'"
+        with self.assertRaisesRegexp(AttributeError, exp_emsg):
+            self.parallel.foo = 'bar'
 
     def test_bad_name__contextmgr(self):
-        # Check we can't do `with iris.options.parallel.context('foo'='bar')`.
-        pass
+        # Check we can't do `with iris.config.parallel.context(foo='bar')`.
+        exp_emsg = "'Parallel' object has no attribute 'foo'"
+        with self.assertRaisesRegexp(AttributeError, exp_emsg):
+            with self.parallel.context(foo='bar'):
+                pass
 
 
 class Test__set_dask_options(tests.IrisTest):
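
A closing note on the `context` implementation in this series: the snapshot-and-restore idiom it relies on can be exercised in isolation. Below is a minimal, dependency-free sketch of that idiom for reference; the `Options` class and its `level` attribute are invented for illustration and are not part of Iris::

    import contextlib


    class Options(object):
        """Sketch of an options object with a temporary-override context."""

        def __init__(self, level=1):
            self.level = level

        @contextlib.contextmanager
        def context(self, **kwargs):
            # Snapshot the current state so it can be restored on exit.
            starting_state = self.__dict__.copy()
            # Apply the requested temporary overrides.
            for name, value in kwargs.items():
                setattr(self, name, value)
            try:
                yield
            finally:
                # Return to the starting state, whether or not the block
                # raised an exception.
                self.__dict__.clear()
                self.__dict__.update(starting_state)


    options = Options()
    with options.context(level=5):
        assert options.level == 5   # override active inside the block
    assert options.level == 1       # original value restored afterwards

Restoring through `__dict__` rather than re-assigning each attribute matches the diff above, and has the useful side effect of bypassing the validating `__setattr__` during teardown.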