4 changes: 4 additions & 0 deletions lib/iris/_lazy_data.py
@@ -95,6 +95,10 @@ def as_concrete_data(data, **kwargs):
Returns:
A NumPy `ndarray` or masked array.

.. note::
Specific dask options for computation are controlled by
:class:`iris.config.Parallel`.

"""
if is_lazy_data(data):
# Realise dask array, ensuring the data result is always a NumPy array.
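To make the note above concrete, here is a minimal usage sketch (not part of the diff; it assumes `as_concrete_data` simply computes lazy dask input, as the surrounding code suggests, and uses the `parallel.context` manager defined in lib/iris/config.py below)::

    import dask.array as da
    import numpy as np

    import iris.config
    from iris._lazy_data import as_concrete_data

    # A lazy dask array standing in for deferred cube data.
    lazy = da.ones((1000, 1000), chunks=(100, 100))

    # Realisation honours the dask options held by iris.config.Parallel,
    # e.g. the scheduler and worker pool configured there.
    with iris.config.parallel.context(scheduler='threaded', num_workers=2):
        data = as_concrete_data(lazy)

    assert isinstance(data, np.ndarray)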
285 changes: 283 additions & 2 deletions lib/iris/config.py
@@ -1,4 +1,4 @@
-# (C) British Crown Copyright 2010 - 2016, Met Office
+# (C) British Crown Copyright 2010 - 2017, Met Office
#
# This file is part of Iris.
#
@@ -66,11 +66,24 @@

from __future__ import (absolute_import, division, print_function)
from six.moves import (filter, input, map, range, zip) # noqa

import six
from six.moves import configparser

import contextlib
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
import os.path
import re
import warnings

import dask
import dask.multiprocessing

try:
import distributed
except ImportError:
distributed = None


# Returns simple string options
def get_option(section, option, default=None):
@@ -154,3 +167,271 @@ def get_dir_option(section, option, default=None):


IMPORT_LOGGER = get_option(_LOGGING_SECTION, 'import_logger')


#################
# Runtime options

class Option(object):
"""
An abstract superclass to enforce certain key behaviours for all `Option`
classes.

"""
@property
def _defaults_dict(self):
raise NotImplementedError

def __setattr__(self, name, value):
if value is None:
# Set an explicitly unset value to the default value for the name.
value = self._defaults_dict[name]['default']
if self._defaults_dict[name]['options'] is not None:
# Replace a bad value with the default if there is a defined set of
# specified good values.
if value not in self._defaults_dict[name]['options']:
good_value = self._defaults_dict[name]['default']
wmsg = ('Attempting to set bad value {!r} for attribute {!r}. '
'Defaulting to {!r}.')
warnings.warn(wmsg.format(value, name, good_value))
value = good_value
self.__dict__[name] = value

def context(self):
raise NotImplementedError
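
As an illustration of the contract this superclass enforces, a minimal, hypothetical subclass (a sketch, not part of the PR) might look like this::

    class Verbosity(Option):
        """A made-up option with a single validated attribute."""
        @property
        def _defaults_dict(self):
            return {'level': {'default': 'info',
                              'options': ['quiet', 'info', 'debug']}}

        def __init__(self, level=None):
            # Routed through Option.__setattr__: None becomes the default,
            # and a value outside 'options' warns and falls back to it.
            self.level = level

    v = Verbosity(level='loud')  # warns about the bad value...
    print(v.level)               # ...and prints 'info'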


class Parallel(Option):
"""
Control dask parallel processing options for Iris.

"""
def __init__(self, scheduler=None, num_workers=None):
"""
Set up options for dask parallel processing.

Currently accepted kwargs:

* scheduler:
The scheduler used to run a dask graph. Must be set to one of:

* 'threaded': (default)
The scheduler processes the graph in parallel using a
thread pool. Good for processing dask arrays and dataframes.
* 'multiprocessing':
The scheduler processes the graph in parallel using a
process pool. Good for processing dask bags.
* 'async':
The scheduler runs synchronously (not in parallel). Good for
debugging.
* The IP address and port of a distributed scheduler:
Specifies the location of a distributed scheduler that has
already been set up. The distributed scheduler will process the
graph.

For more information see
http://dask.pydata.org/en/latest/scheduler-overview.html.

* num_workers:
The number of worker threads or processes used to run the dask
graph in parallel. Defaults to 1 (that is, the graph is processed
serially).

.. note::
The value of `num_workers` cannot be set greater than the
number of CPUs available on the host system. If such a value is
requested, `num_workers` is automatically set to one less than
that number.

.. note::
Only the 'threaded' and 'multiprocessing' schedulers support
the `num_workers` kwarg. If it is specified with the `async` or
`distributed` scheduler, the kwarg is ignored:

* The 'async' scheduler runs serially so will only use a single
worker.
* The number of workers for the 'distributed' scheduler must be
defined when setting up the distributed scheduler. For more
information on setting up distributed schedulers, see
https://distributed.readthedocs.io/en/latest/index.html.

Example usages:

Review comment (Member):

@dkillick I'm really glad you put some example usages in here. And I like the amount of detail you have put into the descriptions of the args.

* Specify that we want to load a cube with dask parallel processing
using multiprocessing with six worker processes::

iris.config.Parallel(scheduler='multiprocessing', num_workers=6)
iris.load('my_dataset.nc')

* Specify, with a context manager, that we want to load a cube with
dask parallel processing using four worker threads::

with iris.config.parallel.context(scheduler='threaded', num_workers=4):
iris.load('my_dataset.nc')

* Run dask parallel processing using a distributed scheduler that has
been set up at the IP address and port ``192.168.0.219:8786``::

iris.config.Parallel(scheduler='192.168.0.219:8786')

"""
# Set `__dict__` keys first.
self.__dict__['_scheduler'] = scheduler
self.__dict__['scheduler'] = None
self.__dict__['num_workers'] = None
self.__dict__['dask_scheduler'] = None

# Set `__dict__` values for each kwarg.
setattr(self, 'scheduler', scheduler)
setattr(self, 'num_workers', num_workers)
setattr(self, 'dask_scheduler', self.get('scheduler'))

# Activate the specified dask options.
self._set_dask_options()

def __repr__(self):
msg = 'Dask parallel options: {}.'

# Automatically populate with all currently accepted kwargs.
options = ['{}={}'.format(k, v)
for k, v in six.iteritems(self.__dict__)
if not k.startswith('_')]
joined = ', '.join(options)
return msg.format(joined)

def __setattr__(self, name, value):
if name not in self.__dict__:
# Can't add new names.
msg = "{!r} object has no attribute {!r}"
raise AttributeError(msg.format(self.__class__.__name__, name))
if value is None:
value = self._defaults_dict[name]['default']
attr_setter = self._defaults_dict[name]['setter']
value = attr_setter(value)
super(Parallel, self).__setattr__(name, value)

@property
def _defaults_dict(self):
"""
Define the default value and available options for each settable
`kwarg` of this `Option`.

Note: `'options'` can be set to `None` if it is not practical to
enumerate all possible values; for example, when the valid values
span a range of numbers.

"""
return {'_scheduler': {'default': None, 'options': None,
'setter': self.set__scheduler},
'scheduler': {'default': 'threaded',
'options': ['threaded',
'multiprocessing',
'async',
'distributed'],
'setter': self.set_scheduler},
'num_workers': {'default': 1, 'options': None,
'setter': self.set_num_workers},
'dask_scheduler': {'default': None, 'options': None,
'setter': self.set_dask_scheduler},
}

def set__scheduler(self, value):
return value

def set_scheduler(self, value):
Review comment (Contributor):

These set_<key> methods are confusingly named. They don't actually set anything. A better name would be convert_<key>, IMO.

Reply (Member Author, @DPeterK, Apr 20, 2017):

They define values that are set for keys in the instance's underlying __dict__. In this sense they behave very similarly to the @property.setter in a more traditional class.

I think renaming them to convert_xxx would be more confusing as I think "converting" is further from what they're doing than "setting" is.

Reply (Contributor, @djkirkham, Apr 21, 2017):

> they behave very similarly to the @property.setter in a more traditional class

But they don't behave like a setter because they don't set anything!

default = self._defaults_dict['scheduler']['default']
if value is None:
value = default
elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value):
Review comment (Member):

@dkillick You could compile this re ...

if distributed is not None:
value = 'distributed'
else:
# Distributed not available.
wmsg = 'Cannot import distributed. Defaulting to {}.'
warnings.warn(wmsg.format(default))
self.set_scheduler(default)
Review comment (Contributor):

This doesn't actually do anything, assuming the default scheduler is always the same. Did you mean to return this?

Reply (Member Author):

My intention is that the scheduler setter is re-run on the default value in case it throws up any problems with setting the scheduler to the default in this case. Either way, the default value will very likely be returned in the next call of set_scheduler.

Reply (Contributor):

But what is returned from this function is value, and that isn't changed if this else section is reached. So ultimately what will be set in the attribute is the original value passed in, despite what the warning says.

Reply (Contributor, @djkirkham, Apr 21, 2017):

Actually, I see what happens. When Option.__setattr__ is called, it doesn't recognise the address as a legitimate value, and sets the attribute to the default. But I still don't understand the point of this self.set_scheduler(default) call.

elif value not in self._defaults_dict['scheduler']['options']:
# Invalid value for `scheduler`.
wmsg = 'Invalid value for scheduler: {!r}. Defaulting to {}.'
warnings.warn(wmsg.format(value, default))
self.set_scheduler(default)
return value
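A quick standalone check of the address pattern used above (a sketch, not part of the diff; it also compiles the expression once, per the review suggestion)::

    import re

    # Dotted-quad IP address followed by a port, as in set_scheduler.
    ADDRESS_RE = re.compile(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$')

    print(bool(ADDRESS_RE.match('192.168.0.219:8786')))  # True
    print(bool(ADDRESS_RE.match('threaded')))            # False
    print(bool(ADDRESS_RE.match('localhost:8786')))      # False: hostnames are not matched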

def set_num_workers(self, value):
default = self._defaults_dict['num_workers']['default']
scheduler = self.get('scheduler')
if scheduler == 'async' and value != default:
wmsg = 'Cannot set `num_workers` for the serial scheduler {!r}.'
warnings.warn(wmsg.format(scheduler))
value = None
elif scheduler == 'distributed' and value != default:
wmsg = ('Attempting to set `num_workers` with the {!r} scheduler '
'requested. Please instead specify number of workers when '
'setting up the distributed scheduler. See '
'https://distributed.readthedocs.io/en/latest/index.html '
'for more details.')
warnings.warn(wmsg.format(scheduler))
value = None
else:
if value is None:
value = default
if value >= cpu_count():
# Limit maximum CPUs used to 1 fewer than all available CPUs.
wmsg = ('Requested more CPUs ({}) than total available ({}). '
'Limiting number of used CPUs to {}.')
Review comment (Member):

@dkillick Is this going to be a bit confusing? What if somebody requests the maximum number of CPUs available, and is then only allowed to use one less than the max available, but the warning message states that the total available is the same number that they requested?

For example, let's say my CPU count is 8, and as a parallelisation beginner I want to see how fast my script is when I run it on all 8 CPUs, so I request 8. This isn't allowed by set_num_workers, so I get a warning message which says:
'Requested more CPUs (8) than total available (8). Limiting number of used CPUs to 7'

I would find this message confusing, and it would make me angry. Maybe it just needs a little clarification that you can't use all the CPUs available because it's silly.

warnings.warn(wmsg.format(value, cpu_count(), cpu_count()-1))
value = cpu_count() - 1
return value
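The behaviour the reviewer describes can be reproduced directly; a sketch assuming the code as shown and a host where `cpu_count()` returns 8::

    import warnings
    from multiprocessing import cpu_count

    import iris.config

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        opts = iris.config.Parallel(scheduler='threaded',
                                    num_workers=cpu_count())

    print(opts.num_workers)    # cpu_count() - 1, e.g. 7 on an 8-CPU host
    print(caught[-1].message)  # the 'Requested more CPUs ...' warning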

def set_dask_scheduler(self, scheduler):
if scheduler == 'threaded':
value = dask.threaded.get
elif scheduler == 'multiprocessing':
value = dask.multiprocessing.get
elif scheduler == 'async':
value = dask.async.get_sync
elif scheduler == 'distributed':
value = self.get('_scheduler')
return value

def _set_dask_options(self):
"""
Use `dask.set_options` to globally apply the options specified at
instantiation, either for the lifetime of the session or for the
duration of a `context` block.

"""
scheduler = self.get('scheduler')
get = self.get('dask_scheduler')
pool = None

if scheduler in ['threaded', 'multiprocessing']:
num_workers = self.get('num_workers')
pool = ThreadPool(num_workers)
elif scheduler == 'distributed':
get = distributed.Client(get).get

dask.set_options(get=get, pool=pool)
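For reference, the same effect can be had outside Iris with dask's pre-1.0 options API, which this method wraps; a sketch for `scheduler='threaded'` with four workers::

    from multiprocessing.pool import ThreadPool

    import dask
    import dask.threaded

    # Equivalent of _set_dask_options for the threaded scheduler:
    # point dask at the threaded get and supply a sized thread pool.
    dask.set_options(get=dask.threaded.get, pool=ThreadPool(4))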

def get(self, item):
return getattr(self, item)

@contextlib.contextmanager
def context(self, **kwargs):
# Snapshot the starting state for restoration at the end of the
# contextmanager block.
starting_state = self.__dict__.copy()
# Update the state to reflect the requested changes.
for name, value in six.iteritems(kwargs):
setattr(self, name, value)
setattr(self, 'dask_scheduler', self.get('scheduler'))
self._set_dask_options()
try:
yield
finally:
# Return the state to the starting state.
self.__dict__.clear()
self.__dict__.update(starting_state)
self._set_dask_options()


parallel = Parallel()
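Finally, a short usage sketch of the module-level instance defined here (the dataset name is hypothetical)::

    import iris
    import iris.config

    # Scoped: options are restored on exit, because context() snapshots
    # and then restores the instance __dict__.
    with iris.config.parallel.context(scheduler='multiprocessing',
                                      num_workers=4):
        cubes = iris.load('my_dataset.nc')  # hypothetical file

    # Session-wide: re-instantiating applies the options globally via
    # _set_dask_options.
    iris.config.parallel = iris.config.Parallel(scheduler='threaded',
                                                num_workers=2)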
20 changes: 20 additions & 0 deletions lib/iris/tests/unit/config/__init__.py
@@ -0,0 +1,20 @@
# (C) British Crown Copyright 2017, Met Office
#
# This file is part of Iris.
#
# Iris is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Iris is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Iris. If not, see <http://www.gnu.org/licenses/>.
"""Unit tests for the :mod:`iris.config` module."""

from __future__ import (absolute_import, division, print_function)
from six.moves import (filter, input, map, range, zip) # noqa