Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/fakes/ray/object_manager/plasma/fake_plasma_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,19 @@ class FakePlasmaClient : public PlasmaClientInterface {
Status Get(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms,
std::vector<plasma::ObjectBuffer> *object_buffers) override {
object_buffers->reserve(object_ids.size());
for (const auto &id : object_ids) {
auto &buffers = objects_in_plasma_[id];
plasma::ObjectBuffer shm_buffer{std::make_shared<SharedMemoryBuffer>(
buffers.first.data(), buffers.first.size()),
std::make_shared<SharedMemoryBuffer>(
buffers.second.data(), buffers.second.size())};
object_buffers->emplace_back(shm_buffer);
if (objects_in_plasma_.contains(id)) {
auto &buffers = objects_in_plasma_[id];
plasma::ObjectBuffer shm_buffer{
std::make_shared<SharedMemoryBuffer>(buffers.first.data(),
buffers.first.size()),
std::make_shared<SharedMemoryBuffer>(buffers.second.data(),
buffers.second.size())};
object_buffers->emplace_back(shm_buffer);
} else {
object_buffers->emplace_back(plasma::ObjectBuffer{});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the change to emplace in the buffer even if the obj isn't there in fake plasma client?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prod version does this as well:

*out = std::vector<ObjectBuffer>(num_objects);

by resizing the vector then skipping over indices that don't contain. Hence not emplacing caused issues with testing since it varied from the test causing RAY_CHECK failure here:
RAY_CHECK_EQ(object_ids.size(), results.size());

TLDR: it uses these dummy default values to detect what objects are not in plasma

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need the change for this test though?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PinObjectIDs test also test the not happy path where we try to pin an object that doesn't exist which will trigger the RAY_CHECK in node_manager.cc I linked above

}
}
return Status::OK();
}
Expand Down
52 changes: 52 additions & 0 deletions src/ray/raylet/tests/node_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,58 @@ TEST_F(NodeManagerTest, TestHandleCancelWorkerLeaseNoLeaseIdempotent) {
ASSERT_EQ(reply2.success(), false);
}

class PinObjectIDsIdempotencyTest : public NodeManagerTest,
public ::testing::WithParamInterface<bool> {};

TEST_P(PinObjectIDsIdempotencyTest, TestHandlePinObjectIDsIdempotency) {
// object_exists: determines whether we add an object to the plasma store which is used
// for pinning.
// object_exists == true: an object is added to the plasma store and PinObjectIDs is
// expected to succeed. A true boolean value is inserted at the index of the object
// in reply.successes.
// object_exists == false: an object is not added to the plasma store. PinObjectIDs will
// still succeed and not return an error when trying to pin a non-existent object, but
// will instead at the index of the object in reply.successes insert a false
// boolean value.
const bool object_exists = GetParam();
ObjectID id = ObjectID::FromRandom();

if (object_exists) {
rpc::Address owner_addr;
plasma::flatbuf::ObjectSource source = plasma::flatbuf::ObjectSource::CreatedByWorker;
RAY_UNUSED(mock_store_client_->TryCreateImmediately(
id, owner_addr, 1024, nullptr, 1024, nullptr, source, 0));
}

rpc::PinObjectIDsRequest pin_request;
pin_request.add_object_ids(id.Binary());

rpc::PinObjectIDsReply reply1;
node_manager_->HandlePinObjectIDs(
pin_request,
&reply1,
[](Status s, std::function<void()> success, std::function<void()> failure) {});

int64_t primary_bytes = local_object_manager_->GetPrimaryBytes();
rpc::PinObjectIDsReply reply2;
node_manager_->HandlePinObjectIDs(
pin_request,
&reply2,
[](Status s, std::function<void()> success, std::function<void()> failure) {});

// For each invocation of HandlePinObjectIDs, we expect the size of reply.successes and
// the boolean values it contains to not change.
EXPECT_EQ(reply1.successes_size(), 1);
EXPECT_EQ(reply1.successes(0), object_exists);
EXPECT_EQ(reply2.successes_size(), 1);
EXPECT_EQ(reply2.successes(0), object_exists);
EXPECT_EQ(local_object_manager_->GetPrimaryBytes(), primary_bytes);
}

INSTANTIATE_TEST_SUITE_P(PinObjectIDsIdempotencyVariations,
PinObjectIDsIdempotencyTest,
testing::Bool());

} // namespace ray::raylet

int main(int argc, char **argv) {
Expand Down
13 changes: 7 additions & 6 deletions src/ray/rpc/raylet/raylet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,13 @@ void RayletClient::PinObjectIDs(
pins_in_flight_--;
callback(status, std::move(reply));
};
INVOKE_RPC_CALL(NodeManagerService,
PinObjectIDs,
request,
rpc_callback,
grpc_client_,
/*method_timeout_ms*/ -1);
INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_,
NodeManagerService,
PinObjectIDs,
request,
rpc_callback,
grpc_client_,
/*method_timeout_ms*/ -1);
}

void RayletClient::ShutdownRaylet(
Expand Down