-
Notifications
You must be signed in to change notification settings - Fork 7.2k
[tune] Component notification on node failure + Tests #3414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fcbc6de
5d8e414
9137de0
4453724
5a24499
b750d4e
14da6ec
394c0e9
48fd3c3
bcf4051
0f67265
8e24c36
d67211c
5cbc97f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,45 +3,22 @@ | |
| from __future__ import print_function | ||
|
|
||
| import json | ||
| import time | ||
| import pytest | ||
| try: | ||
| import pytest_timeout | ||
| except ImportError: | ||
| pytest_timeout = None | ||
|
|
||
| from ray.test.cluster_utils import Cluster | ||
| import ray | ||
| from ray import tune | ||
| from ray.rllib import _register_all | ||
| from ray.test.cluster_utils import Cluster | ||
| from ray.tune.error import TuneError | ||
| from ray.tune.trial import Trial | ||
| from ray.tune.trial_runner import TrialRunner | ||
| from ray.tune.suggest import BasicVariantGenerator | ||
|
|
||
|
|
||
| def register_test_trainable(): | ||
| class _Train(tune.Trainable): | ||
| def _setup(self, config): | ||
| self.state = {"hi": 1} | ||
|
|
||
| def _train(self): | ||
| self.state["hi"] += 1 | ||
| time.sleep(0.5) | ||
| return {} | ||
|
|
||
| def _save(self, path): | ||
| return self.state | ||
|
|
||
| def _restore(self, state): | ||
| self.state = state | ||
|
|
||
| tune.register_trainable("test", _Train) | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def start_connected_cluster(): | ||
| # Start the Ray processes. | ||
|
|
||
| def _start_new_cluster(): | ||
| cluster = Cluster( | ||
| initialize_head=True, | ||
| connect=True, | ||
|
|
@@ -51,7 +28,15 @@ def start_connected_cluster(): | |
| "num_heartbeats_timeout": 10 | ||
| }) | ||
| }) | ||
| register_test_trainable() | ||
| # Pytest doesn't play nicely with imports | ||
| _register_all() | ||
| return cluster | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def start_connected_cluster(): | ||
| # Start the Ray processes. | ||
| cluster = _start_new_cluster() | ||
| yield cluster | ||
| # The code after the yield will run as teardown code. | ||
| ray.shutdown() | ||
|
|
@@ -71,52 +56,44 @@ def start_connected_emptyhead_cluster(): | |
| "num_heartbeats_timeout": 10 | ||
| }) | ||
| }) | ||
| register_test_trainable() | ||
| # Pytest doesn't play nicely with imports | ||
| _register_all() | ||
| yield cluster | ||
| # The code after the yield will run as teardown code. | ||
| ray.shutdown() | ||
| cluster.shutdown() | ||
|
|
||
|
|
||
| @pytest.mark.skipif( | ||
| pytest_timeout is None, | ||
| reason="Timeout package not installed; skipping test.") | ||
| @pytest.mark.timeout(10, method="thread") | ||
| def test_counting_resources(start_connected_cluster): | ||
| """Tests that Tune accounting is consistent with actual cluster.""" | ||
|
|
||
| cluster = start_connected_cluster | ||
| assert ray.global_state.cluster_resources()["CPU"] == 1 | ||
| nodes = [] | ||
| nodes += [cluster.add_node(resources=dict(CPU=1))] | ||
| assert cluster.wait_for_nodes() | ||
| assert ray.global_state.cluster_resources()["CPU"] == 2 | ||
|
|
||
| assert ray.global_state.cluster_resources()["CPU"] == 1 | ||
| runner = TrialRunner(BasicVariantGenerator()) | ||
| kwargs = {"stopping_criterion": {"training_iteration": 10}} | ||
|
|
||
| trials = [Trial("test", **kwargs), Trial("test", **kwargs)] | ||
| trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] | ||
| for t in trials: | ||
| runner.add_trial(t) | ||
|
|
||
| runner.step() # run 1 | ||
| nodes += [cluster.add_node(resources=dict(CPU=1))] | ||
| assert cluster.wait_for_nodes() | ||
| assert ray.global_state.cluster_resources()["CPU"] == 2 | ||
| cluster.remove_node(nodes.pop()) | ||
| assert cluster.wait_for_nodes() | ||
| assert ray.global_state.cluster_resources()["CPU"] == 1 | ||
| runner.step() # run 2 | ||
| assert sum(t.status == Trial.RUNNING for t in runner.get_trials()) == 1 | ||
|
|
||
| for i in range(5): | ||
| nodes += [cluster.add_node(resources=dict(CPU=1))] | ||
| assert cluster.wait_for_nodes() | ||
| assert ray.global_state.cluster_resources()["CPU"] == 6 | ||
|
|
||
| runner.step() # 1 result | ||
|
|
||
| for i in range(5): | ||
| node = nodes.pop() | ||
| cluster.remove_node(node) | ||
| assert cluster.wait_for_nodes() | ||
| assert ray.global_state.cluster_resources()["CPU"] == 1 | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test previously didn't test Tune's resource tracking - updated test |
||
| assert sum(t.status == Trial.RUNNING for t in runner.get_trials()) == 2 | ||
|
|
||
|
|
||
| @pytest.mark.skip("Add this test once reconstruction is fixed") | ||
|
|
@@ -133,7 +110,7 @@ def test_remove_node_before_result(start_connected_cluster): | |
|
|
||
| runner = TrialRunner(BasicVariantGenerator()) | ||
| kwargs = {"stopping_criterion": {"training_iteration": 3}} | ||
| trials = [Trial("test", **kwargs), Trial("test", **kwargs)] | ||
| trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] | ||
| for t in trials: | ||
| runner.add_trial(t) | ||
|
|
||
|
|
@@ -179,7 +156,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): | |
| } | ||
|
|
||
| # Test recovery of trial that hasn't been checkpointed | ||
| t = Trial("test", **kwargs) | ||
| t = Trial("__fake", **kwargs) | ||
| runner.add_trial(t) | ||
| runner.step() # start | ||
| runner.step() # 1 result | ||
|
|
@@ -199,7 +176,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): | |
| assert t.status == Trial.TERMINATED | ||
|
|
||
| # Test recovery of trial that has been checkpointed | ||
| t2 = Trial("test", **kwargs) | ||
| t2 = Trial("__fake", **kwargs) | ||
| runner.add_trial(t2) | ||
| runner.step() # start | ||
| runner.step() # 1 result | ||
|
|
@@ -216,7 +193,7 @@ def test_trial_migration(start_connected_emptyhead_cluster): | |
| assert t2.status == Trial.TERMINATED | ||
|
|
||
| # Test recovery of trial that won't be checkpointed | ||
| t3 = Trial("test", **{"stopping_criterion": {"training_iteration": 3}}) | ||
| t3 = Trial("__fake", **{"stopping_criterion": {"training_iteration": 3}}) | ||
| runner.add_trial(t3) | ||
| runner.step() # start | ||
| runner.step() # 1 result | ||
|
|
@@ -238,6 +215,7 @@ def test_trial_requeue(start_connected_emptyhead_cluster): | |
| """Removing a node in full cluster causes Trial to be requeued.""" | ||
| cluster = start_connected_emptyhead_cluster | ||
| node = cluster.add_node(resources=dict(CPU=1)) | ||
| assert cluster.wait_for_nodes() | ||
|
|
||
| runner = TrialRunner(BasicVariantGenerator()) | ||
| kwargs = { | ||
|
|
@@ -248,7 +226,7 @@ def test_trial_requeue(start_connected_emptyhead_cluster): | |
| "max_failures": 1 | ||
| } | ||
|
|
||
| trials = [Trial("test", **kwargs), Trial("test", **kwargs)] | ||
| trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)] | ||
| for t in trials: | ||
| runner.add_trial(t) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| from __future__ import print_function | ||
|
|
||
| import os | ||
| import sys | ||
| import time | ||
| import unittest | ||
|
|
||
|
|
@@ -25,6 +26,11 @@ | |
| SuggestionAlgorithm) | ||
| from ray.tune.suggest.variant_generator import RecursiveDependencyError | ||
|
|
||
| if sys.version_info >= (3, 3): | ||
| from unittest.mock import patch | ||
| else: | ||
| from mock import patch | ||
|
|
||
|
|
||
| class TrainableFunctionApiTest(unittest.TestCase): | ||
| def setUp(self): | ||
|
|
@@ -845,6 +851,25 @@ def testMaxConcurrentSuggestions(self): | |
| self.assertEqual(len(searcher.next_trials()), 0) | ||
|
|
||
|
|
||
| def create_mock_components(): | ||
| class _MockScheduler(FIFOScheduler): | ||
| errored_trials = [] | ||
|
|
||
| def on_trial_error(self, trial_runner, trial): | ||
| self.errored_trials += [trial] | ||
|
|
||
| class _MockSearchAlg(BasicVariantGenerator): | ||
| errored_trials = [] | ||
|
|
||
| def on_trial_complete(self, trial_id, error=False, **kwargs): | ||
| if error: | ||
| self.errored_trials += [trial_id] | ||
|
|
||
| searchalg = _MockSearchAlg() | ||
| scheduler = _MockScheduler() | ||
| return searchalg, scheduler | ||
|
|
||
|
|
||
| class TrialRunnerTest(unittest.TestCase): | ||
| def tearDown(self): | ||
| ray.shutdown() | ||
|
|
@@ -889,16 +914,6 @@ def train(config, reporter): | |
| self.assertLessEqual(len(trial.logdir), 200) | ||
| trial_executor.stop_trial(trial) | ||
|
|
||
| def testTrialErrorOnStart(self): | ||
| ray.init() | ||
| trial_executor = RayTrialExecutor() | ||
| _global_registry.register(TRAINABLE_CLASS, "asdf", None) | ||
| trial = Trial("asdf", resources=Resources(1, 0)) | ||
| try: | ||
| trial_executor.start_trial(trial) | ||
| except Exception as e: | ||
| self.assertIn("a class", str(e)) | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test actually didn't actually work because |
||
| def testExtraResources(self): | ||
| ray.init(num_cpus=4, num_gpus=2) | ||
| runner = TrialRunner(BasicVariantGenerator()) | ||
|
|
@@ -1055,7 +1070,9 @@ def testThrowOnOverstep(self): | |
|
|
||
| def testFailureRecoveryDisabled(self): | ||
| ray.init(num_cpus=1, num_gpus=1) | ||
| runner = TrialRunner(BasicVariantGenerator()) | ||
| searchalg, scheduler = create_mock_components() | ||
|
|
||
| runner = TrialRunner(searchalg, scheduler=scheduler) | ||
| kwargs = { | ||
| "resources": Resources(cpu=1, gpu=1), | ||
| "checkpoint_freq": 1, | ||
|
|
@@ -1074,10 +1091,15 @@ def testFailureRecoveryDisabled(self): | |
| runner.step() | ||
| self.assertEqual(trials[0].status, Trial.ERROR) | ||
| self.assertEqual(trials[0].num_failures, 1) | ||
| self.assertEqual(len(searchalg.errored_trials), 1) | ||
| self.assertEqual(len(scheduler.errored_trials), 1) | ||
|
|
||
| def testFailureRecoveryEnabled(self): | ||
| ray.init(num_cpus=1, num_gpus=1) | ||
| runner = TrialRunner(BasicVariantGenerator()) | ||
| searchalg, scheduler = create_mock_components() | ||
|
|
||
| runner = TrialRunner(searchalg, scheduler=scheduler) | ||
|
|
||
| kwargs = { | ||
| "resources": Resources(cpu=1, gpu=1), | ||
| "checkpoint_freq": 1, | ||
|
|
@@ -1098,6 +1120,40 @@ def testFailureRecoveryEnabled(self): | |
| self.assertEqual(trials[0].num_failures, 1) | ||
| runner.step() | ||
| self.assertEqual(trials[0].status, Trial.RUNNING) | ||
| self.assertEqual(len(searchalg.errored_trials), 0) | ||
| self.assertEqual(len(scheduler.errored_trials), 0) | ||
|
|
||
| def testFailureRecoveryNodeRemoval(self): | ||
| ray.init(num_cpus=1, num_gpus=1) | ||
| searchalg, scheduler = create_mock_components() | ||
|
|
||
| runner = TrialRunner(searchalg, scheduler=scheduler) | ||
|
|
||
| kwargs = { | ||
| "resources": Resources(cpu=1, gpu=1), | ||
| "checkpoint_freq": 1, | ||
| "max_failures": 1, | ||
| "config": { | ||
| "mock_error": True, | ||
| }, | ||
| } | ||
| runner.add_trial(Trial("__fake", **kwargs)) | ||
| trials = runner.get_trials() | ||
|
|
||
| with patch('ray.global_state.cluster_resources') as resource_mock: | ||
| resource_mock.return_value = {"CPU": 1, "GPU": 1} | ||
| runner.step() | ||
| self.assertEqual(trials[0].status, Trial.RUNNING) | ||
| runner.step() | ||
| self.assertEqual(trials[0].status, Trial.RUNNING) | ||
|
|
||
| # Mimic a node failure | ||
| resource_mock.return_value = {"CPU": 0, "GPU": 0} | ||
| runner.step() | ||
| self.assertEqual(trials[0].status, Trial.PENDING) | ||
| self.assertEqual(trials[0].num_failures, 1) | ||
| self.assertEqual(len(searchalg.errored_trials), 0) | ||
| self.assertEqual(len(scheduler.errored_trials), 1) | ||
|
|
||
| def testFailureRecoveryMaxFailures(self): | ||
| ray.init(num_cpus=1, num_gpus=1) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in favor of
__fake