diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index bb56e1ad998f..2494133d01f6 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -106,6 +106,11 @@ ray::Status ObjectManager::SubscribeObjDeleted( } ray::Status ObjectManager::Pull(const ObjectID &object_id) { + // Check if object is already local. + if (local_objects_.count(object_id) != 0) { + RAY_LOG(ERROR) << object_id << " attempted to pull an object that's already local."; + return ray::Status::OK(); + } return PullGetLocations(object_id); } @@ -131,10 +136,13 @@ ray::Status ObjectManager::PullGetLocations(const ObjectID &object_id) { void ObjectManager::GetLocationsSuccess(const std::vector &client_ids, const ray::ObjectID &object_id) { - RAY_CHECK(!client_ids.empty()); - ClientID client_id = client_ids.front(); - ray::Status status = Pull(object_id, client_id); - RAY_CHECK_OK(status); + if (local_objects_.count(object_id) == 0) { + // Only pull objects that aren't local. + RAY_CHECK(!client_ids.empty()); + ClientID client_id = client_ids.front(); + ray::Status status_code = Pull(object_id, client_id); + RAY_CHECK_OK(status_code); + } } void ObjectManager::GetLocationsFailed(const ObjectID &object_id) { @@ -142,16 +150,21 @@ void ObjectManager::GetLocationsFailed(const ObjectID &object_id) { } ray::Status ObjectManager::Pull(const ObjectID &object_id, const ClientID &client_id) { + // Check if object is already local. + if (local_objects_.count(object_id) != 0) { + RAY_LOG(ERROR) << object_id << " attempted to pull an object that's already local."; + return ray::Status::OK(); + } + // Check if we're pulling from self. + if (client_id == client_id_) { + RAY_LOG(ERROR) << client_id_ << " attempted to pull an object from itself."; + return ray::Status::Invalid("A node cannot pull an object from itself."); + } return PullEstablishConnection(object_id, client_id); }; ray::Status ObjectManager::PullEstablishConnection(const ObjectID &object_id, const ClientID &client_id) { - // Check if object is already local, and client_id is not itself. - if (local_objects_.count(object_id) != 0 || client_id == client_id_) { - return ray::Status::OK(); - } - // Acquire a message connection and send pull request. ray::Status status; std::shared_ptr conn; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4214b80ada2f..b98716979e8f 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -378,10 +378,12 @@ void NodeManager::ProcessClientMessage( auto message = flatbuffers::GetRoot(message_data); ObjectID object_id = from_flatbuf(*message->object_id()); RAY_LOG(DEBUG) << "reconstructing object " << object_id; - // TODO(swang): Instead of calling Pull on the object directly, record the - // fact that the blocked task is dependent on this object_id in the task - // dependency manager. - RAY_CHECK_OK(object_manager_.Pull(object_id)); + if (!task_dependency_manager_.CheckObjectLocal(object_id)) { + // TODO(swang): Instead of calling Pull on the object directly, record the + // fact that the blocked task is dependent on this object_id in the task + // dependency manager. + RAY_CHECK_OK(object_manager_.Pull(object_id)); + } // If the blocked client is a worker, and the worker isn't already blocked, // then release any CPU resources that it acquired for its assigned task