diff --git a/.travis/install-dependencies.sh b/.travis/install-dependencies.sh index 293c1b8b6b04..3f1ea4922bc8 100755 --- a/.travis/install-dependencies.sh +++ b/.travis/install-dependencies.sh @@ -25,7 +25,7 @@ if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas==0.22 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout + feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then sudo apt-get update sudo apt-get install -y cmake pkg-config python-dev python-numpy build-essential autoconf curl libtool unzip @@ -51,7 +51,7 @@ elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.27.3 cmake tensorflow gym opencv-python pyyaml pandas==0.22 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout + feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then # check that brew is installed which -s brew diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index 41dc3b6cdd26..b88282aefae5 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -43,6 +43,7 @@ def __init__(self, if connect: redis_password = head_node_args.get("redis_password") output_info = ray.init( + ignore_reinit_error=True, redis_address=self.redis_address, redis_password=redis_password) logger.info(output_info) diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index f73aa4a1ef8c..87f7e026d892 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -71,19 +71,19 @@ def _init(self): self._log_syncer = get_syncer(self.logdir, self.uri) def on_result(self, result): - for logger in self._loggers: - logger.on_result(result) + for _logger in self._loggers: + _logger.on_result(result) self._log_syncer.set_worker_ip(result.get(NODE_IP)) self._log_syncer.sync_if_needed() def close(self): - for logger in self._loggers: - logger.close() + for _logger in self._loggers: + _logger.close() self._log_syncer.sync_now(force=True) def flush(self): - for logger in self._loggers: - logger.flush() + for _logger in self._loggers: + _logger.flush() self._log_syncer.sync_now(force=True) self._log_syncer.wait() @@ -99,7 +99,7 @@ def _init(self): with open(config_out, "w") as f: json.dump(self.config, f, sort_keys=True, cls=_SafeFallbackEncoder) local_file = os.path.join(self.logdir, "result.json") - self.local_out = open(local_file, "w") + self.local_out = open(local_file, "a") def on_result(self, result): json.dump(result, self, cls=_SafeFallbackEncoder) @@ -109,6 +109,9 @@ def write(self, b): self.local_out.write(b) self.local_out.flush() + def flush(self): + self.local_out.flush() + def close(self): self.local_out.close() @@ -128,6 +131,7 @@ def to_tf_values(result, path): class _TFLogger(Logger): def _init(self): + # TODO(rliaw): Implement a proper resume functionality for this. 
self._file_writer = tf.summary.FileWriter(self.logdir) def on_result(self, result): @@ -135,7 +139,8 @@ def on_result(self, result): for k in [ "config", "pid", "timestamp", TIME_TOTAL_S, TRAINING_ITERATION ]: - del tmp[k] # not useful to tf log these + if k in tmp: + del tmp[k] # not useful to tf log these values = to_tf_values(tmp, ["ray", "tune"]) train_stats = tf.Summary(value=values) t = result.get(TIMESTEPS_TOTAL) or result[TRAINING_ITERATION] @@ -158,15 +163,21 @@ class _VisKitLogger(Logger): def _init(self): """CSV outputted with Headers as first set of results.""" # Note that we assume params.json was already created by JsonLogger - self._file = open(os.path.join(self.logdir, "progress.csv"), "w") + progress_file = os.path.join(self.logdir, "progress.csv") + self._continuing = os.path.exists(progress_file) + self._file = open(progress_file, "a") self._csv_out = None def on_result(self, result): if self._csv_out is None: self._csv_out = csv.DictWriter(self._file, result.keys()) - self._csv_out.writeheader() + if not self._continuing: + self._csv_out.writeheader() self._csv_out.writerow(result.copy()) + def flush(self): + self._file.flush() + def close(self): self._file.close() diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 7b725e05f342..19cff512144c 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -19,8 +19,8 @@ class RayTrialExecutor(TrialExecutor): """An implemention of TrialExecutor based on Ray.""" - def __init__(self, queue_trials=False): - super(RayTrialExecutor, self).__init__(queue_trials) + def __init__(self, queue_trials=False, track_checkpoints=False): + super(RayTrialExecutor, self).__init__(queue_trials, track_checkpoints) self._running = {} # Since trial resume after paused should not run # trial.train.remote(), thus no more new remote object id generated. @@ -60,7 +60,7 @@ def _train(self, trial): def _start_trial(self, trial, checkpoint=None): prior_status = trial.status - trial.status = Trial.RUNNING + self.set_status(trial, Trial.RUNNING) trial.runner = self._setup_runner(trial) if not self.restore(trial, checkpoint): return @@ -88,9 +88,9 @@ def _stop_trial(self, trial, error=False, error_msg=None, """ if error: - trial.status = Trial.ERROR + self.set_status(trial, Trial.ERROR) else: - trial.status = Trial.TERMINATED + self.set_status(trial, Trial.TERMINATED) try: trial.write_error_log(error_msg) @@ -103,32 +103,46 @@ def _stop_trial(self, trial, error=False, error_msg=None, stop_tasks, num_returns=2, timeout=250) except Exception: logger.exception("Error stopping runner.") - trial.status = Trial.ERROR + self.set_status(trial, Trial.ERROR) finally: trial.runner = None if stop_logger: trial.close_logger() - def start_trial(self, trial, checkpoint_obj=None): - """Starts the trial.""" + def start_trial(self, trial, checkpoint=None, raise_on_failure=False): + """Starts the trial. + + Will not return resources if trial repeatedly fails on start. + + Args: + trial (Trial): Trial to be started. + checkpoint (Checkpoint): A Python object or path storing the state + of the trial. + raise_on_failure (bool): Whether to raise an exception if the trial fails to start. + + Raises: + Exception after one retry if `raise_on_failure` is True. 
+ """ self._commit_resources(trial.resources) try: - self._start_trial(trial, checkpoint_obj) + self._start_trial(trial, checkpoint) except Exception: logger.exception("Error stopping runner - retrying...") error_msg = traceback.format_exc() time.sleep(2) self._stop_trial(trial, error=True, error_msg=error_msg) try: - self._start_trial(trial) - except Exception: + self._start_trial(trial, checkpoint) + except Exception as exc: logger.exception("Error starting runner, aborting!") error_msg = traceback.format_exc() self._stop_trial(trial, error=True, error_msg=error_msg) # note that we don't return the resources, since they may # have been lost + if raise_on_failure: + raise exc def _find_item(self, dictionary, item): out = [rid for rid, t in dictionary.items() if t is item] @@ -140,6 +154,7 @@ def stop_trial(self, trial, error=False, error_msg=None, stop_logger=True): self._stop_trial( trial, error=error, error_msg=error_msg, stop_logger=stop_logger) if prior_status == Trial.RUNNING: + logger.debug("Returning resources for this trial.") self._return_resources(trial.resources) out = self._find_item(self._running, trial) for result_id in out: @@ -293,7 +308,7 @@ def restore(self, trial, checkpoint=None): return True if trial.runner is None: logger.error("Unable to restore - no runner.") - trial.status = Trial.ERROR + self.set_status(trial, Trial.ERROR) return False try: value = checkpoint.value @@ -307,5 +322,5 @@ def restore(self, trial, checkpoint=None): return True except Exception: logger.exception("Error restoring runner.") - trial.status = Trial.ERROR + self.set_status(trial, Trial.ERROR) return False diff --git a/python/ray/tune/test/cluster_tests.py b/python/ray/tune/test/cluster_tests.py index f9425cc3e301..8019929d1593 100644 --- a/python/ray/tune/test/cluster_tests.py +++ b/python/ray/tune/test/cluster_tests.py @@ -2,31 +2,39 @@ from __future__ import division from __future__ import print_function +import inspect import json import time +import os import pytest try: import pytest_timeout except ImportError: pytest_timeout = None -from ray.test.cluster_utils import Cluster import ray from ray import tune +from ray.rllib import _register_all +from ray.test.cluster_utils import Cluster +from ray.test.test_utils import run_string_as_driver_nonblocking from ray.tune.error import TuneError from ray.tune.trial import Trial from ray.tune.trial_runner import TrialRunner from ray.tune.suggest import BasicVariantGenerator -def register_test_trainable(): - class _Train(tune.Trainable): +def register_fail_trainable(): + class _Fail(tune.Trainable): + """Fails on the 4th iteration.""" + def _setup(self, config): - self.state = {"hi": 1} + self.state = {"hi": 0} def _train(self): self.state["hi"] += 1 time.sleep(0.5) + if self.state["hi"] >= 4: + assert False return {} def _save(self, path): @@ -35,13 +43,10 @@ def _save(self, path): def _restore(self, state): self.state = state - tune.register_trainable("test", _Train) + tune.register_trainable("test", _Fail) -@pytest.fixture -def start_connected_cluster(): - # Start the Ray processes. - +def _start_new_cluster(): cluster = Cluster( initialize_head=True, connect=True, @@ -51,7 +56,15 @@ def start_connected_cluster(): "num_heartbeats_timeout": 10 }) }) - register_test_trainable() + # Pytest doesn't play nicely with imports + _register_all() + return cluster + + +@pytest.fixture +def start_connected_cluster(): + # Start the Ray processes. + cluster = _start_new_cluster() yield cluster # The code after the yield will run as teardown code. 
ray.shutdown() @@ -71,39 +84,35 @@ def start_connected_emptyhead_cluster(): "num_heartbeats_timeout": 10 }) }) - register_test_trainable() + # Pytest doesn't play nicely with imports + _register_all() yield cluster # The code after the yield will run as teardown code. ray.shutdown() cluster.shutdown() -@pytest.mark.skipif( - pytest_timeout is None, - reason="Timeout package not installed; skipping test.") -@pytest.mark.timeout(10, method="thread") def test_counting_resources(start_connected_cluster): """Tests that Tune accounting is consistent with actual cluster.""" - cluster = start_connected_cluster - assert ray.global_state.cluster_resources()["CPU"] == 1 nodes = [] - nodes += [cluster.add_node(resources=dict(CPU=1))] - assert cluster.wait_for_nodes() - assert ray.global_state.cluster_resources()["CPU"] == 2 - + assert ray.global_state.cluster_resources()["CPU"] == 1 runner = TrialRunner(BasicVariantGenerator()) kwargs = {"stopping_criterion": {"training_iteration": 10}} - trials = [Trial("test", **kwargs), Trial("test", **kwargs)] + trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] for t in trials: runner.add_trial(t) runner.step() # run 1 + nodes += [cluster.add_node(resources=dict(CPU=1))] + assert cluster.wait_for_nodes() + assert ray.global_state.cluster_resources()["CPU"] == 2 cluster.remove_node(nodes.pop()) assert cluster.wait_for_nodes() assert ray.global_state.cluster_resources()["CPU"] == 1 runner.step() # run 2 + assert sum(t.status == Trial.RUNNING for t in runner.get_trials()) == 1 for i in range(5): nodes += [cluster.add_node(resources=dict(CPU=1))] @@ -111,12 +120,7 @@ def test_counting_resources(start_connected_cluster): assert ray.global_state.cluster_resources()["CPU"] == 6 runner.step() # 1 result - - for i in range(5): - node = nodes.pop() - cluster.remove_node(node) - assert cluster.wait_for_nodes() - assert ray.global_state.cluster_resources()["CPU"] == 1 + assert sum(t.status == Trial.RUNNING for t in runner.get_trials()) == 2 @pytest.mark.skip("Add this test once reconstruction is fixed") @@ -133,7 +137,7 @@ def test_remove_node_before_result(start_connected_cluster): runner = TrialRunner(BasicVariantGenerator()) kwargs = {"stopping_criterion": {"training_iteration": 3}} - trials = [Trial("test", **kwargs), Trial("test", **kwargs)] + trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] for t in trials: runner.add_trial(t) @@ -179,7 +183,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): } # Test recovery of trial that hasn't been checkpointed - t = Trial("test", **kwargs) + t = Trial("__fake", **kwargs) runner.add_trial(t) runner.step() # start runner.step() # 1 result @@ -199,7 +203,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): assert t.status == Trial.TERMINATED # Test recovery of trial that has been checkpointed - t2 = Trial("test", **kwargs) + t2 = Trial("__fake", **kwargs) runner.add_trial(t2) runner.step() # start runner.step() # 1 result @@ -216,7 +220,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): assert t2.status == Trial.TERMINATED # Test recovery of trial that won't be checkpointed - t3 = Trial("test", **{"stopping_criterion": {"training_iteration": 3}}) + t3 = Trial("__fake", **{"stopping_criterion": {"training_iteration": 3}}) runner.add_trial(t3) runner.step() # start runner.step() # 1 result @@ -238,6 +242,7 @@ def test_trial_requeue(start_connected_emptyhead_cluster): """Removing a node in full cluster causes Trial to be requeued.""" cluster = 
start_connected_emptyhead_cluster node = cluster.add_node(resources=dict(CPU=1)) + assert cluster.wait_for_nodes() runner = TrialRunner(BasicVariantGenerator()) kwargs = { @@ -248,7 +253,7 @@ def test_trial_requeue(start_connected_emptyhead_cluster): "max_failures": 1 } - trials = [Trial("test", **kwargs), Trial("test", **kwargs)] + trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] for t in trials: runner.add_trial(t) @@ -262,3 +267,156 @@ def test_trial_requeue(start_connected_emptyhead_cluster): with pytest.raises(TuneError): runner.step() + + +def test_cluster_down_simple(start_connected_cluster, tmpdir): + """Tests that TrialRunner save/restore works on cluster shutdown.""" + cluster = start_connected_cluster + cluster.add_node(resources=dict(CPU=1)) + dirpath = str(tmpdir) + runner = TrialRunner( + BasicVariantGenerator(), checkpoint_freq=2, checkpoint_dir=dirpath) + kwargs = { + "stopping_criterion": { + "training_iteration": 2 + }, + "checkpoint_freq": 1, + "max_failures": 1 + } + trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] + for t in trials: + runner.add_trial(t) + + runner.step() # start + runner.step() # start2 + runner.step() # step + assert all(t.status == Trial.RUNNING for t in runner.get_trials()) + runner.save() + + cluster.shutdown() + ray.shutdown() + + cluster = _start_new_cluster() + runner = TrialRunner(BasicVariantGenerator()) + runner.restore(dirpath) + runner.step() # start + runner.step() # start2 + + for i in range(3): + runner.step() + + with pytest.raises(TuneError): + runner.step() + + assert all(t.status == Trial.TERMINATED for t in runner.get_trials()) + cluster.shutdown() + + +def test_cluster_down_full(start_connected_cluster, tmpdir): + """Tests that run_experiment restoring works on cluster shutdown.""" + cluster = start_connected_cluster + dirpath = str(tmpdir) + + exp1_args = dict( + run="__fake", + stop=dict(training_iteration=3), + checkpoint_freq=1) + exp2_args = dict(run="__fake", stop=dict(training_iteration=3)) + exp3_args = dict( + run="__fake", + stop=dict(training_iteration=3), + config=dict(mock_error=True)) + exp4_args = dict( + run="__fake", + stop=dict(training_iteration=3), + config=dict(mock_error=True), + checkpoint_freq=1) + + tune.run_experiments( + dict(exp1=exp1_args, exp2=exp2_args, exp3=exp3_args, exp4=exp4_args), + checkpoint_dir=dirpath, + checkpoint_freq=2, + raise_on_failed_trial=False) + + ray.shutdown() + cluster.shutdown() + cluster = _start_new_cluster() + + # Check that last_result.iteration = 1 + runner = TrialRunner(BasicVariantGenerator()) + runner.restore(dirpath) + trials = runner.get_trials() + trials = tune.run_experiments(restore_from_path=dirpath) + assert len(trials) == 2 + assert all(t.status in [Trial.TERMINATED, Trial.ERROR] for t in trials) + cluster.shutdown() + + +def test_cluster_interrupt(start_connected_cluster, tmpdir): + """Tests run_experiment on cluster shutdown even with atypical trial. + + The trial fails on the 4th step, and the checkpointing happens on + the 3rd step, so restoring should actually launch the trial again. 
+ """ + cluster = start_connected_cluster + dirpath = str(tmpdir) + script = """ +import time +import ray +from ray import tune + +ray.init(redis_address="{redis_address}") + +{register_trainable_fn} +{run_register_trainable_fn}() + +kwargs = dict( + run="test", + stop=dict(training_iteration=5), + checkpoint_freq=1, + max_failures=1) + +# This will save to disk on step 0 and step 3 +tune.run_experiments( + dict(experiment1=kwargs), + checkpoint_dir="{checkpoint_dir}", + checkpoint_freq=3, + raise_on_failed_trial=False) +""".format( + redis_address=cluster.redis_address, + checkpoint_dir=dirpath, + register_trainable_fn=inspect.getsource(register_fail_trainable), + run_register_trainable_fn=register_fail_trainable.__name__) + run_string_as_driver_nonblocking(script) + + # Wait until the right checkpoint is saved. + # The trainable returns every 0.5 seconds, so this should not miss + # the checkpoint. + for i in range(30): + if os.path.exists(os.path.join(dirpath, "experiment.state")): + # Inspect the internal trialrunner + runner = TrialRunner(BasicVariantGenerator()) + runner.restore(dirpath) + trials = runner.get_trials() + last_res = trials[0].last_result + if last_res is not None and last_res["training_iteration"] == 3: + break + time.sleep(0.2) + + ray.shutdown() + cluster.shutdown() + cluster = _start_new_cluster() + register_fail_trainable() + + # Inspect the internal trialrunner just in case + runner = TrialRunner(BasicVariantGenerator()) + runner.restore(dirpath) + trials = runner.get_trials() + assert trials[0].last_result["training_iteration"] == 3 + assert trials[0].status == Trial.PENDING + + # Restore properly from checkpoint + trials = tune.run_experiments( + restore_from_path=dirpath, raise_on_failed_trial=False) + assert all(t.status == Trial.ERROR for t in trials) + cluster.shutdown() diff --git a/python/ray/tune/test/ray_trial_executor_test.py b/python/ray/tune/test/ray_trial_executor_test.py index 35c413e717bb..8e6ef765cee3 100644 --- a/python/ray/tune/test/ray_trial_executor_test.py +++ b/python/ray/tune/test/ray_trial_executor_test.py @@ -9,8 +9,9 @@ from ray.rllib import _register_all from ray.tune import Trainable from ray.tune.ray_trial_executor import RayTrialExecutor +from ray.tune.registry import _global_registry, TRAINABLE_CLASS from ray.tune.suggest import BasicVariantGenerator -from ray.tune.trial import Trial, Checkpoint +from ray.tune.trial import Trial, Checkpoint, Resources class RayTrialExecutorTest(unittest.TestCase): @@ -50,6 +51,15 @@ def testPauseResume(self): self.trial_executor.stop_trial(trial) self.assertEqual(Trial.TERMINATED, trial.status) + def testStartFailure(self): + _global_registry.register(TRAINABLE_CLASS, "asdf", None) + trial = Trial("asdf", resources=Resources(1, 0)) + self.trial_executor.start_trial(trial) + self.assertEqual(Trial.ERROR, trial.status) + self.assertRaises( + Exception, lambda: self.trial_executor.start_trial( + trial, raise_on_error=True)) + def testPauseResume2(self): """Tests that pausing works for trials being processed.""" trial = Trial("__fake") diff --git a/python/ray/tune/test/trial_runner_test.py b/python/ray/tune/test/trial_runner_test.py index 6b142d354ec7..d60ca7ee2b03 100644 --- a/python/ray/tune/test/trial_runner_test.py +++ b/python/ray/tune/test/trial_runner_test.py @@ -3,12 +3,14 @@ from __future__ import print_function import os +import shutil +import sys +import tempfile import time import unittest import ray from ray.rllib import _register_all - from ray.tune import Trainable, TuneError from 
ray.tune import register_env, register_trainable, run_experiments from ray.tune.ray_trial_executor import RayTrialExecutor @@ -25,6 +27,11 @@ SuggestionAlgorithm) from ray.tune.suggest.variant_generator import RecursiveDependencyError +if sys.version_info >= (3, 3): + from unittest.mock import patch +else: + from mock import patch + class TrainableFunctionApiTest(unittest.TestCase): def setUp(self): @@ -580,6 +587,25 @@ def train(config, reporter): self.assertEqual(trial.status, Trial.TERMINATED) self.assertEqual(trial.last_result[TIMESTEPS_TOTAL], 99) + def testSimultaneousExperimentRestore(self): + tmpdir = tempfile.mkdtemp() + def train(config, reporter): + for i in range(100): + reporter(timesteps_total=i) + + register_trainable("f1", train) + exp1 = Experiment(**{ + "name": "foo", + "run": "f1", + "config": { + "script_min_iter_time_s": 0 + } + }) + self.assertRaises( + AssertionError, lambda: run_experiments( + exp1, restore_from_path=tmpdir)) + shutil.rmtree(tmpdir) + def testExperimentList(self): def train(config, reporter): for i in range(100): @@ -845,6 +871,25 @@ def testMaxConcurrentSuggestions(self): self.assertEqual(len(searcher.next_trials()), 0) +def create_mock_components(): + class _MockScheduler(FIFOScheduler): + errored_trials = [] + + def on_trial_error(self, trial_runner, trial): + self.errored_trials += [trial] + + class _MockSearchAlg(BasicVariantGenerator): + errored_trials = [] + + def on_trial_complete(self, trial_id, error=False, **kwargs): + if error: + self.errored_trials += [trial_id] + + searchalg = _MockSearchAlg() + scheduler = _MockScheduler() + return searchalg, scheduler + + class TrialRunnerTest(unittest.TestCase): def tearDown(self): ray.shutdown() @@ -889,16 +934,6 @@ def train(config, reporter): self.assertLessEqual(len(trial.logdir), 200) trial_executor.stop_trial(trial) - def testTrialErrorOnStart(self): - ray.init() - trial_executor = RayTrialExecutor() - _global_registry.register(TRAINABLE_CLASS, "asdf", None) - trial = Trial("asdf", resources=Resources(1, 0)) - try: - trial_executor.start_trial(trial) - except Exception as e: - self.assertIn("a class", str(e)) - def testExtraResources(self): ray.init(num_cpus=4, num_gpus=2) runner = TrialRunner(BasicVariantGenerator()) @@ -1055,7 +1090,9 @@ def testThrowOnOverstep(self): def testFailureRecoveryDisabled(self): ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + searchalg, scheduler = create_mock_components() + + runner = TrialRunner(searchalg, scheduler=scheduler) kwargs = { "resources": Resources(cpu=1, gpu=1), "checkpoint_freq": 1, @@ -1074,10 +1111,15 @@ def testFailureRecoveryDisabled(self): runner.step() self.assertEqual(trials[0].status, Trial.ERROR) self.assertEqual(trials[0].num_failures, 1) + self.assertEqual(len(searchalg.errored_trials), 1) + self.assertEqual(len(scheduler.errored_trials), 1) def testFailureRecoveryEnabled(self): ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + searchalg, scheduler = create_mock_components() + + runner = TrialRunner(searchalg, scheduler=scheduler) + kwargs = { "resources": Resources(cpu=1, gpu=1), "checkpoint_freq": 1, @@ -1098,6 +1140,40 @@ def testFailureRecoveryEnabled(self): self.assertEqual(trials[0].num_failures, 1) runner.step() self.assertEqual(trials[0].status, Trial.RUNNING) + self.assertEqual(len(searchalg.errored_trials), 0) + self.assertEqual(len(scheduler.errored_trials), 0) + + def testFailureRecoveryNodeRemoval(self): + ray.init(num_cpus=1, num_gpus=1) + searchalg, 
scheduler = create_mock_components() + + runner = TrialRunner(searchalg, scheduler=scheduler) + + kwargs = { + "resources": Resources(cpu=1, gpu=1), + "checkpoint_freq": 1, + "max_failures": 1, + "config": { + "mock_error": True, + }, + } + runner.add_trial(Trial("__fake", **kwargs)) + trials = runner.get_trials() + + with patch('ray.global_state.cluster_resources') as resource_mock: + resource_mock.return_value = {"CPU": 1, "GPU": 1} + runner.step() + self.assertEqual(trials[0].status, Trial.RUNNING) + runner.step() + self.assertEqual(trials[0].status, Trial.RUNNING) + + # Mimic a node failure + resource_mock.return_value = {"CPU": 0, "GPU": 0} + runner.step() + self.assertEqual(trials[0].status, Trial.PENDING) + self.assertEqual(trials[0].num_failures, 1) + self.assertEqual(len(searchalg.errored_trials), 0) + self.assertEqual(len(scheduler.errored_trials), 1) def testFailureRecoveryMaxFailures(self): ray.init(num_cpus=1, num_gpus=1) @@ -1499,6 +1575,97 @@ def _suggest(self, trial_id): self.assertTrue(searcher.is_finished()) self.assertRaises(TuneError, runner.step) + def testSaveRestore(self): + """Creates trials of different status to test runner.save/restore.""" + ray.init(num_cpus=3) + tmpdir = tempfile.mkdtemp() + default_resources = Resources(cpu=1, gpu=0) + + runner = TrialRunner( + BasicVariantGenerator(), checkpoint_dir=tmpdir, checkpoint_freq=1) + trials = [ + Trial( + "__fake", + trial_id="trial_terminate", + stopping_criterion={"training_iteration": 1}, + checkpoint_freq=1, + resources=default_resources) + ] + runner.add_trial(trials[0]) + runner.step() # start + runner.step() + self.assertEquals(trials[0].status, Trial.TERMINATED) + + trials += [ + Trial( + "__fake", + trial_id="trial_fail", + stopping_criterion={"training_iteration": 3}, + checkpoint_freq=1, + config={"mock_error": True}, + resources=default_resources) + ] + runner.add_trial(trials[1]) + runner.step() + runner.step() + runner.step() + self.assertEquals(trials[1].status, Trial.ERROR) + + trials += [ + Trial( + "__fake", + trial_id="trial_succ", + stopping_criterion={"training_iteration": 2}, + checkpoint_freq=1, + resources=default_resources) + ] + runner.add_trial(trials[2]) + runner.step() + self.assertEquals(len(runner.trial_executor.get_checkpoints()), 3) + self.assertEquals(trials[2].status, Trial.RUNNING) + + runner2 = TrialRunner(BasicVariantGenerator()) + runner2.restore(tmpdir) + for tid in ["trial_terminate", "trial_fail"]: + original_trial = runner.get_trial(tid) + restored_trial = runner2.get_trial(tid) + self.assertEqual(original_trial.status, restored_trial.status) + + restored_trial = runner2.get_trial("trial_succ") + self.assertEqual(Trial.PENDING, restored_trial.status) + + runner2.step() + runner2.step() + runner2.step() + self.assertRaises(TuneError, runner2.step) + shutil.rmtree(tmpdir) + + def testNoSave(self): + """Check that non-checkpointing trials are not saved.""" + ray.init(num_cpus=3) + tmpdir = tempfile.mkdtemp() + default_resources = Resources(cpu=1, gpu=0) + + runner = TrialRunner( + BasicVariantGenerator(), checkpoint_dir=tmpdir, checkpoint_freq=1) + trials = [ + Trial( + "__fake", + trial_id="trial_terminate", + stopping_criterion={"training_iteration": 2}, + resources=default_resources) + ] + runner.add_trial(trials[0]) + runner.step() # start + runner.step() + + runner2 = TrialRunner(BasicVariantGenerator()) + runner2.restore(tmpdir) + self.assertEquals(len(runner2.get_trials()), 0) + runner2.step() + self.assertRaises(TuneError, runner2.step) + shutil.rmtree(tmpdir) + if 
__name__ == "__main__": unittest.main(verbosity=2) diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index 65683eeb53c7..e555e9512101 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -216,17 +216,19 @@ def should_stop(self, result): return False - def should_checkpoint(self, result): + def should_checkpoint(self): """Whether this trial is due for checkpointing.""" + result = self.last_result or {} if result.get(DONE) and self.checkpoint_at_end: return True - if not self.checkpoint_freq: + if self.checkpoint_freq: + return result.get(TRAINING_ITERATION, + 0) % self.checkpoint_freq == 0 + else: return False - return self.last_result[TRAINING_ITERATION] % self.checkpoint_freq == 0 - def progress_string(self): """Returns a progress message for printing out to the console.""" @@ -281,10 +283,12 @@ def has_checkpoint(self): def should_recover(self): """Returns whether the trial qualifies for restoring. - This is if a checkpoint frequency is set, which includes settings - where there may not yet be a checkpoint. + This is if a checkpoint frequency is set and has not failed more than + max_failures. This may return true even when there may not yet + be a checkpoint. """ - return self.checkpoint_freq > 0 + return (self.checkpoint_freq > 0 + and self.num_failures < self.max_failures) def update_last_result(self, result, terminate=False): if terminate: @@ -321,3 +325,27 @@ def __str__(self): if self.experiment_tag: identifier += "_" + self.experiment_tag return identifier + + def __getstate__(self): + if not self._checkpoint.storage == Checkpoint.DISK: + raise ValueError("Most recent checkpoint cannot be in-memory.") + state = self.__dict__.copy() + + if state["status"] == Trial.RUNNING: + state["status"] = Trial.PENDING + # Remove the unpicklable entries. + if state["result_logger"]: + state["result_logger"].flush() + state["_logger_started"] = True + else: + state["_logger_started"] = False + + state["result_logger"] = None + state["runner"] = None + return state + + def __setstate__(self, state): + logger_started = state.pop("_logger_started") + self.__dict__.update(state) + if logger_started: + self.init_logger() diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py index e0b541218bf1..53b9d1d805c2 100644 --- a/python/ray/tune/trial_executor.py +++ b/python/ray/tune/trial_executor.py @@ -4,6 +4,7 @@ from __future__ import print_function import logging +import pickle from ray.tune.trial import Trial, Checkpoint @@ -15,7 +16,7 @@ class TrialExecutor(object): and starting/stopping trials. """ - def __init__(self, queue_trials=False): + def __init__(self, queue_trials=False, track_checkpoints=False): """Initializes a new TrialExecutor. Args: @@ -25,21 +26,58 @@ def __init__(self, queue_trials=False): automatic scale-up. """ self._queue_trials = queue_trials + self._track_checkpoints = track_checkpoints + self._checkpoints = {} + + def set_status(self, trial, status): + """Sets status and checkpoints metadata if needed. + + Only checkpoints metadata if trial status is a terminal condition. + PENDING, PAUSED, and RUNNING switches have checkpoints taken care of + in the TrialRunner. + + Args: + trial (Trial): Trial to checkpoint. + status (Trial.status): Status to set trial to. + """ + trial.status = status + if status in [Trial.TERMINATED, Trial.ERROR]: + self.try_checkpoint_metadata(trial) + + def try_checkpoint_metadata(self, trial): + """Checkpoints metadata if current session and trial allow. 
+ + Args: + trial (Trial): Trial to checkpoint. + """ + if self._track_checkpoints and trial.checkpoint_freq > 0: + if trial._checkpoint.storage == Checkpoint.MEMORY: + logger.debug("Not saving data for trial w/ memory checkpoint.") + return + try: + logger.debug("Saving trial metadata.") + metadata = pickle.dumps(trial) + self._checkpoints[trial.trial_id] = metadata + except ValueError: + logger.exception("Error checkpointing trial metadata.") + + def get_checkpoints(self): + """Returns a copy of mapping of the trial ID to pickled metadata.""" + return self._checkpoints.copy() def has_resources(self, resources): """Returns whether this runner has at least the specified resources.""" raise NotImplementedError("Subclasses of TrialExecutor must provide " "has_resources() method") - def start_trial(self, trial, checkpoint=None): - """Starts the trial restoring from checkpoint if checkpoint != None. - - If an error is encountered when starting the trial, an exception will - be thrown. + def start_trial(self, trial, checkpoint=None, raise_on_failure=False): + """Starts the trial restoring from checkpoint if checkpoint is provided. Args: + trial (Trial): Trial to be started. checkpoint(Checkpoint): A Python object or path storing the state of trial. + raise_on_failure (bool): To raise exception on failure in starting. """ raise NotImplementedError("Subclasses of TrialExecutor must provide " "start_trial() method") @@ -59,26 +97,6 @@ def stop_trial(self, trial, error=False, error_msg=None, stop_logger=True): raise NotImplementedError("Subclasses of TrialExecutor must provide " "stop_trial() method") - def restart_trial(self, trial, error_msg=None): - """Restarts or requeues the trial. - - The state of the trial should restore from the last checkpoint. Trial - is requeued if the cluster no longer has resources to accomodate it. - - Args: - error_msg (str): Optional error message. - """ - self.stop_trial( - trial, - error=error_msg is not None, - error_msg=error_msg, - stop_logger=False) - trial.result_logger.flush() - if self.has_resources(trial.resources): - self.start_trial(trial) - else: - trial.status = Trial.PENDING - def continue_training(self, trial): """Continues the training of this trial.""" pass @@ -93,15 +111,15 @@ def pause_trial(self, trial): try: self.save(trial, Checkpoint.MEMORY) self.stop_trial(trial, stop_logger=False) - trial.status = Trial.PAUSED + self.set_status(trial, Trial.PAUSED) except Exception: logger.exception("Error pausing runner.") - trial.status = Trial.ERROR + self.set_status(trial, Trial.ERROR) def unpause_trial(self, trial): """Sets PAUSED trial to pending to allow scheduler to start.""" assert trial.status == Trial.PAUSED, trial.status - trial.status = Trial.PENDING + self.set_status(trial, Trial.PENDING) def resume_trial(self, trial): """Resumes PAUSED trials. 
This is a blocking call.""" diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 98bbbcb71c64..548c68c51cc0 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -5,6 +5,7 @@ import collections import logging import os +import pickle import re import time import traceback @@ -12,7 +13,7 @@ from ray.tune import TuneError from ray.tune.ray_trial_executor import RayTrialExecutor from ray.tune.result import TIME_THIS_ITER_S -from ray.tune.trial import Trial +from ray.tune.trial import Trial, Checkpoint from ray.tune.schedulers import FIFOScheduler, TrialScheduler from ray.tune.web_server import TuneServer @@ -53,6 +54,8 @@ def __init__(self, search_alg, scheduler=None, launch_web_server=False, + checkpoint_dir=None, + checkpoint_freq=0, server_port=TuneServer.DEFAULT_PORT, verbose=True, queue_trials=False, @@ -64,6 +67,9 @@ def __init__(self, Trial objects. scheduler (TrialScheduler): Defaults to FIFOScheduler. launch_web_server (bool): Flag for starting TuneServer + checkpoint_dir (str): Path where global checkpoints are stored. + checkpoint_freq (int): How many steps between global + checkpoints. A value of 0 (default) disables checkpointing. server_port (int): Port number for launching TuneServer verbose (bool): Flag for verbosity. If False, trial results will not be output. @@ -77,19 +83,68 @@ def __init__(self, self._scheduler_alg = scheduler or FIFOScheduler() self._trials = [] self.trial_executor = trial_executor or \ - RayTrialExecutor(queue_trials=queue_trials) + RayTrialExecutor(queue_trials=queue_trials, + track_checkpoints=checkpoint_freq > 0) # For debugging, it may be useful to halt trials after some time has # elapsed. TODO(ekl) consider exposing this in the API. self._global_time_limit = float( os.environ.get("TRIALRUNNER_WALLTIME_LIMIT", float('inf'))) self._total_time = 0 + self._iteration = 0 self._server = None if launch_web_server: self._server = TuneServer(self, server_port) self._stop_queue = [] self._verbose = verbose self._queue_trials = queue_trials + self._checkpoint_dir = checkpoint_dir + self._checkpoint_freq = checkpoint_freq + self._trial_checkpoints = {} + + def save(self): + """Saves all trial checkpoints to `self._checkpoint_dir.`""" + checkpoint_dir = self._checkpoint_dir + if not os.path.exists(checkpoint_dir): + logger.debug("Checkpoint directory newly created.") + os.makedirs(checkpoint_dir) + logger.warning("Search Algorithm and Scheduler not checkpointed.") + # search_alg_checkpoint = self._search_alg.save(checkpoint_dir) + # scheduler_alg_checkpoint = self._scheduler_alg.save(checkpoint_dir) + runner_state = { + "checkpoints": list( + self.trial_executor.get_checkpoints().values()), + "total_time": self._total_time, + "stop_queue": self._stop_queue + } + with open(os.path.join(checkpoint_dir, "experiment.state"), "wb") as f: + pickle.dump(runner_state, f) + + return checkpoint_dir + + def restore(self, checkpoint_dir): + """Restores all checkpointed trials from previous run. + + Requires user to manually re-register their objects. Also stops + all ongoing trials. + + Args: + checkpoint_dir (str): Path to checkpoint (previously specified). 
+ """ + logger.debug("Stopping all trials.") + for trial in self._trials: + self.stop_trial(trial) + + with open(os.path.join(checkpoint_dir, "experiment.state"), "rb") as f: + runner_state = pickle.load(f) + + logger.info("Replacing all trials with checkpoint state.") + for ckpt in runner_state["checkpoints"]: + trial = pickle.loads(ckpt) + self.add_trial(trial) + + self._total_time = runner_state["total_time"] + self._stop_queue = runner_state["stop_queue"] def is_finished(self): """Returns whether all trials have finished running.""" @@ -136,6 +191,13 @@ def step(self): "There are paused trials, but no more pending " "trials with sufficient resources.") + if self._checkpoint_freq: + if (self._iteration % self._checkpoint_freq == 0 + or self.is_finished()): + self.save() + + self._iteration += 1 + if self._server: self._process_requests() @@ -165,6 +227,7 @@ def add_trial(self, trial): """ trial.set_verbose(self._verbose) self._scheduler_alg.on_trial_add(self, trial) + self._checkpoint_if_needed(trial) self._trials.append(trial) def debug_string(self, max_debug=MAX_DEBUG_TRIALS): @@ -278,17 +341,14 @@ def _process_events(self): result, terminate=(decision == TrialScheduler.STOP)) if decision == TrialScheduler.CONTINUE: - if trial.should_checkpoint(result): - # TODO(rliaw): This is a blocking call - self.trial_executor.save(trial) + self._checkpoint_if_needed(trial) self.trial_executor.continue_training(trial) elif decision == TrialScheduler.PAUSE: self.trial_executor.pause_trial(trial) elif decision == TrialScheduler.STOP: # Checkpoint before ending the trial # if checkpoint_at_end experiment option is set to True - if trial.should_checkpoint(result): - self.trial_executor.save(trial) + self._checkpoint_if_needed(trial) self.trial_executor.stop_trial(trial) else: assert False, "Invalid scheduling decision: {}".format( @@ -297,24 +357,63 @@ def _process_events(self): logger.exception("Error processing event.") error_msg = traceback.format_exc() if trial.status == Trial.RUNNING: - if trial.should_recover() and \ - trial.num_failures < trial.max_failures: - self._try_recover(trial, error_msg) + if trial.should_recover(): + self.try_recover(trial, error_msg) else: + self.trial_executor.stop_trial( + trial, + error=error_msg is not None, + error_msg=error_msg) self._scheduler_alg.on_trial_error(self, trial) self._search_alg.on_trial_complete( trial.trial_id, error=True) - self.trial_executor.stop_trial(trial, True, error_msg) - def _try_recover(self, trial, error_msg): + def _checkpoint_if_needed(self, trial): + """Checkpoints trial based off trial.last_result.""" + if trial.should_checkpoint(): + # Save trial runtime if possible + if hasattr(trial, "runner") and trial.runner: + self.trial_executor.save(trial, storage=Checkpoint.DISK) + self.trial_executor.try_checkpoint_metadata(trial) + + def try_recover(self, trial, error_msg): + """Tries to recover trial. + + Notifies SearchAlgorithm and Scheduler if failure to recover. + + Args: + trial (Trial): Trial to recover. + error_msg (str): Error message from prior to invoking this method. 
+ """ try: - logger.info("Attempting to recover" - " trial state from last checkpoint.") - self.trial_executor.restart_trial(trial, error_msg) + self.trial_executor.stop_trial( + trial, + error=error_msg is not None, + error_msg=error_msg, + stop_logger=False) + trial.result_logger.flush() + if self.trial_executor.has_resources(trial.resources): + logger.info("Attempting to recover" + " trial state from last checkpoint.") + self.trial_executor.start_trial(trial, raise_on_failure=True) + else: + logger.debug("Notifying Scheduler and requeueing trial.") + self._requeue_trial(trial) except Exception: error_msg = traceback.format_exc() - logger.warning("Error recovering trial from checkpoint, abort.") - self.trial_executor.stop_trial(trial, True, error_msg=error_msg) + logger.exception("Error recovering trial from checkpoint, abort.") + self._scheduler_alg.on_trial_error(self, trial) + self._search_alg.on_trial_complete(trial.trial_id, error=True) + + def _requeue_trial(self, trial): + """Notification to TrialScheduler and requeue trial. + + This does not notify the SearchAlgorithm because the function + evaluation is still in progress. + """ + self._scheduler_alg.on_trial_error(self, trial) + self.trial_executor.set_status(trial, Trial.PENDING) + self._scheduler_alg.on_trial_add(self, trial) def _update_trial_queue(self, blocking=False, timeout=600): """Adds next trials to queue if possible. diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 335660ecb836..86a4d2ba79d5 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -3,11 +3,13 @@ from __future__ import print_function import logging +import os import time from ray.tune.error import TuneError from ray.tune.suggest import BasicVariantGenerator from ray.tune.trial import Trial, DEBUG_PRINT_INTERVAL +from ray.tune.result import DEFAULT_RESULTS_DIR from ray.tune.log_sync import wait_for_log_sync from ray.tune.trial_runner import TrialRunner from ray.tune.schedulers import (HyperBandScheduler, AsyncHyperBandScheduler, @@ -35,6 +37,9 @@ def _make_scheduler(args): def run_experiments(experiments=None, search_alg=None, scheduler=None, + restore_from_path=None, + checkpoint_dir=None, + checkpoint_freq=0, with_server=False, server_port=TuneServer.DEFAULT_PORT, verbose=True, @@ -51,6 +56,12 @@ def run_experiments(experiments=None, scheduler (TrialScheduler): Scheduler for executing the experiment. Choose among FIFO (default), MedianStopping, AsyncHyperBand, and HyperBand. + restore_from_path (str): Restores experiment execution state to + given checkpoint path. + checkpoint_dir (str): Path at which experiment checkpoints are stored. + Defaults to DEFAULT_RESULTS_DIR. + checkpoint_freq (int): How many trial results between + checkpoints. A value of 0 (default) disables checkpointing. with_server (bool): Starts a background Tune server. Needed for using the Client API. server_port (int): Port number for launching TuneServer. 
@@ -90,19 +101,27 @@ def run_experiments(experiments=None, if search_alg is None: search_alg = BasicVariantGenerator() - search_alg.add_configurations(experiments) - runner = TrialRunner( search_alg, scheduler=scheduler, + checkpoint_dir=checkpoint_dir or DEFAULT_RESULTS_DIR, + checkpoint_freq=checkpoint_freq, launch_web_server=with_server, server_port=server_port, verbose=verbose, queue_trials=queue_trials, trial_executor=trial_executor) - logger.info(runner.debug_string(max_debug=99999)) + if restore_from_path: + if not os.path.exists(restore_from_path): + raise ValueError("Provided path invalid: %s" % restore_from_path) + assert experiments is None, ( + "Simultaneous starting experiments and restoring not supported.") + runner.restore(restore_from_path) + else: + search_alg.add_configurations(experiments) + logger.info(runner.debug_string(max_debug=99999)) last_debug = 0 while not runner.is_finished(): runner.step()
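Reviewer note (not part of the patch): for completeness, a lower-level sketch of the TrialRunner.save()/restore() path that testSaveRestore and test_cluster_down_simple exercise; the step counts, paths, and trial arguments below are illustrative only.

# Reviewer sketch (not part of this patch): driving TrialRunner.save()
# and TrialRunner.restore() directly, as the new tests do.
import tempfile

import ray
from ray.rllib import _register_all  # registers the "__fake" mock trainable
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner

ray.init(num_cpus=2)
_register_all()
checkpoint_dir = tempfile.mkdtemp()

runner = TrialRunner(
    BasicVariantGenerator(),
    checkpoint_dir=checkpoint_dir,
    checkpoint_freq=2)
runner.add_trial(
    Trial(
        "__fake",
        stopping_criterion={"training_iteration": 2},
        checkpoint_freq=1))

runner.step()  # starts the trial
runner.step()  # processes one result
runner.save()  # writes <checkpoint_dir>/experiment.state

# A fresh runner (e.g. on a rebuilt cluster) picks up the pickled trial
# metadata; RUNNING trials come back as PENDING per Trial.__getstate__.
runner2 = TrialRunner(BasicVariantGenerator())
runner2.restore(checkpoint_dir)
assert len(runner2.get_trials()) == 1

As the restore() docstring notes, trainables must be re-registered in the new session before restoring, which is why the tests call _register_all() (or register_fail_trainable()) first.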