Skip to content

Commit ede2bdf

Browse files
Sparks0219elliot-barn
authored andcommitted
[core] Make PinObjectIDs RPC Fault Tolerant (#56443)
Making PinObjectIDs RPC fault tolerant. Added cpp unit tests to verify idempotency. --------- Signed-off-by: joshlee <[email protected]> Signed-off-by: elliot-barn <[email protected]>
1 parent 8a1dc90 commit ede2bdf

File tree

3 files changed

+71
-12
lines changed

3 files changed

+71
-12
lines changed

src/fakes/ray/object_manager/plasma/fake_plasma_client.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,19 @@ class FakePlasmaClient : public PlasmaClientInterface {
7171
Status Get(const std::vector<ObjectID> &object_ids,
7272
int64_t timeout_ms,
7373
std::vector<plasma::ObjectBuffer> *object_buffers) override {
74+
object_buffers->reserve(object_ids.size());
7475
for (const auto &id : object_ids) {
75-
auto &buffers = objects_in_plasma_[id];
76-
plasma::ObjectBuffer shm_buffer{std::make_shared<SharedMemoryBuffer>(
77-
buffers.first.data(), buffers.first.size()),
78-
std::make_shared<SharedMemoryBuffer>(
79-
buffers.second.data(), buffers.second.size())};
80-
object_buffers->emplace_back(shm_buffer);
76+
if (objects_in_plasma_.contains(id)) {
77+
auto &buffers = objects_in_plasma_[id];
78+
plasma::ObjectBuffer shm_buffer{
79+
std::make_shared<SharedMemoryBuffer>(buffers.first.data(),
80+
buffers.first.size()),
81+
std::make_shared<SharedMemoryBuffer>(buffers.second.data(),
82+
buffers.second.size())};
83+
object_buffers->emplace_back(shm_buffer);
84+
} else {
85+
object_buffers->emplace_back(plasma::ObjectBuffer{});
86+
}
8187
}
8288
return Status::OK();
8389
}

src/ray/raylet/tests/node_manager_test.cc

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,58 @@ TEST_F(NodeManagerTest, TestHandleCancelWorkerLeaseNoLeaseIdempotent) {
10711071
ASSERT_EQ(reply2.success(), false);
10721072
}
10731073

1074+
class PinObjectIDsIdempotencyTest : public NodeManagerTest,
1075+
public ::testing::WithParamInterface<bool> {};
1076+
1077+
TEST_P(PinObjectIDsIdempotencyTest, TestHandlePinObjectIDsIdempotency) {
1078+
// object_exists: determines whether we add an object to the plasma store which is used
1079+
// for pinning.
1080+
// object_exists == true: an object is added to the plasma store and PinObjectIDs is
1081+
// expected to succeed. A true boolean value is inserted at the index of the object
1082+
// in reply.successes.
1083+
// object_exists == false: an object is not added to the plasma store. PinObjectIDs will
1084+
// still succeed and not return an error when trying to pin a non-existent object, but
1085+
// will instead at the index of the object in reply.successes insert a false
1086+
// boolean value.
1087+
const bool object_exists = GetParam();
1088+
ObjectID id = ObjectID::FromRandom();
1089+
1090+
if (object_exists) {
1091+
rpc::Address owner_addr;
1092+
plasma::flatbuf::ObjectSource source = plasma::flatbuf::ObjectSource::CreatedByWorker;
1093+
RAY_UNUSED(mock_store_client_->TryCreateImmediately(
1094+
id, owner_addr, 1024, nullptr, 1024, nullptr, source, 0));
1095+
}
1096+
1097+
rpc::PinObjectIDsRequest pin_request;
1098+
pin_request.add_object_ids(id.Binary());
1099+
1100+
rpc::PinObjectIDsReply reply1;
1101+
node_manager_->HandlePinObjectIDs(
1102+
pin_request,
1103+
&reply1,
1104+
[](Status s, std::function<void()> success, std::function<void()> failure) {});
1105+
1106+
int64_t primary_bytes = local_object_manager_->GetPrimaryBytes();
1107+
rpc::PinObjectIDsReply reply2;
1108+
node_manager_->HandlePinObjectIDs(
1109+
pin_request,
1110+
&reply2,
1111+
[](Status s, std::function<void()> success, std::function<void()> failure) {});
1112+
1113+
// For each invocation of HandlePinObjectIDs, we expect the size of reply.successes and
1114+
// the boolean values it contains to not change.
1115+
EXPECT_EQ(reply1.successes_size(), 1);
1116+
EXPECT_EQ(reply1.successes(0), object_exists);
1117+
EXPECT_EQ(reply2.successes_size(), 1);
1118+
EXPECT_EQ(reply2.successes(0), object_exists);
1119+
EXPECT_EQ(local_object_manager_->GetPrimaryBytes(), primary_bytes);
1120+
}
1121+
1122+
INSTANTIATE_TEST_SUITE_P(PinObjectIDsIdempotencyVariations,
1123+
PinObjectIDsIdempotencyTest,
1124+
testing::Bool());
1125+
10741126
} // namespace ray::raylet
10751127

10761128
int main(int argc, char **argv) {

src/ray/rpc/raylet/raylet_client.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -338,12 +338,13 @@ void RayletClient::PinObjectIDs(
338338
pins_in_flight_--;
339339
callback(status, std::move(reply));
340340
};
341-
INVOKE_RPC_CALL(NodeManagerService,
342-
PinObjectIDs,
343-
request,
344-
rpc_callback,
345-
grpc_client_,
346-
/*method_timeout_ms*/ -1);
341+
INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_,
342+
NodeManagerService,
343+
PinObjectIDs,
344+
request,
345+
rpc_callback,
346+
grpc_client_,
347+
/*method_timeout_ms*/ -1);
347348
}
348349

349350
void RayletClient::ShutdownRaylet(

0 commit comments

Comments
 (0)