From b2189897d6c11ea19fa33816bed71126ca3a5cb0 Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 11 Sep 2025 05:25:42 +0000 Subject: [PATCH 1/7] Make PinObjectIDs RPC Fault Tolerant Signed-off-by: joshlee --- src/ray/raylet/tests/node_manager_test.cc | 71 +++++++++++++++++++++-- src/ray/rpc/raylet/raylet_client.cc | 13 +++-- 2 files changed, 72 insertions(+), 12 deletions(-) diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index 6b9469be35c5..ce90b5a1513b 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -134,13 +134,19 @@ class FakePlasmaClient : public plasma::PlasmaClientInterface { Status Get(const std::vector &object_ids, int64_t timeout_ms, std::vector *object_buffers) override { + object_buffers->reserve(object_ids.size()); for (const auto &id : object_ids) { - auto &buffers = objects_in_plasma_[id]; - plasma::ObjectBuffer shm_buffer{std::make_shared( - buffers.first.data(), buffers.first.size()), - std::make_shared( - buffers.second.data(), buffers.second.size())}; - object_buffers->emplace_back(shm_buffer); + if (objects_in_plasma_.contains(id)) { + auto &buffers = objects_in_plasma_[id]; + plasma::ObjectBuffer shm_buffer{ + std::make_shared(buffers.first.data(), + buffers.first.size()), + std::make_shared(buffers.second.data(), + buffers.second.size())}; + object_buffers->emplace_back(shm_buffer); + } else { + object_buffers->emplace_back(plasma::ObjectBuffer{}); + } } return Status::OK(); } @@ -1158,6 +1164,59 @@ TEST_F(NodeManagerTest, TestHandleCancelWorkerLeaseNoLeaseIdempotent) { ASSERT_EQ(reply2.success(), false); } +TEST_F(NodeManagerTest, TestHandlePinObjectIDsHasPlasmaObjectIdempotency) { + rpc::Address owner_addr; + plasma::flatbuf::ObjectSource source = plasma::flatbuf::ObjectSource::CreatedByWorker; + ObjectID id = ObjectID::FromRandom(); + + RAY_UNUSED(mock_store_client_->TryCreateImmediately( + id, owner_addr, 1024, nullptr, 1024, nullptr, source, 0)); + + rpc::PinObjectIDsRequest pin_request; + pin_request.add_object_ids(id.Binary()); + + rpc::PinObjectIDsReply reply1; + node_manager_->HandlePinObjectIDs( + pin_request, + &reply1, + [](Status s, std::function success, std::function failure) {}); + + rpc::PinObjectIDsReply reply2; + node_manager_->HandlePinObjectIDs( + pin_request, + &reply2, + [](Status s, std::function success, std::function failure) {}); + + EXPECT_EQ(reply1.successes_size(), 1); + EXPECT_TRUE(reply1.successes(0)); + EXPECT_EQ(reply2.successes_size(), 1); + EXPECT_TRUE(reply2.successes(0)); +} + +TEST_F(NodeManagerTest, TestHandlePinObjectIDsNoPlasmaObjectIdempotency) { + ObjectID id = ObjectID::FromRandom(); + + rpc::PinObjectIDsRequest pin_request; + pin_request.add_object_ids(id.Binary()); + + rpc::PinObjectIDsReply reply1; + node_manager_->HandlePinObjectIDs( + pin_request, + &reply1, + [](Status s, std::function success, std::function failure) {}); + + rpc::PinObjectIDsReply reply2; + node_manager_->HandlePinObjectIDs( + pin_request, + &reply2, + [](Status s, std::function success, std::function failure) {}); + + EXPECT_EQ(reply1.successes_size(), 1); + EXPECT_FALSE(reply1.successes(0)); + EXPECT_EQ(reply2.successes_size(), 1); + EXPECT_FALSE(reply2.successes(0)); +} + } // namespace ray::raylet int main(int argc, char **argv) { diff --git a/src/ray/rpc/raylet/raylet_client.cc b/src/ray/rpc/raylet/raylet_client.cc index 622271f4a30a..13e931b885a9 100644 --- a/src/ray/rpc/raylet/raylet_client.cc +++ b/src/ray/rpc/raylet/raylet_client.cc @@ -338,12 +338,13 @@ void RayletClient::PinObjectIDs( pins_in_flight_--; callback(status, std::move(reply)); }; - INVOKE_RPC_CALL(NodeManagerService, - PinObjectIDs, - request, - rpc_callback, - grpc_client_, - /*method_timeout_ms*/ -1); + INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_, + NodeManagerService, + PinObjectIDs, + request, + rpc_callback, + grpc_client_, + /*method_timeout_ms*/ -1); } void RayletClient::ShutdownRaylet( From 9c7b2ca737c0194cc2f8d581dbd3db5cd47c5fe2 Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 11 Sep 2025 22:40:23 +0000 Subject: [PATCH 2/7] Creating deterministic python integration test Signed-off-by: joshlee --- .../ray/tests/test_raylet_fault_tolerance.py | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py index 21fcd1e84a5d..80a86389ebe5 100644 --- a/python/ray/tests/test_raylet_fault_tolerance.py +++ b/python/ray/tests/test_raylet_fault_tolerance.py @@ -1,5 +1,6 @@ import sys +import numpy as np import pytest import ray @@ -44,5 +45,62 @@ def simple_task_2(): assert ray.get([result_ref1, result_ref2]) == [0, 1] +@pytest.mark.parametrize("deterministic_failure", ["request", "response"]) +def test_pin_object_ids_idempotent( + monkeypatch, shutdown_only, deterministic_failure, ray_start_cluster_head +): + monkeypatch.setenv( + "RAY_testing_rpc_failure", + "NodeManagerService.grpc_client.PinObjectIDs=1:" + + ("100:0" if deterministic_failure == "request" else "0:100"), + ) + + cluster = ray_start_cluster_head + remote_node_1 = cluster.add_node( + num_cpus=1, + object_store_memory=200 * 1024 * 1024, + ) + remote_node_2 = cluster.add_node( + num_cpus=1, + object_store_memory=200 * 1024 * 1024, + ) + + # Max retries is 0 to prevent object reconstruction and force an ObjectLostError to occur when eviction happens + @ray.remote(max_retries=0) + def create_big_object(): + return np.zeros(150 * 1024 * 1024) + + @ray.remote(max_retries=0) + def move_big_object_ref(big_object_ref_list): + ray.get(big_object_ref_list[0]) + return "ok" + + big_object_ref = create_big_object.options( + scheduling_strategy=NodeAffinitySchedulingStrategy( + node_id=remote_node_1.node_id, soft=False + ) + ).remote() + result_ref = move_big_object_ref.options( + scheduling_strategy=NodeAffinitySchedulingStrategy( + node_id=remote_node_2.node_id, soft=False + ) + ).remote([big_object_ref]) + assert ray.get(result_ref) == "ok" + + # Kill remote_node_1 so that the secondary copy on remote_node_2 is pinned + cluster.remove_node(remote_node_1) + + # Create memory pressure on remote_node_2 so that the object is spilled + memory_pressure_ref = create_big_object.options( + scheduling_strategy=NodeAffinitySchedulingStrategy( + node_id=remote_node_2.node_id, soft=False + ) + ).remote() + ray.get(memory_pressure_ref) + # If the object was not pinned, it would be evicted and we would get an ObjectLostError. + # A successful get means that the object was spilled instead meaning it was pinned. + ray.get(big_object_ref) + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) From 0935b791c68fb6d044609ecd802c0303271cc1ab Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 11 Sep 2025 22:46:18 +0000 Subject: [PATCH 3/7] Cleaning up cpp tests Signed-off-by: joshlee --- src/ray/raylet/tests/node_manager_test.cc | 53 ----------------------- 1 file changed, 53 deletions(-) diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index ce90b5a1513b..faae84a01c78 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -1164,59 +1164,6 @@ TEST_F(NodeManagerTest, TestHandleCancelWorkerLeaseNoLeaseIdempotent) { ASSERT_EQ(reply2.success(), false); } -TEST_F(NodeManagerTest, TestHandlePinObjectIDsHasPlasmaObjectIdempotency) { - rpc::Address owner_addr; - plasma::flatbuf::ObjectSource source = plasma::flatbuf::ObjectSource::CreatedByWorker; - ObjectID id = ObjectID::FromRandom(); - - RAY_UNUSED(mock_store_client_->TryCreateImmediately( - id, owner_addr, 1024, nullptr, 1024, nullptr, source, 0)); - - rpc::PinObjectIDsRequest pin_request; - pin_request.add_object_ids(id.Binary()); - - rpc::PinObjectIDsReply reply1; - node_manager_->HandlePinObjectIDs( - pin_request, - &reply1, - [](Status s, std::function success, std::function failure) {}); - - rpc::PinObjectIDsReply reply2; - node_manager_->HandlePinObjectIDs( - pin_request, - &reply2, - [](Status s, std::function success, std::function failure) {}); - - EXPECT_EQ(reply1.successes_size(), 1); - EXPECT_TRUE(reply1.successes(0)); - EXPECT_EQ(reply2.successes_size(), 1); - EXPECT_TRUE(reply2.successes(0)); -} - -TEST_F(NodeManagerTest, TestHandlePinObjectIDsNoPlasmaObjectIdempotency) { - ObjectID id = ObjectID::FromRandom(); - - rpc::PinObjectIDsRequest pin_request; - pin_request.add_object_ids(id.Binary()); - - rpc::PinObjectIDsReply reply1; - node_manager_->HandlePinObjectIDs( - pin_request, - &reply1, - [](Status s, std::function success, std::function failure) {}); - - rpc::PinObjectIDsReply reply2; - node_manager_->HandlePinObjectIDs( - pin_request, - &reply2, - [](Status s, std::function success, std::function failure) {}); - - EXPECT_EQ(reply1.successes_size(), 1); - EXPECT_FALSE(reply1.successes(0)); - EXPECT_EQ(reply2.successes_size(), 1); - EXPECT_FALSE(reply2.successes(0)); -} - } // namespace ray::raylet int main(int argc, char **argv) { From 29ceadaa8f59458767b648e609fc8324e8fcb647 Mon Sep 17 00:00:00 2001 From: joshlee Date: Fri, 12 Sep 2025 22:57:33 +0000 Subject: [PATCH 4/7] Fix pytest for PinObjectIds retries Signed-off-by: joshlee --- .../ray/tests/test_raylet_fault_tolerance.py | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py index 80a86389ebe5..4fe0d019c817 100644 --- a/python/ray/tests/test_raylet_fault_tolerance.py +++ b/python/ray/tests/test_raylet_fault_tolerance.py @@ -45,17 +45,21 @@ def simple_task_2(): assert ray.get([result_ref1, result_ref2]) == [0, 1] -@pytest.mark.parametrize("deterministic_failure", ["request", "response"]) -def test_pin_object_ids_idempotent( - monkeypatch, shutdown_only, deterministic_failure, ray_start_cluster_head -): - monkeypatch.setenv( - "RAY_testing_rpc_failure", - "NodeManagerService.grpc_client.PinObjectIDs=1:" - + ("100:0" if deterministic_failure == "request" else "0:100"), - ) - - cluster = ray_start_cluster_head +@pytest.mark.parametrize( + "ray_start_cluster_head_with_env_vars", + [ + { + "env_vars": { + "RAY_testing_rpc_failure": "NodeManagerService.grpc_client.PinObjectIDs=1:100:0", + # Need to reduce this from 1 second otherwise the object will be evicted before the retry is received and pins the object + "RAY_grpc_client_check_connection_status_interval_milliseconds": "0", + }, + } + ], + indirect=True, +) +def test_pin_object_ids_idempotent(shutdown_only, ray_start_cluster_head_with_env_vars): + cluster = ray_start_cluster_head_with_env_vars remote_node_1 = cluster.add_node( num_cpus=1, object_store_memory=200 * 1024 * 1024, @@ -89,8 +93,8 @@ def move_big_object_ref(big_object_ref_list): # Kill remote_node_1 so that the secondary copy on remote_node_2 is pinned cluster.remove_node(remote_node_1) - - # Create memory pressure on remote_node_2 so that the object is spilled + # Create memory pressure on remote_node_2 so that the object is spilled if + # pinned correctly or evicted if not. memory_pressure_ref = create_big_object.options( scheduling_strategy=NodeAffinitySchedulingStrategy( node_id=remote_node_2.node_id, soft=False @@ -98,7 +102,7 @@ def move_big_object_ref(big_object_ref_list): ).remote() ray.get(memory_pressure_ref) # If the object was not pinned, it would be evicted and we would get an ObjectLostError. - # A successful get means that the object was spilled instead meaning it was pinned. + # A successful get means that the object was spilled meaning it was pinned successfully. ray.get(big_object_ref) From 02ee84c3d5df9bff73c374bce251528bc7856543 Mon Sep 17 00:00:00 2001 From: joshlee Date: Sun, 14 Sep 2025 06:05:40 +0000 Subject: [PATCH 5/7] Addressing comments Signed-off-by: joshlee --- src/ray/raylet/tests/node_manager_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index 65f80001cb00..e2bbee3272d0 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -1090,6 +1090,7 @@ TEST_P(PinObjectIDsIdempotencyTest, TestHandlePinObjectIDsIdempotency) { &reply1, [](Status s, std::function success, std::function failure) {}); + int64_t primary_bytes = local_object_manager_->GetPrimaryBytes(); rpc::PinObjectIDsReply reply2; node_manager_->HandlePinObjectIDs( pin_request, @@ -1100,6 +1101,7 @@ TEST_P(PinObjectIDsIdempotencyTest, TestHandlePinObjectIDsIdempotency) { EXPECT_EQ(reply1.successes(0), object_exists); EXPECT_EQ(reply2.successes_size(), 1); EXPECT_EQ(reply2.successes(0), object_exists); + EXPECT_EQ(local_object_manager_->GetPrimaryBytes(), primary_bytes); } INSTANTIATE_TEST_SUITE_P(PinObjectIDsIdempotencyVariations, From 96efd2ca191f30e9e8648ccdcfa334d3b2d660d1 Mon Sep 17 00:00:00 2001 From: joshlee Date: Mon, 15 Sep 2025 22:48:31 +0000 Subject: [PATCH 6/7] Removing pytest Signed-off-by: joshlee --- .../ray/tests/test_raylet_fault_tolerance.py | 62 ------------------- 1 file changed, 62 deletions(-) diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py index 4fe0d019c817..21fcd1e84a5d 100644 --- a/python/ray/tests/test_raylet_fault_tolerance.py +++ b/python/ray/tests/test_raylet_fault_tolerance.py @@ -1,6 +1,5 @@ import sys -import numpy as np import pytest import ray @@ -45,66 +44,5 @@ def simple_task_2(): assert ray.get([result_ref1, result_ref2]) == [0, 1] -@pytest.mark.parametrize( - "ray_start_cluster_head_with_env_vars", - [ - { - "env_vars": { - "RAY_testing_rpc_failure": "NodeManagerService.grpc_client.PinObjectIDs=1:100:0", - # Need to reduce this from 1 second otherwise the object will be evicted before the retry is received and pins the object - "RAY_grpc_client_check_connection_status_interval_milliseconds": "0", - }, - } - ], - indirect=True, -) -def test_pin_object_ids_idempotent(shutdown_only, ray_start_cluster_head_with_env_vars): - cluster = ray_start_cluster_head_with_env_vars - remote_node_1 = cluster.add_node( - num_cpus=1, - object_store_memory=200 * 1024 * 1024, - ) - remote_node_2 = cluster.add_node( - num_cpus=1, - object_store_memory=200 * 1024 * 1024, - ) - - # Max retries is 0 to prevent object reconstruction and force an ObjectLostError to occur when eviction happens - @ray.remote(max_retries=0) - def create_big_object(): - return np.zeros(150 * 1024 * 1024) - - @ray.remote(max_retries=0) - def move_big_object_ref(big_object_ref_list): - ray.get(big_object_ref_list[0]) - return "ok" - - big_object_ref = create_big_object.options( - scheduling_strategy=NodeAffinitySchedulingStrategy( - node_id=remote_node_1.node_id, soft=False - ) - ).remote() - result_ref = move_big_object_ref.options( - scheduling_strategy=NodeAffinitySchedulingStrategy( - node_id=remote_node_2.node_id, soft=False - ) - ).remote([big_object_ref]) - assert ray.get(result_ref) == "ok" - - # Kill remote_node_1 so that the secondary copy on remote_node_2 is pinned - cluster.remove_node(remote_node_1) - # Create memory pressure on remote_node_2 so that the object is spilled if - # pinned correctly or evicted if not. - memory_pressure_ref = create_big_object.options( - scheduling_strategy=NodeAffinitySchedulingStrategy( - node_id=remote_node_2.node_id, soft=False - ) - ).remote() - ray.get(memory_pressure_ref) - # If the object was not pinned, it would be evicted and we would get an ObjectLostError. - # A successful get means that the object was spilled meaning it was pinned successfully. - ray.get(big_object_ref) - - if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) From 7bb929b70296dd6f0b9eae57b89f07a8890dda27 Mon Sep 17 00:00:00 2001 From: joshlee Date: Tue, 16 Sep 2025 23:09:49 +0000 Subject: [PATCH 7/7] Addressing comments Signed-off-by: joshlee --- src/ray/raylet/tests/node_manager_test.cc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index e2bbee3272d0..419de8320dea 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -1071,6 +1071,15 @@ class PinObjectIDsIdempotencyTest : public NodeManagerTest, public ::testing::WithParamInterface {}; TEST_P(PinObjectIDsIdempotencyTest, TestHandlePinObjectIDsIdempotency) { + // object_exists: determines whether we add an object to the plasma store which is used + // for pinning. + // object_exists == true: an object is added to the plasma store and PinObjectIDs is + // expected to succeed. A true boolean value is inserted at the index of the object + // in reply.successes. + // object_exists == false: an object is not added to the plasma store. PinObjectIDs will + // still succeed and not return an error when trying to pin a non-existent object, but + // will instead at the index of the object in reply.successes insert a false + // boolean value. const bool object_exists = GetParam(); ObjectID id = ObjectID::FromRandom(); @@ -1097,6 +1106,8 @@ TEST_P(PinObjectIDsIdempotencyTest, TestHandlePinObjectIDsIdempotency) { &reply2, [](Status s, std::function success, std::function failure) {}); + // For each invocation of HandlePinObjectIDs, we expect the size of reply.successes and + // the boolean values it contains to not change. EXPECT_EQ(reply1.successes_size(), 1); EXPECT_EQ(reply1.successes(0), object_exists); EXPECT_EQ(reply2.successes_size(), 1);