Merged
2 changes: 1 addition & 1 deletion .travis.yml
@@ -161,7 +161,7 @@ script:
# ray tune tests
- ./ci/suppress_output python python/ray/tune/tests/test_dependency.py
# `cluster_tests.py` runs on Jenkins, not Travis.
- python -m pytest --durations=10 --ignore=python/ray/tune/tests/test_cluster.py python/ray/tune/tests
- python -m pytest --durations=10 --ignore=python/ray/tune/tests/test_cluster.py --ignore=python/ray/tune/tests/test_actor_reuse.py python/ray/tune/tests

# ray rllib tests
- python/ray/rllib/tests/run_silent.sh tests/test_catalog.py
3 changes: 3 additions & 0 deletions ci/jenkins_tests/run_tune_tests.sh
@@ -34,6 +34,9 @@ echo "Using Docker image" $DOCKER_SHA
$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
pytest /ray/python/ray/tune/tests/test_cluster.py

$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
pytest /ray/python/ray/tune/tests/test_actor_reuse.py

$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
python /ray/python/ray/tune/examples/tune_mnist_ray.py \
--smoke-test
5 changes: 5 additions & 0 deletions python/ray/tune/error.py
@@ -6,3 +6,8 @@
class TuneError(Exception):
"""General error class raised by ray.tune."""
pass


class AbortTrialExecution(TuneError):
"""Error that indicates a trial should not be retried."""
pass
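
Because AbortTrialExecution subclasses TuneError, existing handlers still catch it, but callers can single it out to skip retries. A minimal, hypothetical sketch of that pattern (start_with_retries is illustrative and not part of this PR):

from ray.tune.error import TuneError, AbortTrialExecution


def start_with_retries(start_fn, max_failures=3):
    # Illustrative helper: retry transient TuneErrors, but never retry an
    # AbortTrialExecution, which marks the trial as not runnable at all.
    for attempt in range(max_failures):
        try:
            return start_fn()
        except AbortTrialExecution:
            raise  # fatal Tune error, e.g. actor reuse without reset_trial()
        except TuneError:
            if attempt == max_failures - 1:
                raise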
1 change: 1 addition & 0 deletions python/ray/tune/examples/pbt_example.py
@@ -96,4 +96,5 @@ def reset_config(self, new_config):
}
},
scheduler=pbt,
reuse_actors=True,
verbose=False)
83 changes: 62 additions & 21 deletions python/ray/tune/ray_trial_executor.py
@@ -10,10 +10,11 @@
import traceback

import ray
from ray.tune.error import TuneError
from ray.tune.error import TuneError, AbortTrialExecution
from ray.tune.logger import NoopLogger
from ray.tune.trial import Trial, Resources, Checkpoint
from ray.tune.trial_executor import TrialExecutor
from ray.tune.util import warn_if_slow

logger = logging.getLogger(__name__)

@@ -30,7 +31,7 @@ def unwrap(self):
class RayTrialExecutor(TrialExecutor):
"""An implemention of TrialExecutor based on Ray."""

def __init__(self, queue_trials=False):
def __init__(self, queue_trials=False, reuse_actors=False):
super(RayTrialExecutor, self).__init__(queue_trials)
self._running = {}
# Since trial resume after paused should not run
@@ -40,21 +41,46 @@ def __init__(self, queue_trials=False):
self._avail_resources = Resources(cpu=0, gpu=0)
self._committed_resources = Resources(cpu=0, gpu=0)
self._resources_initialized = False
self._reuse_actors = reuse_actors
self._cached_actor = None
if ray.is_initialized():
self._update_avail_resources()

def _setup_runner(self, trial):
cls = ray.remote(
num_cpus=trial.resources.cpu,
num_gpus=trial.resources.gpu,
resources=trial.resources.custom_resources)(
trial._get_trainable_cls())
def _setup_runner(self, trial, reuse_allowed):
if (self._reuse_actors and reuse_allowed
and self._cached_actor is not None):
logger.debug("Reusing cached runner {} for {}".format(
self._cached_actor, trial.trial_id))
existing_runner = self._cached_actor
self._cached_actor = None
else:
if self._cached_actor:
logger.debug(
"Cannot reuse cached runner {} for new trial".format(
self._cached_actor))
self._cached_actor.stop.remote()
self._cached_actor.__ray_terminate__.remote()
self._cached_actor = None
existing_runner = None
cls = ray.remote(
num_cpus=trial.resources.cpu,
num_gpus=trial.resources.gpu,
resources=trial.resources.custom_resources)(
trial._get_trainable_cls())

trial.init_logger()
# We checkpoint metadata here to try mitigating logdir duplication
self.try_checkpoint_metadata(trial)
remote_logdir = trial.logdir

if existing_runner:
trial.runner = existing_runner
if not self.reset_trial(trial, trial.config, trial.experiment_tag):
raise AbortTrialExecution(
"Trial runner reuse requires reset_trial() to be "
"implemented and return True.")
return existing_runner

def logger_creator(config):
# Set the working dir in the remote process, for user file writes
if not os.path.exists(remote_logdir):
@@ -86,7 +112,10 @@ def _start_trial(self, trial, checkpoint=None):
"""
prior_status = trial.status
self.set_status(trial, Trial.RUNNING)
trial.runner = self._setup_runner(trial)
trial.runner = self._setup_runner(
trial,
reuse_allowed=checkpoint is not None
or trial._checkpoint.value is not None)
if not self.restore(trial, checkpoint):
if trial.status == Trial.ERROR:
raise RuntimeError(
@@ -126,12 +155,18 @@ def _stop_trial(self, trial, error=False, error_msg=None,
try:
trial.write_error_log(error_msg)
if hasattr(trial, 'runner') and trial.runner:
stop_tasks = []
stop_tasks.append(trial.runner.stop.remote())
stop_tasks.append(trial.runner.__ray_terminate__.remote())
# TODO(ekl) seems like wait hangs when killing actors
_, unfinished = ray.wait(
stop_tasks, num_returns=2, timeout=0.25)
if (not error and self._reuse_actors
and self._cached_actor is None):
logger.debug("Reusing actor for {}".format(trial.runner))
self._cached_actor = trial.runner
else:
logger.info(
"Destroying actor for trial {}. If your trainable is "
"slow to initialize, consider setting "
"reuse_actors=True to reduce actor creation "
"overheads.".format(trial))
trial.runner.stop.remote()
trial.runner.__ray_terminate__.remote()
except Exception:
logger.exception("Error stopping runner for Trial %s", str(trial))
self.set_status(trial, Trial.ERROR)
Expand All @@ -152,11 +187,13 @@ def start_trial(self, trial, checkpoint=None):
self._commit_resources(trial.resources)
try:
self._start_trial(trial, checkpoint)
except Exception:
except Exception as e:
logger.exception("Error starting runner for Trial %s", str(trial))
error_msg = traceback.format_exc()
time.sleep(2)
self._stop_trial(trial, error=True, error_msg=error_msg)
if isinstance(e, AbortTrialExecution):
return # don't retry fatal Tune errors
try:
# This forces the trial to not start from checkpoint.
trial.clear_checkpoint()
@@ -222,7 +259,8 @@ def reset_trial(self, trial, new_config, new_experiment_tag):
trial.experiment_tag = new_experiment_tag
trial.config = new_config
trainable = trial.runner
reset_val = ray.get(trainable.reset_config.remote(new_config))
with warn_if_slow("reset_config"):
reset_val = ray.get(trainable.reset_config.remote(new_config))
return reset_val

def get_running_trials(self):
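
The with warn_if_slow(...) wrappers added throughout this file come from ray.tune.util (imported above). Assuming the helper is a timing context manager that logs a warning when the wrapped call exceeds some threshold, an equivalent sketch might look like the following (the 0.5-second threshold and the message wording are assumptions, not the actual Tune implementation):

import contextlib
import logging
import time

logger = logging.getLogger(__name__)


@contextlib.contextmanager
def warn_if_slow(name, threshold=0.5):
    # Log a warning if the wrapped block takes longer than `threshold` seconds.
    start = time.time()
    yield
    elapsed = time.time() - start
    if elapsed > threshold:
        logger.warning("The `%s` operation took %.3f seconds, which may be a "
                       "performance bottleneck.", name, elapsed)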
@@ -249,7 +287,8 @@ def fetch_result(self, trial):
if not trial_future:
raise ValueError("Trial was not running.")
self._running.pop(trial_future[0])
result = ray.get(trial_future[0])
with warn_if_slow("fetch_result"):
result = ray.get(trial_future[0])

# For local mode
if isinstance(result, _LocalWrapper):
@@ -400,7 +439,8 @@ def save(self, trial, storage=Checkpoint.DISK):
if storage == Checkpoint.MEMORY:
trial._checkpoint.value = trial.runner.save_to_object.remote()
else:
trial._checkpoint.value = ray.get(trial.runner.save.remote())
with warn_if_slow("save_to_disk"):
trial._checkpoint.value = ray.get(trial.runner.save.remote())
return trial._checkpoint.value

def restore(self, trial, checkpoint=None):
@@ -421,11 +461,12 @@ def restore(self, trial, checkpoint=None):
value = checkpoint.value
if checkpoint.storage == Checkpoint.MEMORY:
assert type(value) != Checkpoint, type(value)
ray.get(trial.runner.restore_from_object.remote(value))
trial.runner.restore_from_object.remote(value)
else:
worker_ip = ray.get(trial.runner.current_ip.remote())
trial.sync_logger_to_new_location(worker_ip)
ray.get(trial.runner.restore.remote(value))
with warn_if_slow("restore_from_disk"):
ray.get(trial.runner.restore.remote(value))
trial.last_result = checkpoint.last_result
return True
except Exception:
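
Taken together, the executor changes above implement a single-slot actor cache: when a trial stops cleanly and reuse_actors is set, its runner is parked in self._cached_actor instead of being terminated, and the next eligible trial takes it over if reset_trial() succeeds. A simplified standalone sketch of that pattern, with illustrative names (Runner, get_runner, release_runner) that are not part of this PR:

import ray


@ray.remote
class Runner(object):
    """Stand-in for a Trainable actor, mirroring the reset_config contract."""

    def __init__(self, config):
        self.config = config

    def reset_config(self, new_config):
        # Return True if the actor can be reconfigured in place.
        self.config = new_config
        return True

    def stop(self):
        pass


_cached_actor = None  # single-slot cache, as in RayTrialExecutor


def get_runner(config):
    """Return the cached runner if it can be reset, else start a new actor."""
    global _cached_actor
    if _cached_actor is not None:
        runner, _cached_actor = _cached_actor, None
        if ray.get(runner.reset_config.remote(config)):
            return runner  # reuse succeeded; actor creation skipped
        runner.__ray_terminate__.remote()  # reset unsupported; discard
    return Runner.remote(config)


def release_runner(runner):
    """Park the runner for the next trial instead of terminating it."""
    global _cached_actor
    if _cached_actor is None:
        _cached_actor = runner
    else:
        runner.stop.remote()
        runner.__ray_terminate__.remote()

The real _setup_runner above is stricter than this sketch: reuse is only allowed when the trial is restoring from a checkpoint, and a failed reset_trial() on a cached runner raises AbortTrialExecution instead of silently falling back to a fresh actor.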
19 changes: 10 additions & 9 deletions python/ray/tune/schedulers/pbt.py
@@ -218,23 +218,24 @@ def _exploit(self, trial_executor, trial, trial_to_clone):
trial_state = self._trial_state[trial]
new_state = self._trial_state[trial_to_clone]
if not new_state.last_checkpoint:
logger.warning("[pbt]: no checkpoint for trial."
" Skip exploit for Trial {}".format(trial))
logger.info("[pbt]: no checkpoint for trial."
" Skip exploit for Trial {}".format(trial))
return
new_config = explore(trial_to_clone.config, self._hyperparam_mutations,
self._resample_probability,
self._custom_explore_fn)
logger.warning("[exploit] transferring weights from trial "
"{} (score {}) -> {} (score {})".format(
trial_to_clone, new_state.last_score, trial,
trial_state.last_score))
# TODO(ekl) restarting the trial is expensive. We should implement a
# lighter way reset() method that can alter the trial config.
logger.info("[exploit] transferring weights from trial "
"{} (score {}) -> {} (score {})".format(
trial_to_clone, new_state.last_score, trial,
trial_state.last_score))
new_tag = make_experiment_tag(trial_state.orig_tag, new_config,
self._hyperparam_mutations)
reset_successful = trial_executor.reset_trial(trial, new_config,
new_tag)
if not reset_successful:
if reset_successful:
trial_executor.restore(
trial, Checkpoint.from_object(new_state.last_checkpoint))
Contributor: @arcelien is this ok?

Contributor: Let me try it on a single GPU machine, both with time multiplexing and also with a small population size.

else:
trial_executor.stop_trial(trial, stop_logger=False)
trial.config = new_config
trial.experiment_tag = new_tag
96 changes: 96 additions & 0 deletions python/ray/tune/tests/test_actor_reuse.py
@@ -0,0 +1,96 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import unittest

import ray
from ray.tune import Trainable, run_experiments
from ray.tune.error import TuneError
from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler


class FrequentPausesScheduler(FIFOScheduler):
def on_trial_result(self, trial_runner, trial, result):
return TrialScheduler.PAUSE


class MyResettableClass(Trainable):
def _setup(self, config):
self.config = config
self.num_resets = 0
self.iter = 0

def _train(self):
self.iter += 1
return {"num_resets": self.num_resets, "done": self.iter > 1}

def _save(self, chkpt_dir):
return {"iter": self.iter}

def _restore(self, item):
self.iter = item["iter"]

def reset_config(self, new_config):
if "fake_reset_not_supported" in self.config:
return False
self.num_resets += 1
return True


class ActorReuseTest(unittest.TestCase):
def setUp(self):
ray.init(num_cpus=1, num_gpus=0)

def tearDown(self):
ray.shutdown()

def testTrialReuseDisabled(self):
trials = run_experiments(
{
"foo": {
"run": MyResettableClass,
"num_samples": 4,
"config": {},
}
},
reuse_actors=False,
scheduler=FrequentPausesScheduler())
self.assertEqual([t.last_result["num_resets"] for t in trials],
[0, 0, 0, 0])

def testTrialReuseEnabled(self):
trials = run_experiments(
{
"foo": {
"run": MyResettableClass,
"num_samples": 4,
"config": {},
}
},
reuse_actors=True,
scheduler=FrequentPausesScheduler())
self.assertEqual([t.last_result["num_resets"] for t in trials],
[1, 2, 3, 4])

def testTrialReuseEnabledError(self):
def run():
run_experiments(
{
"foo": {
"run": MyResettableClass,
"max_failures": 1,
"num_samples": 4,
"config": {
"fake_reset_not_supported": True
},
}
},
reuse_actors=True,
scheduler=FrequentPausesScheduler())

self.assertRaises(TuneError, lambda: run())


if __name__ == "__main__":
unittest.main(verbosity=2)
9 changes: 8 additions & 1 deletion python/ray/tune/trainable.py
@@ -310,6 +310,9 @@ def restore(self, checkpoint_path):
self._restore(checkpoint_dict)
else:
self._restore(checkpoint_path)
self._time_since_restore = 0.0
self._timesteps_since_restore = 0
self._iterations_since_restore = 0
Contributor: good catch

self._restored = True

def restore_from_object(self, obj):
@@ -350,12 +353,16 @@ def export_model(self, export_formats, export_dir=None):
def reset_config(self, new_config):
"""Resets configuration without restarting the trial.

This method is optional, but can be implemented to speed up algorithms
such as PBT, and to allow performance optimizations such as running
experiments with reuse_actors=True.

Args:
new_config (dict): Updated hyperparameter configuration
for the trainable.

Returns:
True if configuration reset successfully else False.
True if reset was successful else False.
"""
return False
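
As a rough illustration of the contract described in this docstring (a hedged sketch mirroring the test above, not part of this diff): a Trainable opts into actor reuse by reapplying its hyperparameters in reset_config and returning True, and the experiment is launched with reuse_actors=True. Note that, per the executor changes above, a cached actor is only handed to trials that are resuming from a checkpoint.

import ray
from ray.tune import Trainable, run_experiments


class ReusableTrainable(Trainable):
    def _setup(self, config):
        self.lr = config["lr"]

    def _train(self):
        return {"lr": self.lr, "done": True}

    def _save(self, checkpoint_dir):
        return {"lr": self.lr}

    def _restore(self, state):
        self.lr = state["lr"]

    def reset_config(self, new_config):
        # Re-apply hyperparameters in place so the actor can be reused.
        self.lr = new_config["lr"]
        self.config = new_config
        return True


if __name__ == "__main__":
    ray.init()
    run_experiments(
        {"reset_demo": {
            "run": ReusableTrainable,
            "num_samples": 2,
            "config": {"lr": 0.1},
        }},
        reuse_actors=True)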
