From e67bdf759e90f22e68e81442fa5cf67b5cc2876b Mon Sep 17 00:00:00 2001
From: Stephanie
Date: Tue, 27 Nov 2018 14:38:07 -0800
Subject: [PATCH 1/6] Add regression test

---
 test/component_failures_test.py | 59 +++++++++++++++++++++++++++++++++
 1 file changed, 59 insertions(+)

diff --git a/test/component_failures_test.py b/test/component_failures_test.py
index b9d257962120..38e21bfe4d5e 100644
--- a/test/component_failures_test.py
+++ b/test/component_failures_test.py
@@ -5,11 +5,14 @@
 import os
 import json
 import signal
+import sys
 import time
 
+import numpy as np
 import pytest
 
 import ray
+from ray.test.cluster_utils import Cluster
 from ray.test.test_utils import run_string_as_driver_nonblocking
 
 
@@ -33,6 +36,27 @@ def shutdown_only():
     ray.shutdown()
 
 
+@pytest.fixture
+def ray_start_cluster():
+    num_workers = 8
+    node_args = {
+        "resources": dict(CPU=num_workers),
+        "_internal_config": json.dumps({
+            "initial_reconstruction_timeout_milliseconds": 1000,
+            "num_heartbeats_timeout": 10
+        })
+    }
+    # Start with 4 workers and 4 cores.
+    g = Cluster(initialize_head=True, connect=True, head_node_args=node_args)
+    workers = []
+    for _ in range(4):
+        workers.append(g.add_node(**node_args))
+    g.wait_for_nodes()
+    yield g
+    ray.shutdown()
+    g.shutdown()
+
+
 # This test checks that when a worker dies in the middle of a get, the plasma
 # store and raylet will not die.
 @pytest.mark.skipif(
@@ -347,6 +371,41 @@ def test_plasma_store_failed():
     ray.shutdown()
 
 
+def test_actor_creation_node_failure(ray_start_cluster):
+    # TODO(swang): Refactor test_raylet_failed, etc to reuse the below code.
+    cluster = ray_start_cluster
+
+    @ray.remote
+    class Child(object):
+        def __init__(self, death_probability):
+            self.death_probability = death_probability
+
+        def ping(self):
+            # Exit process with some probability.
+            exit_chance = np.random.rand()
+            if exit_chance < self.death_probability:
+                sys.exit(-1)
+
+    num_children = 100
+    # Children actors will die about half the time.
+    death_probability = 0.5
+
+    children = [Child.remote(death_probability) for _ in range(num_children)]
+    while len(cluster.list_all_nodes()) > 1:
+        for j in range(3):
+            children_out = [child.ping.remote() for child in children]
+            ready, _ = ray.wait(
+                children_out, num_returns=len(children_out), timeout=30000)
+            assert len(ready) == len(children_out)
+
+            for i, out in enumerate(children_out):
+                try:
+                    ray.get(out)
+                except ray.worker.RayGetError:
+                    children[i] = Child.remote(death_probability)
+        cluster.remove_node(cluster.list_all_nodes()[-1])
+
+
 @pytest.mark.skipif(
     os.environ.get("RAY_USE_NEW_GCS") == "on",
     reason="Hanging with new GCS API.")

From e40add99ec68ef8bd3f7ac877c2405c17c1e1e5c Mon Sep 17 00:00:00 2001
From: Stephanie
Date: Tue, 27 Nov 2018 14:59:01 -0800
Subject: [PATCH 2/6] Request actor creation if no actor location found

---
 src/ray/raylet/node_manager.cc | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index a0b7526e8713..9dab0f55f45d 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -541,6 +541,13 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
                          << " already removed from the lineage cache. This is most "
                             "likely due to reconstruction.";
       }
+      // Maintain the invariant that if a task is in the
+      // MethodsWaitingForActorCreation queue, then it is subscribed to its
+      // respective actor creation task. Since the actor location is now known,
+      // we can remove the task from the queue and forget its dependency on the
+      // actor creation task.
+      RAY_CHECK(task_dependency_manager_.UnsubscribeDependencies(
+          method.GetTaskSpecification().TaskId()));
       // The task's uncommitted lineage was already added to the local lineage
       // cache upon the initial submission, so it's okay to resubmit it with an
       // empty lineage this time.
@@ -1154,6 +1161,14 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
       // Keep the task queued until we discover the actor's location.
       // (See design_docs/task_states.rst for the state transition diagram.)
       local_queues_.QueueMethodsWaitingForActorCreation({task});
+      // The actor has not yet been created and may have failed. To make sure
+      // that the actor is eventually recreated, we maintain the invariant that
+      // if a task is in the MethodsWaitingForActorCreation queue, then it is
+      // subscribed to its respective actor creation task. Once the actor has
+      // been created and this method removed from the waiting queue, the
+      // caller must make the corresponding call to UnsubscribeDependencies.
+      task_dependency_manager_.SubscribeDependencies(spec.TaskId(),
+                                                     {spec.ActorCreationDummyObjectId()});
       // Mark the task as pending. It will be canceled once we discover the
       // actor's location and either execute the task ourselves or forward it
       // to another node.
@@ -1431,7 +1446,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
 
     // Publish the actor creation event to all other nodes so that methods for
     // the actor will be forwarded directly to this node.
-    RAY_CHECK(actor_registry_.find(actor_id) == actor_registry_.end());
+    RAY_CHECK(actor_registry_.find(actor_id) == actor_registry_.end())
+        << "Created an actor that already exists";
     auto actor_data = std::make_shared<ActorTableDataT>();
     actor_data->actor_id = actor_id.binary();
     actor_data->actor_creation_dummy_object_id =
@@ -1447,6 +1463,10 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
     // index in the log should succeed.
     auto failure_callback = [](gcs::AsyncGcsClient *client, const ActorID &id,
                                const ActorTableDataT &data) {
+      // TODO(swang): Instead of making this a fatal check, we could just kill
+      // the duplicate actor process. If we do this, we must make sure to
+      // either resubmit the tasks that went to the duplicate actor, or wait
+      // for success before handling the actor state transition to ALIVE.
       RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
     };
     RAY_CHECK_OK(gcs_client_->actor_table().AppendAt(

From 608816efacc92075929d3d9306e734d0a9a6c2e4 Mon Sep 17 00:00:00 2001
From: Stephanie
Date: Tue, 27 Nov 2018 15:13:38 -0800
Subject: [PATCH 3/6] Comments

---
 test/component_failures_test.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/test/component_failures_test.py b/test/component_failures_test.py
index 38e21bfe4d5e..42e621353c3f 100644
--- a/test/component_failures_test.py
+++ b/test/component_failures_test.py
@@ -393,16 +393,24 @@ def ping(self):
     children = [Child.remote(death_probability) for _ in range(num_children)]
     while len(cluster.list_all_nodes()) > 1:
         for j in range(3):
+            # Submit some tasks on the actors. About half of the actors will
+            # fail.
             children_out = [child.ping.remote() for child in children]
+            # Wait for all the tasks to complete. This should trigger
+            # reconstruction for any actor creation tasks that were forwarded
+            # to nodes that then failed.
             ready, _ = ray.wait(
                 children_out, num_returns=len(children_out), timeout=30000)
             assert len(ready) == len(children_out)
 
+            # Replace any actors that died.
             for i, out in enumerate(children_out):
                 try:
                     ray.get(out)
                 except ray.worker.RayGetError:
                     children[i] = Child.remote(death_probability)
+        # Remove a node. Any actor creation tasks that were forwarded to this
+        # node must be reconstructed.
         cluster.remove_node(cluster.list_all_nodes()[-1])

From a5e0055204fa4987c0559167d398c414f437adf8 Mon Sep 17 00:00:00 2001
From: Stephanie
Date: Tue, 27 Nov 2018 15:36:21 -0800
Subject: [PATCH 4/6] Address comments

---
 src/ray/raylet/node_manager.cc  | 13 +++++++------
 test/component_failures_test.py |  5 ++---
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 9dab0f55f45d..04095011bbd8 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -543,9 +543,9 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
       }
       // Maintain the invariant that if a task is in the
       // MethodsWaitingForActorCreation queue, then it is subscribed to its
-      // respective actor creation task. Since the actor location is now known,
-      // we can remove the task from the queue and forget its dependency on the
-      // actor creation task.
+      // respective actor creation task and that task only. Since the actor
+      // location is now known, we can remove the task from the queue and
+      // forget its dependency on the actor creation task.
       RAY_CHECK(task_dependency_manager_.UnsubscribeDependencies(
           method.GetTaskSpecification().TaskId()));
       // The task's uncommitted lineage was already added to the local lineage
@@ -1164,9 +1164,10 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
       // The actor has not yet been created and may have failed. To make sure
       // that the actor is eventually recreated, we maintain the invariant that
       // if a task is in the MethodsWaitingForActorCreation queue, then it is
-      // subscribed to its respective actor creation task. Once the actor has
-      // been created and this method removed from the waiting queue, the
-      // caller must make the corresponding call to UnsubscribeDependencies.
+      // subscribed to its respective actor creation task and that task only.
+      // Once the actor has been created and this method removed from the
+      // waiting queue, the caller must make the corresponding call to
+      // UnsubscribeDependencies.
       task_dependency_manager_.SubscribeDependencies(spec.TaskId(),
                                                      {spec.ActorCreationDummyObjectId()});
       // Mark the task as pending. It will be canceled once we discover the
diff --git a/test/component_failures_test.py b/test/component_failures_test.py
index 42e621353c3f..21a5e5ce5884 100644
--- a/test/component_failures_test.py
+++ b/test/component_failures_test.py
@@ -38,15 +38,14 @@ def shutdown_only():
 
 @pytest.fixture
 def ray_start_cluster():
-    num_workers = 8
     node_args = {
-        "resources": dict(CPU=num_workers),
+        "resources": dict(CPU=8),
         "_internal_config": json.dumps({
             "initial_reconstruction_timeout_milliseconds": 1000,
             "num_heartbeats_timeout": 10
         })
     }
-    # Start with 4 workers and 4 cores.
+    # Start with 4 worker nodes and 8 cores each.
     g = Cluster(initialize_head=True, connect=True, head_node_args=node_args)
     workers = []
     for _ in range(4):

From b9506c0d28e852766020919f73c50c257190631a Mon Sep 17 00:00:00 2001
From: Stephanie
Date: Tue, 27 Nov 2018 19:06:30 -0800
Subject: [PATCH 5/6] Increase test timeout

---
 test/component_failures_test.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/test/component_failures_test.py b/test/component_failures_test.py
index 21a5e5ce5884..fd09a17599cf 100644
--- a/test/component_failures_test.py
+++ b/test/component_failures_test.py
@@ -395,11 +395,13 @@ def ping(self):
             # Submit some tasks on the actors. About half of the actors will
             # fail.
             children_out = [child.ping.remote() for child in children]
-            # Wait for all the tasks to complete. This should trigger
+            # Wait a while for all the tasks to complete. This should trigger
            # reconstruction for any actor creation tasks that were forwarded
            # to nodes that then failed.
             ready, _ = ray.wait(
-                children_out, num_returns=len(children_out), timeout=30000)
+                children_out,
+                num_returns=len(children_out),
+                timeout=5 * 60 * 1000)
             assert len(ready) == len(children_out)
 
             # Replace any actors that died.

From 06ac02aeee65993a1585ab856ebd55f6608e3931 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Thu, 29 Nov 2018 01:10:09 -0800
Subject: [PATCH 6/6] Trigger test
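
Reviewer note (not part of the patches): the sketch below strings together the fixture and regression test added in this series as a standalone script, for reproducing the failure scenario outside pytest. It assumes the same Ray version as the patches (ray.test.cluster_utils.Cluster, millisecond ray.wait timeouts, ray.worker.RayGetError); module paths and APIs may differ in other Ray versions.

    import json
    import sys

    import numpy as np

    import ray
    from ray.test.cluster_utils import Cluster

    node_args = {
        "resources": dict(CPU=8),
        "_internal_config": json.dumps({
            "initial_reconstruction_timeout_milliseconds": 1000,
            "num_heartbeats_timeout": 10
        })
    }
    # One head node plus four worker nodes, 8 CPUs each, as in the fixture.
    cluster = Cluster(
        initialize_head=True, connect=True, head_node_args=node_args)
    for _ in range(4):
        cluster.add_node(**node_args)
    cluster.wait_for_nodes()

    @ray.remote
    class Child(object):
        def __init__(self, death_probability):
            self.death_probability = death_probability

        def ping(self):
            # Exit with the given probability to simulate actor failure.
            if np.random.rand() < self.death_probability:
                sys.exit(-1)

    children = [Child.remote(0.5) for _ in range(100)]
    while len(cluster.list_all_nodes()) > 1:
        # Ping every actor; about half of them will die.
        pings = [child.ping.remote() for child in children]
        ready, _ = ray.wait(
            pings, num_returns=len(pings), timeout=5 * 60 * 1000)
        assert len(ready) == len(pings)
        # Replace actors that died, then remove a node so that any actor
        # creation tasks forwarded to it must be reconstructed.
        for i, ping in enumerate(pings):
            try:
                ray.get(ping)
            except ray.worker.RayGetError:
                children[i] = Child.remote(0.5)
        cluster.remove_node(cluster.list_all_nodes()[-1])

    ray.shutdown()
    cluster.shutdown()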