diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index a0b7526e8713..04095011bbd8 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -541,6 +541,13 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
                          << " already removed from the lineage cache. This is most "
                             "likely due to reconstruction.";
       }
+      // Maintain the invariant that if a task is in the
+      // MethodsWaitingForActorCreation queue, then it is subscribed to its
+      // respective actor creation task and that task only. Since the actor
+      // location is now known, we can remove the task from the queue and
+      // forget its dependency on the actor creation task.
+      RAY_CHECK(task_dependency_manager_.UnsubscribeDependencies(
+          method.GetTaskSpecification().TaskId()));
       // The task's uncommitted lineage was already added to the local lineage
       // cache upon the initial submission, so it's okay to resubmit it with an
       // empty lineage this time.
@@ -1154,6 +1161,15 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
       // Keep the task queued until we discover the actor's location.
       // (See design_docs/task_states.rst for the state transition diagram.)
       local_queues_.QueueMethodsWaitingForActorCreation({task});
+      // The actor has not yet been created and may have failed. To make sure
+      // that the actor is eventually recreated, we maintain the invariant that
+      // if a task is in the MethodsWaitingForActorCreation queue, then it is
+      // subscribed to its respective actor creation task and that task only.
+      // Once the actor has been created and this method removed from the
+      // waiting queue, the caller must make the corresponding call to
+      // UnsubscribeDependencies.
+      task_dependency_manager_.SubscribeDependencies(
+          spec.TaskId(), {spec.ActorCreationDummyObjectId()});
       // Mark the task as pending. It will be canceled once we discover the
       // actor's location and either execute the task ourselves or forward it
       // to another node.
@@ -1431,7 +1447,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
 
     // Publish the actor creation event to all other nodes so that methods for
     // the actor will be forwarded directly to this node.
-    RAY_CHECK(actor_registry_.find(actor_id) == actor_registry_.end());
+    RAY_CHECK(actor_registry_.find(actor_id) == actor_registry_.end())
+        << "Created an actor that already exists";
     auto actor_data = std::make_shared<ActorTableDataT>();
     actor_data->actor_id = actor_id.binary();
     actor_data->actor_creation_dummy_object_id =
@@ -1447,6 +1464,10 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
     // index in the log should succeed.
     auto failure_callback = [](gcs::AsyncGcsClient *client, const ActorID &id,
                                const ActorTableDataT &data) {
+      // TODO(swang): Instead of making this a fatal check, we could just kill
+      // the duplicate actor process. If we do this, we must make sure to
+      // either resubmit the tasks that went to the duplicate actor, or wait
+      // for success before handling the actor state transition to ALIVE.
       RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
     };
     RAY_CHECK_OK(gcs_client_->actor_table().AppendAt(
diff --git a/test/component_failures_test.py b/test/component_failures_test.py
index b9d257962120..fd09a17599cf 100644
--- a/test/component_failures_test.py
+++ b/test/component_failures_test.py
@@ -5,11 +5,14 @@
 import os
 import json
 import signal
+import sys
 import time
 
+import numpy as np
 import pytest
 
 import ray
+from ray.test.cluster_utils import Cluster
 from ray.test.test_utils import run_string_as_driver_nonblocking
 
 
@@ -33,6 +36,26 @@ def shutdown_only():
     ray.shutdown()
 
 
+@pytest.fixture
+def ray_start_cluster():
+    node_args = {
+        "resources": dict(CPU=8),
+        "_internal_config": json.dumps({
+            "initial_reconstruction_timeout_milliseconds": 1000,
+            "num_heartbeats_timeout": 10
+        })
+    }
+    # Start with 4 worker nodes and 8 cores each.
+    g = Cluster(initialize_head=True, connect=True, head_node_args=node_args)
+    workers = []
+    for _ in range(4):
+        workers.append(g.add_node(**node_args))
+    g.wait_for_nodes()
+    yield g
+    ray.shutdown()
+    g.shutdown()
+
+
 # This test checks that when a worker dies in the middle of a get, the plasma
 # store and raylet will not die.
 @pytest.mark.skipif(
@@ -347,6 +370,51 @@ def test_plasma_store_failed():
     ray.shutdown()
 
 
+def test_actor_creation_node_failure(ray_start_cluster):
+    # TODO(swang): Refactor test_raylet_failed, etc to reuse the below code.
+    cluster = ray_start_cluster
+
+    @ray.remote
+    class Child(object):
+        def __init__(self, death_probability):
+            self.death_probability = death_probability
+
+        def ping(self):
+            # Exit process with some probability.
+            exit_chance = np.random.rand()
+            if exit_chance < self.death_probability:
+                sys.exit(-1)
+
+    num_children = 100
+    # Children actors will die about half the time.
+    death_probability = 0.5
+
+    children = [Child.remote(death_probability) for _ in range(num_children)]
+    while len(cluster.list_all_nodes()) > 1:
+        for j in range(3):
+            # Submit some tasks on the actors. About half of the actors will
+            # fail.
+            children_out = [child.ping.remote() for child in children]
+            # Wait a while for all the tasks to complete. This should trigger
+            # reconstruction for any actor creation tasks that were forwarded
+            # to nodes that then failed.
+            ready, _ = ray.wait(
+                children_out,
+                num_returns=len(children_out),
+                timeout=5 * 60 * 1000)
+            assert len(ready) == len(children_out)
+
+            # Replace any actors that died.
+            for i, out in enumerate(children_out):
+                try:
+                    ray.get(out)
+                except ray.worker.RayGetError:
+                    children[i] = Child.remote(death_probability)
+        # Remove a node. Any actor creation tasks that were forwarded to this
+        # node must be reconstructed.
+        cluster.remove_node(cluster.list_all_nodes()[-1])
+
+
 @pytest.mark.skipif(
     os.environ.get("RAY_USE_NEW_GCS") == "on",
     reason="Hanging with new GCS API.")
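
The node_manager.cc changes above hinge on a single invariant: a method queued in MethodsWaitingForActorCreation is subscribed to exactly one dependency, its actor's creation dummy object, and that subscription is dropped the moment the actor's location becomes known. Below is a minimal Python sketch of that bookkeeping, assuming illustrative stand-in names (WaitingActorMethods, queue_method, on_actor_created); it is not Ray's actual internal API.

# Sketch of the subscribe/unsubscribe invariant, with hypothetical names.
class WaitingActorMethods(object):
    def __init__(self):
        self.waiting = set()       # task IDs waiting on actor creation
        self.subscriptions = {}    # task ID -> actor creation dummy object ID

    def queue_method(self, task_id, creation_dummy_object_id):
        # Mirrors SubmitTask: queue the method and subscribe it to the actor
        # creation task so a failed creation is eventually reconstructed.
        self.waiting.add(task_id)
        self.subscriptions[task_id] = creation_dummy_object_id

    def on_actor_created(self, task_id):
        # Mirrors HandleActorStateTransition: the actor's location is known,
        # so remove the method from the queue and forget its dependency.
        self.waiting.discard(task_id)
        assert self.subscriptions.pop(task_id, None) is not None, (
            "waiting method had no subscription; invariant violated")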