Skip to content

Commit

Permalink
Merge pull request #608 from bouthilx/feature/cmdline_parallel_workers
Browse files Browse the repository at this point in the history
Add Executor class and CLI arguments for parallelism
  • Loading branch information
bouthilx authored May 18, 2021
2 parents 0c5c74d + 7dc1c73 commit e0ad3fb
Show file tree
Hide file tree
Showing 30 changed files with 548 additions and 129 deletions.
14 changes: 14 additions & 0 deletions docs/src/code/executor.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
****************
Executor modules
****************

.. automodule:: orion.executor
:members:

.. toctree::
:maxdepth: 2
:caption: Executor modules of Oríon

executor/base
executor/dask
executor/joblib
5 changes: 5 additions & 0 deletions docs/src/code/executor/base.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Base
====

.. automodule:: orion.executor.base
:members:
5 changes: 5 additions & 0 deletions docs/src/code/executor/dask.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Dask Executor
=============

.. automodule:: orion.executor.dask_backend
:members:
5 changes: 5 additions & 0 deletions docs/src/code/executor/joblib.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Joblib Executor
===============

.. automodule:: orion.executor.joblib_backend
:members:
1 change: 1 addition & 0 deletions docs/src/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
code/benchmark
code/client
code/core
code/executor
code/plotting
code/storage
code/testing
Expand Down
44 changes: 44 additions & 0 deletions docs/src/user/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ Full Example of Global Configuration
working_dir:
worker:
n_workers: 1
executor: joblib
executor_configuration: {}
heartbeat: 120
interrupt_signal_code: 130
max_broken: 10
Expand Down Expand Up @@ -372,6 +375,9 @@ Worker
.. code-block:: yaml
worker:
n_workers: 1
executor: joblib
executor_configuration: {}
heartbeat: 120
interrupt_signal_code: 130
max_broken: 10
Expand All @@ -381,6 +387,44 @@ Worker
.. _config_worker_n_workers:

n_workers
~~~~~~~~~

:Type: int
:Default: 1
:Env var: ORION_N_WORKERS
:Description:
Number of workers to run in parallel.
It is possible to run many ``orion hunt`` in parallel, and each will spawn
``n_workers`` workers.


.. _config_worker_executor:

executor
~~~~~~~~

:Type: str
:Default: joblib
:Env var: ORION_EXECUTOR
:Description:
The executor backend used to parallelize orion workers.


.. _config_worker_executor_configuration:

executor_configuration
~~~~~~~~~~~~~~~~~~~~~~

:Type: dict
:Default: {}
:Description:
The configuration of the executor. See :py:mod:`orion.executor` for documentation
of the configuration of each executor.


.. _config_worker_heartbeat:

heartbeat
Expand Down
70 changes: 70 additions & 0 deletions examples/dask_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
===========
Parallelism
===========
Multiprocessing
---------------
(joblib's loky)
Dask
----
"""

import joblib
import numpy
from sklearn import datasets
from sklearn.model_selection import cross_validate
from sklearn.svm import SVC

from orion.client import create_experiment


def main(C, gamma, tol, class_weight):
    """Cross-validate an RBF-kernel SVC and report its error rate.

    The model is fitted on the dataset returned by
    ``sklearn.datasets.load_diabetes`` using 5-fold cross-validation, executed
    under joblib's ``"dask"`` parallel backend.

    Parameters
    ----------
    C, gamma, tol, class_weight:
        Hyperparameters forwarded unchanged to ``sklearn.svm.SVC``.

    Returns
    -------
    list of dict
        A single result entry named ``test_error_rate`` of type ``objective``
        whose value is ``1 - mean(test_score)``.

    """
    dataset = datasets.load_diabetes()
    features, targets = dataset.data, dataset.target

    classifier = SVC(
        kernel="rbf", C=C, gamma=gamma, tol=tol, class_weight=class_weight
    )

    # Let scikit-learn dispatch the cross-validation folds through dask.
    with joblib.parallel_backend("dask"):
        scores = cross_validate(classifier, features, targets, cv=5)

    error_rate = 1 - numpy.mean(scores["test_score"])

    return [{"name": "test_error_rate", "type": "objective", "value": error_rate}]


def hpo(n_workers=16):
    """Run the example hyperparameter optimization with the dask executor.

    An experiment named ``dask`` is created with a random search over the SVC
    hyperparameters, then ``main`` is optimized inside a temporary ``"dask"``
    executor. Once ``workon`` returns, the regret and partial dependencies
    plots are shown.

    Parameters
    ----------
    n_workers: int, optional
        Number of workers given to the dask executor. ``workon`` itself is
        called with ``n_workers // 2`` workers.

    """
    search_space = {
        "C": "loguniform(1e-6, 1e6, precision=None)",
        "gamma": "loguniform(1e-8, 1e8, precision=None)",
        "tol": "loguniform(1e-4, 1e-1, precision=None)",
        "class_weight": "choices([None, 'balanced'])",
    }

    experiment = create_experiment(
        name="dask",
        max_trials=1000,
        max_broken=5,
        space=search_space,
        algorithms={"random": {"seed": 1}},
    )

    # Only half of the executor's workers are used to run trials in `workon`.
    trial_workers = n_workers // 2
    with experiment.tmp_executor("dask", n_workers=n_workers):
        experiment.workon(main, n_workers=trial_workers)

    experiment.plot.regret().show()
    experiment.plot.partial_dependencies(params=["C", "gamma", "tol"]).show()

if __name__ == "__main__":
hpo()
7 changes: 6 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"orion.benchmark",
"orion.client",
"orion.core",
"orion.executor",
"orion.plotting",
"orion.serving",
"orion.storage",
Expand Down Expand Up @@ -57,6 +58,10 @@
"track = orion.storage.track:Track",
"legacy = orion.storage.legacy:Legacy",
],
"Executor": [
"joblib = orion.executor.joblib_backend:Joblib",
"dask = orion.executor.dask_backend:Dask",
],
},
install_requires=[
"PyYAML",
Expand All @@ -80,7 +85,7 @@
],
tests_require=tests_require,
setup_requires=["setuptools", "appdirs", "pytest-runner"],
extras_require=dict(test=tests_require),
extras_require=dict(test=tests_require, dask=["dask[complete]"]),
# "Zipped eggs don't play nicely with namespace packaging"
# from https://github.com/pypa/sample-namespace-packages
zip_safe=False,
Expand Down
3 changes: 1 addition & 2 deletions src/orion/algo/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ def __init__(self, space, **kwargs):
if isinstance(subalgo_kwargs, dict):
param = OptimizationAlgorithm(subalgo_type, space, **subalgo_kwargs)
elif (
isinstance(param, str)
and param.lower() in OptimizationAlgorithm.typenames
isinstance(param, str) and param.lower() in OptimizationAlgorithm.types
):
# pylint: disable=too-many-function-args
param = OptimizationAlgorithm(param, space)
Expand Down
2 changes: 1 addition & 1 deletion src/orion/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def workon(
producer = Producer(experiment)

experiment_client = ExperimentClient(experiment, producer)
experiment_client.workon(function, max_trials=max_trials)
experiment_client.workon(function, n_workers=1, max_trials=max_trials)

finally:
# Restore singletons
Expand Down
65 changes: 48 additions & 17 deletions src/orion/client/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import inspect
import logging
import traceback

from joblib import Parallel, delayed
from contextlib import contextmanager

import orion.core
import orion.core.utils.format_trials as format_trials
Expand All @@ -26,6 +25,7 @@
from orion.core.utils.flatten import flatten, unflatten
from orion.core.worker.trial import Trial, TrialCM
from orion.core.worker.trial_pacemaker import TrialPacemaker
from orion.executor.base import Executor
from orion.plotting.base import PlotAccessor
from orion.storage.base import FailedUpdate

Expand Down Expand Up @@ -74,13 +74,18 @@ class ExperimentClient:
"""

def __init__(self, experiment, producer, heartbeat=None):
def __init__(self, experiment, producer, executor=None, heartbeat=None):
self._experiment = experiment
self._producer = producer
self._pacemakers = {}
if heartbeat is None:
heartbeat = orion.core.config.worker.heartbeat
self.heartbeat = heartbeat
self.executor = executor or Executor(
orion.core.config.worker.executor,
n_workers=orion.core.config.worker.n_workers,
**orion.core.config.worker.executor_configuration,
)
self.plot = PlotAccessor(self)

###
Expand Down Expand Up @@ -594,10 +599,31 @@ def observe(self, trial, results):
finally:
self._release_reservation(trial, raise_if_unreserved=raise_if_unreserved)

@contextmanager
def tmp_executor(self, executor, **config):
    """Temporarily change the executor backend of the experiment client.

    The executor that was set before entering the context is restored when
    the context exits, including when the body of the ``with`` statement (or
    the executor's own enter/exit) raises an exception.

    Parameters
    ----------
    executor: str or :class:`orion.executor.base.Executor`
        The executor to use. If it is a ``str``, the provided ``config`` will be used
        to create the executor with ``Executor(executor, **config)``.
    **config:
        Configuration to use if ``executor`` is a ``str``.

    Yields
    ------
    The experiment client itself, with ``executor`` set to the temporary executor.

    """
    if isinstance(executor, str):
        executor = Executor(executor, **config)

    old_executor = self.executor
    self.executor = executor
    try:
        with executor:
            yield self
    finally:
        # Always put the previous executor back; otherwise an error raised
        # inside the context would leave the client bound to the temporary
        # executor.
        self.executor = old_executor

def workon(
self,
fct,
n_workers=1,
n_workers=None,
max_trials=None,
max_trials_per_worker=None,
max_broken=None,
Expand All @@ -616,7 +642,7 @@ def workon(
parameter can be passed as ``**kwargs`` to `workon`. Function must return the final
objective.
n_workers: int, optional
Number of workers to run in parallel.
Number of workers to run in parallel. Defaults to value of global config.
max_trials: int, optional
Maximum number of trials to execute within ``workon``. If the experiment or algorithm
reach status is_done before, the execution of ``workon`` terminates.
Expand Down Expand Up @@ -667,6 +693,9 @@ def workon(
"""
self._check_if_executable()

if n_workers is None:
n_workers = orion.core.config.worker.n_workers

if max_trials is None:
max_trials = self.max_trials

Expand All @@ -682,18 +711,18 @@ def workon(
self._experiment.max_trials = max_trials
self._experiment.algorithms.algorithm.max_trials = max_trials

with Parallel(n_jobs=n_workers) as parallel:
trials = parallel(
delayed(self._optimize)(
fct,
max_trials_per_worker,
max_broken,
trial_arg,
on_error,
**kwargs,
)
for _ in range(n_workers)
trials = self.executor.wait(
self.executor.submit(
self._optimize,
fct,
max_trials_per_worker,
max_broken,
trial_arg,
on_error,
**kwargs,
)
for _ in range(n_workers)
)

return sum(trials)

Expand All @@ -712,7 +741,9 @@ def _optimize(self, fct, max_trials, max_broken, trial_arg, on_error, **kwargs):
kwargs[trial_arg] = trial

try:
results = fct(**unflatten(kwargs))
results = self.executor.wait(
[self.executor.submit(fct, **unflatten(kwargs))]
)[0]
self.observe(trial, results=results)
except (KeyboardInterrupt, InvalidResult):
raise
Expand Down
31 changes: 31 additions & 0 deletions src/orion/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,37 @@ def define_worker_config(config):
"""Create and define the fields of the worker configuration."""
worker_config = Configuration()

worker_config.add_option(
"n_workers",
option_type=int,
default=1,
env_var="ORION_N_WORKERS",
help=(
"Number of workers to run in parallel. "
"It is possible to run many ``orion hunt`` in parallel, and each will spawn "
"``n_workers``."
),
)

worker_config.add_option(
"executor",
option_type=str,
default="joblib",
env_var="ORION_EXECUTOR",
help="The executor backend used to parallelize orion workers.",
)

worker_config.add_option(
"executor_configuration",
option_type=dict,
default={},
help=(
"The configuration of the executor. See "
"https://orion.readthedocs.io/en/stable/code/executor.html for documentation "
"of executors configuration."
),
)

worker_config.add_option(
"heartbeat",
option_type=int,
Expand Down
Loading

0 comments on commit e0ad3fb

Please sign in to comment.