diff --git a/src/fakes/ray/object_manager/plasma/fake_plasma_client.h b/src/fakes/ray/object_manager/plasma/fake_plasma_client.h index 42030c1ab68a..cf0b30ff52af 100644 --- a/src/fakes/ray/object_manager/plasma/fake_plasma_client.h +++ b/src/fakes/ray/object_manager/plasma/fake_plasma_client.h @@ -71,13 +71,19 @@ class FakePlasmaClient : public PlasmaClientInterface { Status Get(const std::vector &object_ids, int64_t timeout_ms, std::vector *object_buffers) override { + object_buffers->reserve(object_ids.size()); for (const auto &id : object_ids) { - auto &buffers = objects_in_plasma_[id]; - plasma::ObjectBuffer shm_buffer{std::make_shared( - buffers.first.data(), buffers.first.size()), - std::make_shared( - buffers.second.data(), buffers.second.size())}; - object_buffers->emplace_back(shm_buffer); + if (objects_in_plasma_.contains(id)) { + auto &buffers = objects_in_plasma_[id]; + plasma::ObjectBuffer shm_buffer{ + std::make_shared(buffers.first.data(), + buffers.first.size()), + std::make_shared(buffers.second.data(), + buffers.second.size())}; + object_buffers->emplace_back(shm_buffer); + } else { + object_buffers->emplace_back(plasma::ObjectBuffer{}); + } } return Status::OK(); } diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index b05d4c5ac0bd..419de8320dea 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -1067,6 +1067,58 @@ TEST_F(NodeManagerTest, TestHandleCancelWorkerLeaseNoLeaseIdempotent) { ASSERT_EQ(reply2.success(), false); } +class PinObjectIDsIdempotencyTest : public NodeManagerTest, + public ::testing::WithParamInterface {}; + +TEST_P(PinObjectIDsIdempotencyTest, TestHandlePinObjectIDsIdempotency) { + // object_exists: determines whether we add an object to the plasma store which is used + // for pinning. + // object_exists == true: an object is added to the plasma store and PinObjectIDs is + // expected to succeed. A true boolean value is inserted at the index of the object + // in reply.successes. + // object_exists == false: an object is not added to the plasma store. PinObjectIDs will + // still succeed and not return an error when trying to pin a non-existent object, but + // will instead at the index of the object in reply.successes insert a false + // boolean value. + const bool object_exists = GetParam(); + ObjectID id = ObjectID::FromRandom(); + + if (object_exists) { + rpc::Address owner_addr; + plasma::flatbuf::ObjectSource source = plasma::flatbuf::ObjectSource::CreatedByWorker; + RAY_UNUSED(mock_store_client_->TryCreateImmediately( + id, owner_addr, 1024, nullptr, 1024, nullptr, source, 0)); + } + + rpc::PinObjectIDsRequest pin_request; + pin_request.add_object_ids(id.Binary()); + + rpc::PinObjectIDsReply reply1; + node_manager_->HandlePinObjectIDs( + pin_request, + &reply1, + [](Status s, std::function success, std::function failure) {}); + + int64_t primary_bytes = local_object_manager_->GetPrimaryBytes(); + rpc::PinObjectIDsReply reply2; + node_manager_->HandlePinObjectIDs( + pin_request, + &reply2, + [](Status s, std::function success, std::function failure) {}); + + // For each invocation of HandlePinObjectIDs, we expect the size of reply.successes and + // the boolean values it contains to not change. + EXPECT_EQ(reply1.successes_size(), 1); + EXPECT_EQ(reply1.successes(0), object_exists); + EXPECT_EQ(reply2.successes_size(), 1); + EXPECT_EQ(reply2.successes(0), object_exists); + EXPECT_EQ(local_object_manager_->GetPrimaryBytes(), primary_bytes); +} + +INSTANTIATE_TEST_SUITE_P(PinObjectIDsIdempotencyVariations, + PinObjectIDsIdempotencyTest, + testing::Bool()); + } // namespace ray::raylet int main(int argc, char **argv) { diff --git a/src/ray/rpc/raylet/raylet_client.cc b/src/ray/rpc/raylet/raylet_client.cc index 622271f4a30a..13e931b885a9 100644 --- a/src/ray/rpc/raylet/raylet_client.cc +++ b/src/ray/rpc/raylet/raylet_client.cc @@ -338,12 +338,13 @@ void RayletClient::PinObjectIDs( pins_in_flight_--; callback(status, std::move(reply)); }; - INVOKE_RPC_CALL(NodeManagerService, - PinObjectIDs, - request, - rpc_callback, - grpc_client_, - /*method_timeout_ms*/ -1); + INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_, + NodeManagerService, + PinObjectIDs, + request, + rpc_callback, + grpc_client_, + /*method_timeout_ms*/ -1); } void RayletClient::ShutdownRaylet(