From e136a32969982d6eab78381259f605ec20bc35ce Mon Sep 17 00:00:00 2001 From: Stephanie Date: Fri, 11 May 2018 15:42:07 -0700 Subject: [PATCH] Don't crash on duplicate actor notifications --- src/ray/raylet/node_manager.cc | 46 ++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 82740f1f5985..0400e8b72e20 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -251,31 +251,39 @@ void NodeManager::HandleActorCreation(const ActorID &actor_id, // Register the new actor. ActorRegistration actor_registration(data.back()); + ClientID received_node_manager_id = actor_registration.GetNodeManagerId(); // Extend the frontier to include the actor creation task. NOTE(swang): The // creator of the actor is always assigned nil as the actor handle ID. actor_registration.ExtendFrontier(ActorHandleID::nil(), actor_registration.GetActorCreationDependency()); auto inserted = actor_registry_.emplace(actor_id, std::move(actor_registration)); - RAY_CHECK(inserted.second); - - // Dequeue any methods that were submitted before the actor's location was - // known. - const auto &methods = local_queues_.GetUncreatedActorMethods(); - std::unordered_set<TaskID, UniqueIDHasher> created_actor_method_ids; - for (const auto &method : methods) { - if (method.GetTaskSpecification().ActorId() == actor_id) { - created_actor_method_ids.insert(method.GetTaskSpecification().TaskId()); + if (!inserted.second) { + // If we weren't able to insert the actor's location, check that the + // existing entry is the same as the new one. + // TODO(swang): This is not true in the case of failures. + RAY_CHECK(received_node_manager_id == inserted.first->second.GetNodeManagerId()) + << "Actor scheduled on " << inserted.first->second.GetNodeManagerId() + << ", but received notification for " << received_node_manager_id; + } else { + // The actor's location is now known. 
Dequeue any methods that were + // submitted before the actor's location was known. + const auto &methods = local_queues_.GetUncreatedActorMethods(); + std::unordered_set<TaskID, UniqueIDHasher> created_actor_method_ids; + for (const auto &method : methods) { + if (method.GetTaskSpecification().ActorId() == actor_id) { + created_actor_method_ids.insert(method.GetTaskSpecification().TaskId()); + } + } + // Resubmit the methods that were submitted before the actor's location was + // known. + auto created_actor_methods = local_queues_.RemoveTasks(created_actor_method_ids); + for (const auto &method : created_actor_methods) { + lineage_cache_.RemoveWaitingTask(method.GetTaskSpecification().TaskId()); + // The task's uncommitted lineage was already added to the local lineage + // cache upon the initial submission, so it's okay to resubmit it with an + // empty lineage this time. + SubmitTask(method, Lineage()); } - } - // Resubmit the methods that were submitted before the actor's location was - // known. - auto created_actor_methods = local_queues_.RemoveTasks(created_actor_method_ids); - for (const auto &method : created_actor_methods) { - lineage_cache_.RemoveWaitingTask(method.GetTaskSpecification().TaskId()); - // The task's uncommitted lineage was already added to the local lineage - // cache upon the initial submission, so it's okay to resubmit it with an - // empty lineage this time. - SubmitTask(method, Lineage()); } }