diff --git a/.travis.yml b/.travis.yml index 158657eb93f6..e077f57d7446 100644 --- a/.travis.yml +++ b/.travis.yml @@ -161,7 +161,7 @@ script: # ray tune tests - ./ci/suppress_output python python/ray/tune/tests/test_dependency.py # `cluster_tests.py` runs on Jenkins, not Travis. - - python -m pytest --durations=10 --ignore=python/ray/tune/tests/test_cluster.py python/ray/tune/tests + - python -m pytest --durations=10 --ignore=python/ray/tune/tests/test_cluster.py --ignore=python/ray/tune/tests/test_actor_reuse.py python/ray/tune/tests # ray rllib tests - python/ray/rllib/tests/run_silent.sh tests/test_catalog.py diff --git a/ci/jenkins_tests/run_tune_tests.sh b/ci/jenkins_tests/run_tune_tests.sh index 6e8e63aa607b..6154fe70d4f6 100755 --- a/ci/jenkins_tests/run_tune_tests.sh +++ b/ci/jenkins_tests/run_tune_tests.sh @@ -34,6 +34,9 @@ echo "Using Docker image" $DOCKER_SHA $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ pytest /ray/python/ray/tune/tests/test_cluster.py +$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + pytest /ray/python/ray/tune/tests/test_actor_reuse.py + $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ python /ray/python/ray/tune/examples/tune_mnist_ray.py \ --smoke-test diff --git a/python/ray/tune/error.py b/python/ray/tune/error.py index badf60a08fdc..2afd47a9245a 100644 --- a/python/ray/tune/error.py +++ b/python/ray/tune/error.py @@ -6,3 +6,8 @@ class TuneError(Exception): """General error class raised by ray.tune.""" pass + + +class AbortTrialExecution(TuneError): + """Error that indicates a trial should not be retried.""" + pass diff --git a/python/ray/tune/examples/pbt_example.py b/python/ray/tune/examples/pbt_example.py index 3433e82f94ee..9cc387e5b110 100755 --- a/python/ray/tune/examples/pbt_example.py +++ b/python/ray/tune/examples/pbt_example.py @@ -96,4 +96,5 @@ def reset_config(self, new_config): } }, scheduler=pbt, + reuse_actors=True, verbose=False) diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index c9bb976aca51..9af5e48d0fda 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -10,10 +10,11 @@ import traceback import ray -from ray.tune.error import TuneError +from ray.tune.error import TuneError, AbortTrialExecution from ray.tune.logger import NoopLogger from ray.tune.trial import Trial, Resources, Checkpoint from ray.tune.trial_executor import TrialExecutor +from ray.tune.util import warn_if_slow logger = logging.getLogger(__name__) @@ -30,7 +31,7 @@ def unwrap(self): class RayTrialExecutor(TrialExecutor): """An implemention of TrialExecutor based on Ray.""" - def __init__(self, queue_trials=False): + def __init__(self, queue_trials=False, reuse_actors=False): super(RayTrialExecutor, self).__init__(queue_trials) self._running = {} # Since trial resume after paused should not run @@ -40,21 +41,46 @@ def __init__(self, queue_trials=False): self._avail_resources = Resources(cpu=0, gpu=0) self._committed_resources = Resources(cpu=0, gpu=0) self._resources_initialized = False + self._reuse_actors = reuse_actors + self._cached_actor = None if ray.is_initialized(): self._update_avail_resources() - def _setup_runner(self, trial): - cls = ray.remote( - num_cpus=trial.resources.cpu, - num_gpus=trial.resources.gpu, - resources=trial.resources.custom_resources)( - trial._get_trainable_cls()) + def _setup_runner(self, trial, reuse_allowed): + 
if (self._reuse_actors and reuse_allowed + and self._cached_actor is not None): + logger.debug("Reusing cached runner {} for {}".format( + self._cached_actor, trial.trial_id)) + existing_runner = self._cached_actor + self._cached_actor = None + else: + if self._cached_actor: + logger.debug( + "Cannot reuse cached runner {} for new trial".format( + self._cached_actor)) + self._cached_actor.stop.remote() + self._cached_actor.__ray_terminate__.remote() + self._cached_actor = None + existing_runner = None + cls = ray.remote( + num_cpus=trial.resources.cpu, + num_gpus=trial.resources.gpu, + resources=trial.resources.custom_resources)( + trial._get_trainable_cls()) trial.init_logger() # We checkpoint metadata here to try mitigating logdir duplication self.try_checkpoint_metadata(trial) remote_logdir = trial.logdir + if existing_runner: + trial.runner = existing_runner + if not self.reset_trial(trial, trial.config, trial.experiment_tag): + raise AbortTrialExecution( + "Trial runner reuse requires reset_trial() to be " + "implemented and return True.") + return existing_runner + def logger_creator(config): # Set the working dir in the remote process, for user file writes if not os.path.exists(remote_logdir): @@ -86,7 +112,10 @@ def _start_trial(self, trial, checkpoint=None): """ prior_status = trial.status self.set_status(trial, Trial.RUNNING) - trial.runner = self._setup_runner(trial) + trial.runner = self._setup_runner( + trial, + reuse_allowed=checkpoint is not None + or trial._checkpoint.value is not None) if not self.restore(trial, checkpoint): if trial.status == Trial.ERROR: raise RuntimeError( @@ -126,12 +155,18 @@ def _stop_trial(self, trial, error=False, error_msg=None, try: trial.write_error_log(error_msg) if hasattr(trial, 'runner') and trial.runner: - stop_tasks = [] - stop_tasks.append(trial.runner.stop.remote()) - stop_tasks.append(trial.runner.__ray_terminate__.remote()) - # TODO(ekl) seems like wait hangs when killing actors - _, unfinished = ray.wait( - stop_tasks, num_returns=2, timeout=0.25) + if (not error and self._reuse_actors + and self._cached_actor is None): + logger.debug("Reusing actor for {}".format(trial.runner)) + self._cached_actor = trial.runner + else: + logger.info( + "Destroying actor for trial {}. If your trainable is " + "slow to initialize, consider setting " + "reuse_actors=True to reduce actor creation " + "overheads.".format(trial)) + trial.runner.stop.remote() + trial.runner.__ray_terminate__.remote() except Exception: logger.exception("Error stopping runner for Trial %s", str(trial)) self.set_status(trial, Trial.ERROR) @@ -152,11 +187,13 @@ def start_trial(self, trial, checkpoint=None): self._commit_resources(trial.resources) try: self._start_trial(trial, checkpoint) - except Exception: + except Exception as e: logger.exception("Error starting runner for Trial %s", str(trial)) error_msg = traceback.format_exc() time.sleep(2) self._stop_trial(trial, error=True, error_msg=error_msg) + if isinstance(e, AbortTrialExecution): + return # don't retry fatal Tune errors try: # This forces the trial to not start from checkpoint. 
trial.clear_checkpoint() @@ -222,7 +259,8 @@ def reset_trial(self, trial, new_config, new_experiment_tag): trial.experiment_tag = new_experiment_tag trial.config = new_config trainable = trial.runner - reset_val = ray.get(trainable.reset_config.remote(new_config)) + with warn_if_slow("reset_config"): + reset_val = ray.get(trainable.reset_config.remote(new_config)) return reset_val def get_running_trials(self): @@ -249,7 +287,8 @@ def fetch_result(self, trial): if not trial_future: raise ValueError("Trial was not running.") self._running.pop(trial_future[0]) - result = ray.get(trial_future[0]) + with warn_if_slow("fetch_result"): + result = ray.get(trial_future[0]) # For local mode if isinstance(result, _LocalWrapper): @@ -400,7 +439,8 @@ def save(self, trial, storage=Checkpoint.DISK): if storage == Checkpoint.MEMORY: trial._checkpoint.value = trial.runner.save_to_object.remote() else: - trial._checkpoint.value = ray.get(trial.runner.save.remote()) + with warn_if_slow("save_to_disk"): + trial._checkpoint.value = ray.get(trial.runner.save.remote()) return trial._checkpoint.value def restore(self, trial, checkpoint=None): @@ -421,11 +461,12 @@ def restore(self, trial, checkpoint=None): value = checkpoint.value if checkpoint.storage == Checkpoint.MEMORY: assert type(value) != Checkpoint, type(value) - ray.get(trial.runner.restore_from_object.remote(value)) + trial.runner.restore_from_object.remote(value) else: worker_ip = ray.get(trial.runner.current_ip.remote()) trial.sync_logger_to_new_location(worker_ip) - ray.get(trial.runner.restore.remote(value)) + with warn_if_slow("restore_from_disk"): + ray.get(trial.runner.restore.remote(value)) trial.last_result = checkpoint.last_result return True except Exception: diff --git a/python/ray/tune/schedulers/pbt.py b/python/ray/tune/schedulers/pbt.py index d21ab504417d..cdb11dc98348 100644 --- a/python/ray/tune/schedulers/pbt.py +++ b/python/ray/tune/schedulers/pbt.py @@ -218,23 +218,24 @@ def _exploit(self, trial_executor, trial, trial_to_clone): trial_state = self._trial_state[trial] new_state = self._trial_state[trial_to_clone] if not new_state.last_checkpoint: - logger.warning("[pbt]: no checkpoint for trial." - " Skip exploit for Trial {}".format(trial)) + logger.info("[pbt]: no checkpoint for trial." + " Skip exploit for Trial {}".format(trial)) return new_config = explore(trial_to_clone.config, self._hyperparam_mutations, self._resample_probability, self._custom_explore_fn) - logger.warning("[exploit] transferring weights from trial " - "{} (score {}) -> {} (score {})".format( - trial_to_clone, new_state.last_score, trial, - trial_state.last_score)) - # TODO(ekl) restarting the trial is expensive. We should implement a - # lighter way reset() method that can alter the trial config. 
+ logger.info("[exploit] transferring weights from trial " + "{} (score {}) -> {} (score {})".format( + trial_to_clone, new_state.last_score, trial, + trial_state.last_score)) new_tag = make_experiment_tag(trial_state.orig_tag, new_config, self._hyperparam_mutations) reset_successful = trial_executor.reset_trial(trial, new_config, new_tag) - if not reset_successful: + if reset_successful: + trial_executor.restore( + trial, Checkpoint.from_object(new_state.last_checkpoint)) + else: trial_executor.stop_trial(trial, stop_logger=False) trial.config = new_config trial.experiment_tag = new_tag diff --git a/python/ray/tune/tests/test_actor_reuse.py b/python/ray/tune/tests/test_actor_reuse.py new file mode 100644 index 000000000000..9fc8579b79aa --- /dev/null +++ b/python/ray/tune/tests/test_actor_reuse.py @@ -0,0 +1,96 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import unittest + +import ray +from ray.tune import Trainable, run_experiments +from ray.tune.error import TuneError +from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler + + +class FrequentPausesScheduler(FIFOScheduler): + def on_trial_result(self, trial_runner, trial, result): + return TrialScheduler.PAUSE + + +class MyResettableClass(Trainable): + def _setup(self, config): + self.config = config + self.num_resets = 0 + self.iter = 0 + + def _train(self): + self.iter += 1 + return {"num_resets": self.num_resets, "done": self.iter > 1} + + def _save(self, chkpt_dir): + return {"iter": self.iter} + + def _restore(self, item): + self.iter = item["iter"] + + def reset_config(self, new_config): + if "fake_reset_not_supported" in self.config: + return False + self.num_resets += 1 + return True + + +class ActorReuseTest(unittest.TestCase): + def setUp(self): + ray.init(num_cpus=1, num_gpus=0) + + def tearDown(self): + ray.shutdown() + + def testTrialReuseDisabled(self): + trials = run_experiments( + { + "foo": { + "run": MyResettableClass, + "num_samples": 4, + "config": {}, + } + }, + reuse_actors=False, + scheduler=FrequentPausesScheduler()) + self.assertEqual([t.last_result["num_resets"] for t in trials], + [0, 0, 0, 0]) + + def testTrialReuseEnabled(self): + trials = run_experiments( + { + "foo": { + "run": MyResettableClass, + "num_samples": 4, + "config": {}, + } + }, + reuse_actors=True, + scheduler=FrequentPausesScheduler()) + self.assertEqual([t.last_result["num_resets"] for t in trials], + [1, 2, 3, 4]) + + def testTrialReuseEnabledError(self): + def run(): + run_experiments( + { + "foo": { + "run": MyResettableClass, + "max_failures": 1, + "num_samples": 4, + "config": { + "fake_reset_not_supported": True + }, + } + }, + reuse_actors=True, + scheduler=FrequentPausesScheduler()) + + self.assertRaises(TuneError, lambda: run()) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/python/ray/tune/trainable.py b/python/ray/tune/trainable.py index 32ea413b6947..8b640cbb45fc 100644 --- a/python/ray/tune/trainable.py +++ b/python/ray/tune/trainable.py @@ -310,6 +310,9 @@ def restore(self, checkpoint_path): self._restore(checkpoint_dict) else: self._restore(checkpoint_path) + self._time_since_restore = 0.0 + self._timesteps_since_restore = 0 + self._iterations_since_restore = 0 self._restored = True def restore_from_object(self, obj): @@ -350,12 +353,16 @@ def export_model(self, export_formats, export_dir=None): def reset_config(self, new_config): """Resets configuration without restarting the trial. 
+ This method is optional, but can be implemented to speed up algorithms + such as PBT, and to allow performance optimizations such as running + experiments with reuse_actors=True. + Args: new_config (dir): Updated hyperparameter configuration for the trainable. Returns: - True if configuration reset successfully else False. + True if reset was successful else False. """ return False diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 96dfaa5deef6..c891505414ad 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -16,6 +16,7 @@ from ray.tune.result import TIME_THIS_ITER_S from ray.tune.trial import Trial, Checkpoint from ray.tune.schedulers import FIFOScheduler, TrialScheduler +from ray.tune.util import warn_if_slow from ray.tune.web_server import TuneServer MAX_DEBUG_TRIALS = 20 @@ -70,6 +71,7 @@ def __init__(self, server_port=TuneServer.DEFAULT_PORT, verbose=True, queue_trials=False, + reuse_actors=False, trial_executor=None): """Initializes a new TrialRunner. @@ -87,12 +89,16 @@ def __init__(self, not currently have enough resources to launch one. This should be set to True when running on an autoscaling cluster to enable automatic scale-up. + reuse_actors (bool): Whether to reuse actors between different + trials when possible. This can drastically speed up experiments + that start and stop actors often (e.g., PBT in + time-multiplexing mode). trial_executor (TrialExecutor): Defaults to RayTrialExecutor. """ self._search_alg = search_alg self._scheduler_alg = scheduler or FIFOScheduler() - self.trial_executor = trial_executor or \ - RayTrialExecutor(queue_trials=queue_trials) + self.trial_executor = (trial_executor or RayTrialExecutor( + queue_trials=queue_trials, reuse_actors=reuse_actors)) # For debugging, it may be useful to halt trials after some time has # elapsed. TODO(ekl) consider exposing this in the API. 
@@ -226,7 +232,8 @@ def step(self): self.trial_executor.on_step_begin() next_trial = self._get_next_trial() if next_trial is not None: - self.trial_executor.start_trial(next_trial) + with warn_if_slow("start_trial"): + self.trial_executor.start_trial(next_trial) elif self.trial_executor.get_running_trials(): self._process_events() else: @@ -284,7 +291,8 @@ def add_trial(self, trial): """ trial.set_verbose(self._verbose) self._trials.append(trial) - self._scheduler_alg.on_trial_add(self, trial) + with warn_if_slow("scheduler.on_trial_add"): + self._scheduler_alg.on_trial_add(self, trial) self.trial_executor.try_checkpoint_metadata(trial) def debug_string(self, max_debug=MAX_DEBUG_TRIALS): @@ -388,6 +396,10 @@ def _get_next_trial(self): def _process_events(self): trial = self.trial_executor.get_next_available_trial() + with warn_if_slow("process_trial"): + self._process_trial(trial) + + def _process_trial(self, trial): try: result = self.trial_executor.fetch_result(trial) self._total_time += result[TIME_THIS_ITER_S] @@ -400,12 +412,15 @@ def _process_events(self): decision = TrialScheduler.STOP else: - decision = self._scheduler_alg.on_trial_result( - self, trial, result) - self._search_alg.on_trial_result(trial.trial_id, result) + with warn_if_slow("scheduler.on_trial_result"): + decision = self._scheduler_alg.on_trial_result( + self, trial, result) + with warn_if_slow("search_alg.on_trial_result"): + self._search_alg.on_trial_result(trial.trial_id, result) if decision == TrialScheduler.STOP: - self._search_alg.on_trial_complete( - trial.trial_id, early_terminated=True) + with warn_if_slow("search_alg.on_trial_complete"): + self._search_alg.on_trial_complete( + trial.trial_id, early_terminated=True) trial.update_last_result( result, terminate=(decision == TrialScheduler.STOP)) @@ -484,7 +499,8 @@ def _requeue_trial(self, trial): """ self._scheduler_alg.on_trial_error(self, trial) self.trial_executor.set_status(trial, Trial.PENDING) - self._scheduler_alg.on_trial_add(self, trial) + with warn_if_slow("scheduler.on_trial_add"): + self._scheduler_alg.on_trial_add(self, trial) def _update_trial_queue(self, blocking=False, timeout=600): """Adds next trials to queue if possible. diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 0294497bf1f3..febaf357501d 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -60,6 +60,7 @@ def run_experiments(experiments, verbose=2, resume=False, queue_trials=False, + reuse_actors=False, trial_executor=None, raise_on_failed_trial=True): """Runs and blocks until all trials finish. @@ -84,6 +85,10 @@ def run_experiments(experiments, not currently have enough resources to launch one. This should be set to True when running on an autoscaling cluster to enable automatic scale-up. + reuse_actors (bool): Whether to reuse actors between different trials + when possible. This can drastically speed up experiments that start + and stop actors often (e.g., PBT in time-multiplexing mode). This + requires trials to have the same resource requirements. trial_executor (TrialExecutor): Manage the execution of trials. raise_on_failed_trial (bool): Raise TuneError if there exists failed trial (of ERROR state) when the experiments complete. 
@@ -161,6 +166,7 @@ def run_experiments(experiments, server_port=server_port, verbose=bool(verbose > 1), queue_trials=queue_trials, + reuse_actors=reuse_actors, trial_executor=trial_executor) if verbose: diff --git a/python/ray/tune/util.py b/python/ray/tune/util.py index 75ac57ef188a..a9a9ace94387 100644 --- a/python/ray/tune/util.py +++ b/python/ray/tune/util.py @@ -2,12 +2,16 @@ from __future__ import division from __future__ import print_function +import logging import base64 import copy import numpy as np +import time import ray +logger = logging.getLogger(__name__) + _pinned_objects = [] PINNED_OBJECT_PREFIX = "ray.tune.PinnedObject:" @@ -36,6 +40,28 @@ def get_pinned_object(pinned_id): ObjectID(base64.b64decode(pinned_id[len(PINNED_OBJECT_PREFIX):])))) +class warn_if_slow(object): + """Prints a warning if a given operation is slower than 100ms. + + Example: + >>> with warn_if_slow("some_operation"): + ... ray.get(something) + """ + + def __init__(self, name): + self.name = name + + def __enter__(self): + self.start = time.time() + + def __exit__(self, type, value, traceback): + now = time.time() + if now - self.start > 0.1: + logger.warning("The `{}` operation took {} seconds to complete, ". + format(self.name, now - self.start) + + "which may be a performance bottleneck.") + + def merge_dicts(d1, d2): """Returns a new dict that is d1 and d2 deep merged.""" merged = copy.deepcopy(d1)
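
Usage illustration (not part of the patch): a minimal sketch, assuming the public API shown in the hunks above, of a user-defined Trainable that opts into actor reuse. The class name, the "lr" config key, and the reported metric are hypothetical. The contract the patch relies on is that reset_config() applies the new configuration in place and returns True; if it returns False while a reuse is attempted, the executor raises AbortTrialExecution and the trial is not retried.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
from ray.tune import Trainable, run_experiments


class ReusableTrainable(Trainable):
    """Hypothetical trainable that supports in-place reconfiguration."""

    def _setup(self, config):
        self.lr = config.get("lr", 0.01)
        self.iter = 0

    def _train(self):
        self.iter += 1
        # Report a dummy metric; a real trainable would run a training step.
        return {"mean_accuracy": self.lr * self.iter, "done": self.iter >= 3}

    def _save(self, checkpoint_dir):
        return {"iter": self.iter}

    def _restore(self, checkpoint):
        self.iter = checkpoint["iter"]

    def reset_config(self, new_config):
        # Returning True tells Tune this actor was reconfigured in place.
        # With reuse_actors=True, a stopped trial's actor is cached and handed
        # to the next trial that starts from a checkpoint (e.g. when a
        # scheduler such as PBT pauses or exploits trials) instead of being
        # destroyed and recreated.
        self.lr = new_config.get("lr", 0.01)
        return True


if __name__ == "__main__":
    ray.init(num_cpus=2)
    run_experiments(
        {
            "reuse_demo": {
                "run": ReusableTrainable,
                "num_samples": 4,
                "config": {"lr": 0.01},
            }
        },
        reuse_actors=True)

As documented in the run_experiments hunk above, reuse assumes trials share the same resource requirements. With reset_config() implemented, PBT's exploit step also becomes a reset_trial() plus an in-memory restore() rather than a full actor restart, which is why pbt_example.py now passes reuse_actors=True.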