From fcbc6dea478074d4c00b185bc392d6dc55b501a2 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 24 Nov 2018 20:00:27 -0800 Subject: [PATCH 01/13] Tests and better recovery handling --- python/ray/tune/ray_trial_executor.py | 28 +++++-- .../ray/tune/test/ray_trial_executor_test.py | 12 ++- python/ray/tune/test/trial_runner_test.py | 73 ++++++++++++++++--- python/ray/tune/trial.py | 8 +- python/ray/tune/trial_executor.py | 29 +------- python/ray/tune/trial_runner.py | 70 +++++++++++++++--- python/ray/tune/tune.py | 3 +- 7 files changed, 162 insertions(+), 61 deletions(-) diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 7b725e05f342..dbee770b44cd 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -110,25 +110,39 @@ def _stop_trial(self, trial, error=False, error_msg=None, if stop_logger: trial.close_logger() - def start_trial(self, trial, checkpoint_obj=None): - """Starts the trial.""" + def start_trial(self, trial, checkpoint=None, raise_on_failure=False): + """Starts the trial. + + Will not return resources if trial repeatedly fails on start. + + Args: + trial (Trial): Trial to be started. + checkpoint(Checkpoint): A Python object or path storing the state + of trial. + raise_on_failure (bool): To raise exception on failure in starting. + + Raises: + Exception after 1 retries if `raise_on_failure` is True. + """ self._commit_resources(trial.resources) try: - self._start_trial(trial, checkpoint_obj) + self._start_trial(trial, checkpoint) except Exception: logger.exception("Error stopping runner - retrying...") error_msg = traceback.format_exc() time.sleep(2) self._stop_trial(trial, error=True, error_msg=error_msg) try: - self._start_trial(trial) - except Exception: + self._start_trial(trial, checkpoint) + except Exception as exc: logger.exception("Error starting runner, aborting!") error_msg = traceback.format_exc() self._stop_trial(trial, error=True, error_msg=error_msg) # note that we don't return the resources, since they may # have been lost + if raise_on_failure: + raise exc def _find_item(self, dictionary, item): out = [rid for rid, t in dictionary.items() if t is item] @@ -140,11 +154,13 @@ def stop_trial(self, trial, error=False, error_msg=None, stop_logger=True): self._stop_trial( trial, error=error, error_msg=error_msg, stop_logger=stop_logger) if prior_status == Trial.RUNNING: - self._return_resources(trial.resources) out = self._find_item(self._running, trial) for result_id in out: self._running.pop(result_id) + logger.debug("Returning resources for this trial.") + self._return_resources(trial.resources) + def continue_training(self, trial): """Continues the training of this trial.""" diff --git a/python/ray/tune/test/ray_trial_executor_test.py b/python/ray/tune/test/ray_trial_executor_test.py index 35c413e717bb..8e6ef765cee3 100644 --- a/python/ray/tune/test/ray_trial_executor_test.py +++ b/python/ray/tune/test/ray_trial_executor_test.py @@ -9,8 +9,9 @@ from ray.rllib import _register_all from ray.tune import Trainable from ray.tune.ray_trial_executor import RayTrialExecutor +from ray.tune.registry import _global_registry, TRAINABLE_CLASS from ray.tune.suggest import BasicVariantGenerator -from ray.tune.trial import Trial, Checkpoint +from ray.tune.trial import Trial, Checkpoint, Resources class RayTrialExecutorTest(unittest.TestCase): @@ -50,6 +51,15 @@ def testPauseResume(self): self.trial_executor.stop_trial(trial) self.assertEqual(Trial.TERMINATED, trial.status) + def 
testStartFailure(self): + _global_registry.register(TRAINABLE_CLASS, "asdf", None) + trial = Trial("asdf", resources=Resources(1, 0)) + self.trial_executor.start_trial(trial) + self.assertEqual(Trial.ERROR, trial.status) + self.assertRaises( + Exception, lambda: self.trial_executor.start_trial( + trial, raise_on_error=True)) + def testPauseResume2(self): """Tests that pausing works for trials being processed.""" trial = Trial("__fake") diff --git a/python/ray/tune/test/trial_runner_test.py b/python/ray/tune/test/trial_runner_test.py index 6b142d354ec7..6af30e87fd3a 100644 --- a/python/ray/tune/test/trial_runner_test.py +++ b/python/ray/tune/test/trial_runner_test.py @@ -5,6 +5,7 @@ import os import time import unittest +from unittest import mock import ray from ray.rllib import _register_all @@ -845,6 +846,23 @@ def testMaxConcurrentSuggestions(self): self.assertEqual(len(searcher.next_trials()), 0) +def create_mock_components(): + + class _MockScheduler(FIFOScheduler): + errored_trials = [] + def on_trial_error(self, trial_runner, trial): + self.errored_trials += [trial] + + class _MockSearchAlg(BasicVariantGenerator): + errored_trials = [] + def on_trial_complete(self, trial_id, error=False, **kwargs): + if error: + self.errored_trials += [trial_id] + + searchalg = _MockSearchAlg() + scheduler = _MockScheduler() + return searchalg, scheduler + class TrialRunnerTest(unittest.TestCase): def tearDown(self): ray.shutdown() @@ -889,16 +907,6 @@ def train(config, reporter): self.assertLessEqual(len(trial.logdir), 200) trial_executor.stop_trial(trial) - def testTrialErrorOnStart(self): - ray.init() - trial_executor = RayTrialExecutor() - _global_registry.register(TRAINABLE_CLASS, "asdf", None) - trial = Trial("asdf", resources=Resources(1, 0)) - try: - trial_executor.start_trial(trial) - except Exception as e: - self.assertIn("a class", str(e)) - def testExtraResources(self): ray.init(num_cpus=4, num_gpus=2) runner = TrialRunner(BasicVariantGenerator()) @@ -1055,7 +1063,9 @@ def testThrowOnOverstep(self): def testFailureRecoveryDisabled(self): ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + searchalg, scheduler = create_mock_components() + + runner = TrialRunner(searchalg, scheduler=scheduler) kwargs = { "resources": Resources(cpu=1, gpu=1), "checkpoint_freq": 1, @@ -1074,10 +1084,15 @@ def testFailureRecoveryDisabled(self): runner.step() self.assertEqual(trials[0].status, Trial.ERROR) self.assertEqual(trials[0].num_failures, 1) + self.assertEqual(len(searchalg.errored_trials), 1) + self.assertEqual(len(scheduler.errored_trials), 1) def testFailureRecoveryEnabled(self): ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + searchalg, scheduler = create_mock_components() + + runner = TrialRunner(searchalg, scheduler=scheduler) + kwargs = { "resources": Resources(cpu=1, gpu=1), "checkpoint_freq": 1, @@ -1098,6 +1113,40 @@ def testFailureRecoveryEnabled(self): self.assertEqual(trials[0].num_failures, 1) runner.step() self.assertEqual(trials[0].status, Trial.RUNNING) + self.assertEqual(len(searchalg.errored_trials), 0) + self.assertEqual(len(scheduler.errored_trials), 0) + + def testFailureRecoveryNodeRemoval(self): + ray.init(num_cpus=1, num_gpus=1) + searchalg, scheduler = create_mock_components() + + runner = TrialRunner(searchalg, scheduler=scheduler) + + kwargs = { + "resources": Resources(cpu=1, gpu=1), + "checkpoint_freq": 1, + "max_failures": 1, + "config": { + "mock_error": True, + }, + } + runner.add_trial(Trial("__fake", 
**kwargs)) + trials = runner.get_trials() + + with mock.patch('ray.global_state.cluster_resources') as res_mock: + res_mock.return_value = {"CPU": 1, "GPU": 1} + runner.step() + self.assertEqual(trials[0].status, Trial.RUNNING) + runner.step() + self.assertEqual(trials[0].status, Trial.RUNNING) + + # Mimic a node failure + res_mock.return_value = {"CPU": 0, "GPU": 0} + runner.step() + self.assertEqual(trials[0].status, Trial.PENDING) + self.assertEqual(trials[0].num_failures, 1) + self.assertEqual(len(searchalg.errored_trials), 0) + self.assertEqual(len(scheduler.errored_trials), 1) def testFailureRecoveryMaxFailures(self): ray.init(num_cpus=1, num_gpus=1) diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index 65683eeb53c7..83da6e0eff39 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -281,10 +281,12 @@ def has_checkpoint(self): def should_recover(self): """Returns whether the trial qualifies for restoring. - This is if a checkpoint frequency is set, which includes settings - where there may not yet be a checkpoint. + This is if a checkpoint frequency is set and has not failed more than + max_failures. This may return true even when there may not yet + be a checkpoint. """ - return self.checkpoint_freq > 0 + return (self.checkpoint_freq > 0 + and self.num_failures < self.max_failures) def update_last_result(self, result, terminate=False): if terminate: diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py index e0b541218bf1..d67e7705a1b1 100644 --- a/python/ray/tune/trial_executor.py +++ b/python/ray/tune/trial_executor.py @@ -31,15 +31,14 @@ def has_resources(self, resources): raise NotImplementedError("Subclasses of TrialExecutor must provide " "has_resources() method") - def start_trial(self, trial, checkpoint=None): - """Starts the trial restoring from checkpoint if checkpoint != None. - - If an error is encountered when starting the trial, an exception will - be thrown. + def start_trial(self, trial, checkpoint=None, raise_on_failure=False): + """Starts the trial restoring from checkpoint if checkpoint is provided. Args: + trial (Trial): Trial to be started. checkpoint(Checkpoint): A Python object or path storing the state of trial. + raise_on_failure (bool): To raise exception on failure in starting. """ raise NotImplementedError("Subclasses of TrialExecutor must provide " "start_trial() method") @@ -59,26 +58,6 @@ def stop_trial(self, trial, error=False, error_msg=None, stop_logger=True): raise NotImplementedError("Subclasses of TrialExecutor must provide " "stop_trial() method") - def restart_trial(self, trial, error_msg=None): - """Restarts or requeues the trial. - - The state of the trial should restore from the last checkpoint. Trial - is requeued if the cluster no longer has resources to accomodate it. - - Args: - error_msg (str): Optional error message. 
- """ - self.stop_trial( - trial, - error=error_msg is not None, - error_msg=error_msg, - stop_logger=False) - trial.result_logger.flush() - if self.has_resources(trial.resources): - self.start_trial(trial) - else: - trial.status = Trial.PENDING - def continue_training(self, trial): """Continues the training of this trial.""" pass diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 98bbbcb71c64..53107cbe86ff 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -11,8 +11,8 @@ from ray.tune import TuneError from ray.tune.ray_trial_executor import RayTrialExecutor -from ray.tune.result import TIME_THIS_ITER_S -from ray.tune.trial import Trial +from ray.tune.result import TIME_THIS_ITER_S, DEFAULT_RESULTS_DIR +from ray.tune.trial import Trial, Checkpoint from ray.tune.schedulers import FIFOScheduler, TrialScheduler from ray.tune.web_server import TuneServer @@ -116,6 +116,11 @@ def step(self): self.trial_executor.start_trial(next_trial) elif self.trial_executor.get_running_trials(): self._process_events() + if self._checkpoint_freq: + if self._iteration % self._checkpoint_freq == 0: + self.save() + + self._iteration += 1 else: for trial in self._trials: if trial.status == Trial.PENDING: @@ -297,24 +302,65 @@ def _process_events(self): logger.exception("Error processing event.") error_msg = traceback.format_exc() if trial.status == Trial.RUNNING: - if trial.should_recover() and \ - trial.num_failures < trial.max_failures: - self._try_recover(trial, error_msg) + if trial.should_recover(): + self.try_recover(trial, error_msg) else: + self.trial_executor.stop_trial( + trial, + error=error_msg is not None, + error_msg=error_msg) self._scheduler_alg.on_trial_error(self, trial) self._search_alg.on_trial_complete( trial.trial_id, error=True) - self.trial_executor.stop_trial(trial, True, error_msg) - def _try_recover(self, trial, error_msg): + def _checkpoint_if_needed(self, trial): + """Checkpoints trial based off trial.last_result.""" + if trial.should_checkpoint(): + + # Save trial runtime if possible + if hasattr(trial, "runner") and trial.runner: + self.trial_executor.save(trial, storage=Checkpoint.DISK) + + try: + self._trial_checkpoints[trial] = pickle.dumps(trial) + except ValueError: + logger.exception("Error checkpointing full trial state.") + + def try_recover(self, trial, error_msg): + """Tries to recover trial. + + Notifies SearchAlgorithm and Scheduler if failure to recover. + """ try: - logger.info("Attempting to recover" - " trial state from last checkpoint.") - self.trial_executor.restart_trial(trial, error_msg) + self.trial_executor.stop_trial( + trial, + error=error_msg is not None, + error_msg=error_msg, + stop_logger=False) + trial.result_logger.flush() + if self.trial_executor.has_resources(trial.resources): + logger.info("Attempting to recover" + " trial state from last checkpoint.") + self.trial_executor.start_trial(trial, raise_on_failure=True) + else: + logger.debug("Notifying Scheduler and requeueing trial.") + self._requeue_trial(trial) except Exception: error_msg = traceback.format_exc() - logger.warning("Error recovering trial from checkpoint, abort.") - self.trial_executor.stop_trial(trial, True, error_msg=error_msg) + logger.exception("Error recovering trial from checkpoint, abort.") + self._scheduler_alg.on_trial_error(self, trial) + self._search_alg.on_trial_complete( + trial.trial_id, error=True) + + def _requeue_trial(self, trial): + """Notification to TrialScheduler and requeue trial. 
+ + This does not notify the SearchAlgorithm because + the function evaluation is still in progress. + """ + self._scheduler_alg.on_trial_error(self, trial) + trial.status = Trial.PENDING + self._scheduler_alg.on_trial_add(self, trial) def _update_trial_queue(self, blocking=False, timeout=600): """Adds next trials to queue if possible. diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 335660ecb836..c7aa6e560edc 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -8,6 +8,7 @@ from ray.tune.error import TuneError from ray.tune.suggest import BasicVariantGenerator from ray.tune.trial import Trial, DEBUG_PRINT_INTERVAL +from ray.tune.result import DEFAULT_RESULTS_DIR from ray.tune.log_sync import wait_for_log_sync from ray.tune.trial_runner import TrialRunner from ray.tune.schedulers import (HyperBandScheduler, AsyncHyperBandScheduler, @@ -90,8 +91,6 @@ def run_experiments(experiments=None, if search_alg is None: search_alg = BasicVariantGenerator() - search_alg.add_configurations(experiments) - runner = TrialRunner( search_alg, scheduler=scheduler, From 5d8e414af9fd70edaa8faef89aafd4a7d043a516 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 25 Nov 2018 18:01:19 -0800 Subject: [PATCH 02/13] py2mock --- .travis/install-dependencies.sh | 4 ++-- python/ray/tune/test/trial_runner_test.py | 13 +++++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/.travis/install-dependencies.sh b/.travis/install-dependencies.sh index 293c1b8b6b04..3f1ea4922bc8 100755 --- a/.travis/install-dependencies.sh +++ b/.travis/install-dependencies.sh @@ -25,7 +25,7 @@ if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas==0.22 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout + feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then sudo apt-get update sudo apt-get install -y cmake pkg-config python-dev python-numpy build-essential autoconf curl libtool unzip @@ -51,7 +51,7 @@ elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas==0.22 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout + feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then # check that brew is installed which -s brew diff --git a/python/ray/tune/test/trial_runner_test.py b/python/ray/tune/test/trial_runner_test.py index 6af30e87fd3a..2d40e8eeb90f 100644 --- a/python/ray/tune/test/trial_runner_test.py +++ b/python/ray/tune/test/trial_runner_test.py @@ -3,9 +3,9 @@ from __future__ import print_function import os +import sys import time import unittest -from unittest import mock import ray from ray.rllib import _register_all @@ -26,6 +26,11 @@ SuggestionAlgorithm) from ray.tune.suggest.variant_generator import RecursiveDependencyError +if sys.version_info >= (3, 3): + from unittest.mock import patch +else: + from mock import patch + class TrainableFunctionApiTest(unittest.TestCase): def setUp(self): @@ -1133,15 +1138,15 @@ def testFailureRecoveryNodeRemoval(self): 
runner.add_trial(Trial("__fake", **kwargs)) trials = runner.get_trials() - with mock.patch('ray.global_state.cluster_resources') as res_mock: - res_mock.return_value = {"CPU": 1, "GPU": 1} + with patch('ray.global_state.cluster_resources') as resource_mock: + resource_mock.return_value = {"CPU": 1, "GPU": 1} runner.step() self.assertEqual(trials[0].status, Trial.RUNNING) runner.step() self.assertEqual(trials[0].status, Trial.RUNNING) # Mimic a node failure - res_mock.return_value = {"CPU": 0, "GPU": 0} + resource_mock.return_value = {"CPU": 0, "GPU": 0} runner.step() self.assertEqual(trials[0].status, Trial.PENDING) self.assertEqual(trials[0].num_failures, 1) From 9137de0d7cc58215a283c24fe9ad47e37afec33c Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 26 Nov 2018 18:44:27 -0800 Subject: [PATCH 03/13] nit --- python/ray/tune/ray_trial_executor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index dbee770b44cd..ec360852391d 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -154,13 +154,12 @@ def stop_trial(self, trial, error=False, error_msg=None, stop_logger=True): self._stop_trial( trial, error=error, error_msg=error_msg, stop_logger=stop_logger) if prior_status == Trial.RUNNING: + logger.debug("Returning resources for this trial.") + self._return_resources(trial.resources) out = self._find_item(self._running, trial) for result_id in out: self._running.pop(result_id) - logger.debug("Returning resources for this trial.") - self._return_resources(trial.resources) - def continue_training(self, trial): """Continues the training of this trial.""" From 445372493b58446cb2018ed23834b803213cc5be Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 26 Nov 2018 17:26:06 -0800 Subject: [PATCH 04/13] Fix counting resources test --- python/ray/tune/test/cluster_tests.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/python/ray/tune/test/cluster_tests.py b/python/ray/tune/test/cluster_tests.py index f9425cc3e301..d95c884a1d44 100644 --- a/python/ray/tune/test/cluster_tests.py +++ b/python/ray/tune/test/cluster_tests.py @@ -86,24 +86,24 @@ def test_counting_resources(start_connected_cluster): """Tests that Tune accounting is consistent with actual cluster.""" cluster = start_connected_cluster - assert ray.global_state.cluster_resources()["CPU"] == 1 nodes = [] - nodes += [cluster.add_node(resources=dict(CPU=1))] - assert cluster.wait_for_nodes() - assert ray.global_state.cluster_resources()["CPU"] == 2 - + assert ray.global_state.cluster_resources()["CPU"] == 1 runner = TrialRunner(BasicVariantGenerator()) kwargs = {"stopping_criterion": {"training_iteration": 10}} - trials = [Trial("test", **kwargs), Trial("test", **kwargs)] + trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] for t in trials: runner.add_trial(t) runner.step() # run 1 + nodes += [cluster.add_node(resources=dict(CPU=1))] + assert cluster.wait_for_nodes() + assert ray.global_state.cluster_resources()["CPU"] == 2 cluster.remove_node(nodes.pop()) assert cluster.wait_for_nodes() assert ray.global_state.cluster_resources()["CPU"] == 1 runner.step() # run 2 + assert sum(t.status == Trial.RUNNING for t in runner.get_trials()) == 1 for i in range(5): nodes += [cluster.add_node(resources=dict(CPU=1))] @@ -111,12 +111,7 @@ def test_counting_resources(start_connected_cluster): assert ray.global_state.cluster_resources()["CPU"] == 6 
runner.step() # 1 result - - for i in range(5): - node = nodes.pop() - cluster.remove_node(node) - assert cluster.wait_for_nodes() - assert ray.global_state.cluster_resources()["CPU"] == 1 + assert sum(t.status == Trial.RUNNING for t in runner.get_trials()) == 2 @pytest.mark.skip("Add this test once reconstruction is fixed") From 5a2449970880a640386c4fff8329642509754ae5 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 26 Nov 2018 18:54:17 -0800 Subject: [PATCH 05/13] Remove extraneous changes --- python/ray/tune/trial.py | 10 ++++++---- python/ray/tune/trial_runner.py | 17 ++--------------- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index 83da6e0eff39..b6bdbd1d755b 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -216,17 +216,19 @@ def should_stop(self, result): return False - def should_checkpoint(self, result): + def should_checkpoint(self): """Whether this trial is due for checkpointing.""" + result = self.last_result or {} if result.get(DONE) and self.checkpoint_at_end: return True - if not self.checkpoint_freq: + if self.checkpoint_freq: + return result.get( + TRAINING_ITERATION, 0) % self.checkpoint_freq == 0 + else: return False - return self.last_result[TRAINING_ITERATION] % self.checkpoint_freq == 0 - def progress_string(self): """Returns a progress message for printing out to the console.""" diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 53107cbe86ff..d77da2b85b8c 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -116,11 +116,6 @@ def step(self): self.trial_executor.start_trial(next_trial) elif self.trial_executor.get_running_trials(): self._process_events() - if self._checkpoint_freq: - if self._iteration % self._checkpoint_freq == 0: - self.save() - - self._iteration += 1 else: for trial in self._trials: if trial.status == Trial.PENDING: @@ -283,17 +278,14 @@ def _process_events(self): result, terminate=(decision == TrialScheduler.STOP)) if decision == TrialScheduler.CONTINUE: - if trial.should_checkpoint(result): - # TODO(rliaw): This is a blocking call - self.trial_executor.save(trial) + self._checkpoint_if_needed(trial) self.trial_executor.continue_training(trial) elif decision == TrialScheduler.PAUSE: self.trial_executor.pause_trial(trial) elif decision == TrialScheduler.STOP: # Checkpoint before ending the trial # if checkpoint_at_end experiment option is set to True - if trial.should_checkpoint(result): - self.trial_executor.save(trial) + self._checkpoint_if_needed(trial) self.trial_executor.stop_trial(trial) else: assert False, "Invalid scheduling decision: {}".format( @@ -321,11 +313,6 @@ def _checkpoint_if_needed(self, trial): if hasattr(trial, "runner") and trial.runner: self.trial_executor.save(trial, storage=Checkpoint.DISK) - try: - self._trial_checkpoints[trial] = pickle.dumps(trial) - except ValueError: - logger.exception("Error checkpointing full trial state.") - def try_recover(self, trial, error_msg): """Tries to recover trial. 
From b750d4efdfada19704951756f96433d470d667b6 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 26 Nov 2018 19:04:11 -0800 Subject: [PATCH 06/13] docs --- python/ray/tune/ray_trial_executor.py | 2 +- python/ray/tune/trial_runner.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index ec360852391d..3e460e80dc0a 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -117,7 +117,7 @@ def start_trial(self, trial, checkpoint=None, raise_on_failure=False): Args: trial (Trial): Trial to be started. - checkpoint(Checkpoint): A Python object or path storing the state + checkpoint (Checkpoint): A Python object or path storing the state of trial. raise_on_failure (bool): To raise exception on failure in starting. diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index d77da2b85b8c..a3aa25bf0024 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -317,6 +317,10 @@ def try_recover(self, trial, error_msg): """Tries to recover trial. Notifies SearchAlgorithm and Scheduler if failure to recover. + + Args: + trial (Trial): Trial to recover. + error_msg (str): Error message from prior to invoking this method. """ try: self.trial_executor.stop_trial( From 14da6ec51f50f9205a70b79bd5c20d478e97be7c Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 25 Nov 2018 00:50:01 -0800 Subject: [PATCH 07/13] yapf --- python/ray/tune/trial.py | 4 ++-- python/ray/tune/trial_runner.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index b6bdbd1d755b..d3a4e8145103 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -224,8 +224,8 @@ def should_checkpoint(self): return True if self.checkpoint_freq: - return result.get( - TRAINING_ITERATION, 0) % self.checkpoint_freq == 0 + return result.get(TRAINING_ITERATION, + 0) % self.checkpoint_freq == 0 else: return False diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index a3aa25bf0024..20a2bac2d213 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -340,8 +340,7 @@ def try_recover(self, trial, error_msg): error_msg = traceback.format_exc() logger.exception("Error recovering trial from checkpoint, abort.") self._scheduler_alg.on_trial_error(self, trial) - self._search_alg.on_trial_complete( - trial.trial_id, error=True) + self._search_alg.on_trial_complete(trial.trial_id, error=True) def _requeue_trial(self, trial): """Notification to TrialScheduler and requeue trial. 
From 394c0e941931eb8781b34631f967ef094342c77e Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 26 Nov 2018 19:22:22 -0800 Subject: [PATCH 08/13] Lint and small changes to tests --- python/ray/tune/test/cluster_tests.py | 48 +++++++++++++---------- python/ray/tune/test/trial_runner_test.py | 4 +- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/python/ray/tune/test/cluster_tests.py b/python/ray/tune/test/cluster_tests.py index d95c884a1d44..348de35f7b09 100644 --- a/python/ray/tune/test/cluster_tests.py +++ b/python/ray/tune/test/cluster_tests.py @@ -10,23 +10,28 @@ except ImportError: pytest_timeout = None -from ray.test.cluster_utils import Cluster import ray from ray import tune +from ray.rllib import _register_all +from ray.test.cluster_utils import Cluster from ray.tune.error import TuneError from ray.tune.trial import Trial from ray.tune.trial_runner import TrialRunner from ray.tune.suggest import BasicVariantGenerator -def register_test_trainable(): - class _Train(tune.Trainable): +def register_fail_trainable(): + class _Fail(tune.Trainable): + """Fails on the 4th iteration.""" + def _setup(self, config): - self.state = {"hi": 1} + self.state = {"hi": 0} def _train(self): self.state["hi"] += 1 time.sleep(0.5) + if self.state["hi"] >= 4: + assert False return {} def _save(self, path): @@ -35,13 +40,10 @@ def _save(self, path): def _restore(self, state): self.state = state - tune.register_trainable("test", _Train) + tune.register_trainable("test", _Fail) -@pytest.fixture -def start_connected_cluster(): - # Start the Ray processes. - +def _start_new_cluster(): cluster = Cluster( initialize_head=True, connect=True, @@ -51,7 +53,15 @@ def start_connected_cluster(): "num_heartbeats_timeout": 10 }) }) - register_test_trainable() + # Pytest doesn't play nicely with imports + _register_all() + return cluster + + +@pytest.fixture +def start_connected_cluster(): + # Start the Ray processes. + cluster = _start_new_cluster() yield cluster # The code after the yield will run as teardown code. ray.shutdown() @@ -71,17 +81,14 @@ def start_connected_emptyhead_cluster(): "num_heartbeats_timeout": 10 }) }) - register_test_trainable() + # Pytest doesn't play nicely with imports + _register_all() yield cluster # The code after the yield will run as teardown code. 
ray.shutdown() cluster.shutdown() -@pytest.mark.skipif( - pytest_timeout is None, - reason="Timeout package not installed; skipping test.") -@pytest.mark.timeout(10, method="thread") def test_counting_resources(start_connected_cluster): """Tests that Tune accounting is consistent with actual cluster.""" @@ -128,7 +135,7 @@ def test_remove_node_before_result(start_connected_cluster): runner = TrialRunner(BasicVariantGenerator()) kwargs = {"stopping_criterion": {"training_iteration": 3}} - trials = [Trial("test", **kwargs), Trial("test", **kwargs)] + trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] for t in trials: runner.add_trial(t) @@ -174,7 +181,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): } # Test recovery of trial that hasn't been checkpointed - t = Trial("test", **kwargs) + t = Trial("__fake", **kwargs) runner.add_trial(t) runner.step() # start runner.step() # 1 result @@ -194,7 +201,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): assert t.status == Trial.TERMINATED # Test recovery of trial that has been checkpointed - t2 = Trial("test", **kwargs) + t2 = Trial("__fake", **kwargs) runner.add_trial(t2) runner.step() # start runner.step() # 1 result @@ -211,7 +218,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): assert t2.status == Trial.TERMINATED # Test recovery of trial that won't be checkpointed - t3 = Trial("test", **{"stopping_criterion": {"training_iteration": 3}}) + t3 = Trial("__fake", **{"stopping_criterion": {"training_iteration": 3}}) runner.add_trial(t3) runner.step() # start runner.step() # 1 result @@ -233,6 +240,7 @@ def test_trial_requeue(start_connected_emptyhead_cluster): """Removing a node in full cluster causes Trial to be requeued.""" cluster = start_connected_emptyhead_cluster node = cluster.add_node(resources=dict(CPU=1)) + assert cluster.wait_for_nodes() runner = TrialRunner(BasicVariantGenerator()) kwargs = { @@ -243,7 +251,7 @@ def test_trial_requeue(start_connected_emptyhead_cluster): "max_failures": 1 } - trials = [Trial("test", **kwargs), Trial("test", **kwargs)] + trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] for t in trials: runner.add_trial(t) diff --git a/python/ray/tune/test/trial_runner_test.py b/python/ray/tune/test/trial_runner_test.py index 2d40e8eeb90f..8e4aa2cea148 100644 --- a/python/ray/tune/test/trial_runner_test.py +++ b/python/ray/tune/test/trial_runner_test.py @@ -852,14 +852,15 @@ def testMaxConcurrentSuggestions(self): def create_mock_components(): - class _MockScheduler(FIFOScheduler): errored_trials = [] + def on_trial_error(self, trial_runner, trial): self.errored_trials += [trial] class _MockSearchAlg(BasicVariantGenerator): errored_trials = [] + def on_trial_complete(self, trial_id, error=False, **kwargs): if error: self.errored_trials += [trial_id] @@ -868,6 +869,7 @@ def on_trial_complete(self, trial_id, error=False, **kwargs): scheduler = _MockScheduler() return searchalg, scheduler + class TrialRunnerTest(unittest.TestCase): def tearDown(self): ray.shutdown() From 48fd3c3fcbf0e6fbd1fd559852095d12ab157a54 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 26 Nov 2018 19:23:45 -0800 Subject: [PATCH 09/13] lint --- python/ray/tune/trial_runner.py | 2 +- python/ray/tune/tune.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 20a2bac2d213..d36bc1dec907 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -11,7 +11,7 @@ 
from ray.tune import TuneError from ray.tune.ray_trial_executor import RayTrialExecutor -from ray.tune.result import TIME_THIS_ITER_S, DEFAULT_RESULTS_DIR +from ray.tune.result import TIME_THIS_ITER_S from ray.tune.trial import Trial, Checkpoint from ray.tune.schedulers import FIFOScheduler, TrialScheduler from ray.tune.web_server import TuneServer diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index c7aa6e560edc..7840fa4fbdc2 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -8,7 +8,6 @@ from ray.tune.error import TuneError from ray.tune.suggest import BasicVariantGenerator from ray.tune.trial import Trial, DEBUG_PRINT_INTERVAL -from ray.tune.result import DEFAULT_RESULTS_DIR from ray.tune.log_sync import wait_for_log_sync from ray.tune.trial_runner import TrialRunner from ray.tune.schedulers import (HyperBandScheduler, AsyncHyperBandScheduler, From bcf40513722b34a60206f00513a1bb074fbf6896 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 26 Nov 2018 19:27:23 -0800 Subject: [PATCH 10/13] nit --- python/ray/tune/tune.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 7840fa4fbdc2..335660ecb836 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -90,6 +90,8 @@ def run_experiments(experiments=None, if search_alg is None: search_alg = BasicVariantGenerator() + search_alg.add_configurations(experiments) + runner = TrialRunner( search_alg, scheduler=scheduler, From 0f67265a9fd6c1c39525a5c092f5eb29f946db1d Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Mon, 26 Nov 2018 20:52:16 -0800 Subject: [PATCH 11/13] small extraneous removals --- python/ray/tune/test/cluster_tests.py | 25 ------------------------- python/ray/tune/trial_runner.py | 7 ++----- 2 files changed, 2 insertions(+), 30 deletions(-) diff --git a/python/ray/tune/test/cluster_tests.py b/python/ray/tune/test/cluster_tests.py index 348de35f7b09..59f12181b8ff 100644 --- a/python/ray/tune/test/cluster_tests.py +++ b/python/ray/tune/test/cluster_tests.py @@ -3,7 +3,6 @@ from __future__ import print_function import json -import time import pytest try: import pytest_timeout @@ -11,7 +10,6 @@ pytest_timeout = None import ray -from ray import tune from ray.rllib import _register_all from ray.test.cluster_utils import Cluster from ray.tune.error import TuneError @@ -20,29 +18,6 @@ from ray.tune.suggest import BasicVariantGenerator -def register_fail_trainable(): - class _Fail(tune.Trainable): - """Fails on the 4th iteration.""" - - def _setup(self, config): - self.state = {"hi": 0} - - def _train(self): - self.state["hi"] += 1 - time.sleep(0.5) - if self.state["hi"] >= 4: - assert False - return {} - - def _save(self, path): - return self.state - - def _restore(self, state): - self.state = state - - tune.register_trainable("test", _Fail) - - def _start_new_cluster(): cluster = Cluster( initialize_head=True, diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index d36bc1dec907..b3ac14b98bc1 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -297,18 +297,15 @@ def _process_events(self): if trial.should_recover(): self.try_recover(trial, error_msg) else: - self.trial_executor.stop_trial( - trial, - error=error_msg is not None, - error_msg=error_msg) self._scheduler_alg.on_trial_error(self, trial) self._search_alg.on_trial_complete( trial.trial_id, error=True) + self.trial_executor.stop_trial( + trial, error=True, error_msg=error_msg) def _checkpoint_if_needed(self, 
trial): """Checkpoints trial based off trial.last_result.""" if trial.should_checkpoint(): - # Save trial runtime if possible if hasattr(trial, "runner") and trial.runner: self.trial_executor.save(trial, storage=Checkpoint.DISK) From 8e24c36ab484020bb1c5f5337a615764145b0f54 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 30 Nov 2018 12:17:30 -0800 Subject: [PATCH 12/13] Removed error raising --- python/ray/tune/ray_trial_executor.py | 8 +------- python/ray/tune/test/ray_trial_executor_test.py | 3 --- python/ray/tune/trial_executor.py | 3 +-- python/ray/tune/trial_runner.py | 5 +++-- 4 files changed, 5 insertions(+), 14 deletions(-) diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 3e460e80dc0a..e9129c0b2101 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -110,7 +110,7 @@ def _stop_trial(self, trial, error=False, error_msg=None, if stop_logger: trial.close_logger() - def start_trial(self, trial, checkpoint=None, raise_on_failure=False): + def start_trial(self, trial, checkpoint=None): """Starts the trial. Will not return resources if trial repeatedly fails on start. @@ -119,10 +119,6 @@ def start_trial(self, trial, checkpoint=None, raise_on_failure=False): trial (Trial): Trial to be started. checkpoint (Checkpoint): A Python object or path storing the state of trial. - raise_on_failure (bool): To raise exception on failure in starting. - - Raises: - Exception after 1 retries if `raise_on_failure` is True. """ self._commit_resources(trial.resources) @@ -141,8 +137,6 @@ def start_trial(self, trial, checkpoint=None, raise_on_failure=False): self._stop_trial(trial, error=True, error_msg=error_msg) # note that we don't return the resources, since they may # have been lost - if raise_on_failure: - raise exc def _find_item(self, dictionary, item): out = [rid for rid, t in dictionary.items() if t is item] diff --git a/python/ray/tune/test/ray_trial_executor_test.py b/python/ray/tune/test/ray_trial_executor_test.py index 8e6ef765cee3..86c4bb189595 100644 --- a/python/ray/tune/test/ray_trial_executor_test.py +++ b/python/ray/tune/test/ray_trial_executor_test.py @@ -56,9 +56,6 @@ def testStartFailure(self): trial = Trial("asdf", resources=Resources(1, 0)) self.trial_executor.start_trial(trial) self.assertEqual(Trial.ERROR, trial.status) - self.assertRaises( - Exception, lambda: self.trial_executor.start_trial( - trial, raise_on_error=True)) def testPauseResume2(self): """Tests that pausing works for trials being processed.""" diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py index d67e7705a1b1..063129780b47 100644 --- a/python/ray/tune/trial_executor.py +++ b/python/ray/tune/trial_executor.py @@ -31,14 +31,13 @@ def has_resources(self, resources): raise NotImplementedError("Subclasses of TrialExecutor must provide " "has_resources() method") - def start_trial(self, trial, checkpoint=None, raise_on_failure=False): + def start_trial(self, trial, checkpoint=None): """Starts the trial restoring from checkpoint if checkpoint is provided. Args: trial (Trial): Trial to be started. checkpoint(Checkpoint): A Python object or path storing the state of trial. - raise_on_failure (bool): To raise exception on failure in starting. 
""" raise NotImplementedError("Subclasses of TrialExecutor must provide " "start_trial() method") diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index b3ac14b98bc1..ddc50653b3ef 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -329,12 +329,13 @@ def try_recover(self, trial, error_msg): if self.trial_executor.has_resources(trial.resources): logger.info("Attempting to recover" " trial state from last checkpoint.") - self.trial_executor.start_trial(trial, raise_on_failure=True) + self.trial_executor.start_trial(trial) + if trial.status == Trial.ERROR: + raise RuntimeError("Trial did not start correctly.") else: logger.debug("Notifying Scheduler and requeueing trial.") self._requeue_trial(trial) except Exception: - error_msg = traceback.format_exc() logger.exception("Error recovering trial from checkpoint, abort.") self._scheduler_alg.on_trial_error(self, trial) self._search_alg.on_trial_complete(trial.trial_id, error=True) From 5cbc97ff57bf658537d661263e596ce2d66c474f Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 30 Nov 2018 12:27:38 -0800 Subject: [PATCH 13/13] try recover --- python/ray/tune/ray_trial_executor.py | 2 +- python/ray/tune/trial_runner.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index e9129c0b2101..6b107b17c82f 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -131,7 +131,7 @@ def start_trial(self, trial, checkpoint=None): self._stop_trial(trial, error=True, error_msg=error_msg) try: self._start_trial(trial, checkpoint) - except Exception as exc: + except Exception: logger.exception("Error starting runner, aborting!") error_msg = traceback.format_exc() self._stop_trial(trial, error=True, error_msg=error_msg) diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index b70e87dbf975..84457ff8d9e9 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -296,7 +296,7 @@ def _process_events(self): error_msg = traceback.format_exc() if trial.status == Trial.RUNNING: if trial.should_recover(): - self.try_recover(trial, error_msg) + self._try_recover(trial, error_msg) else: self._scheduler_alg.on_trial_error(self, trial) self._search_alg.on_trial_complete( @@ -311,7 +311,7 @@ def _checkpoint_if_needed(self, trial): if hasattr(trial, "runner") and trial.runner: self.trial_executor.save(trial, storage=Checkpoint.DISK) - def try_recover(self, trial, error_msg): + def _try_recover(self, trial, error_msg): """Tries to recover trial. Notifies SearchAlgorithm and Scheduler if failure to recover.