From 36996c8c4517dce5c9edd455b33d5567616bd377 Mon Sep 17 00:00:00 2001
From: joshlee
Date: Fri, 17 Oct 2025 22:00:59 +0000
Subject: [PATCH 1/2] Make DrainRaylet + ShutdownRaylet Fault Tolerant

Signed-off-by: joshlee
---
 .../ray/tests/test_raylet_fault_tolerance.py | 37 +++++++++++
 src/ray/raylet/node_manager.cc               |  6 ++
 src/ray/raylet/node_manager.h                |  9 ++-
 src/ray/raylet/tests/node_manager_test.cc    | 66 +++++++++++++++++++
 src/ray/raylet_rpc_client/raylet_client.cc   | 26 ++++----
 5 files changed, 127 insertions(+), 17 deletions(-)

diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py
index 21fcd1e84a5d..2ea7f18eebfd 100644
--- a/python/ray/tests/test_raylet_fault_tolerance.py
+++ b/python/ray/tests/test_raylet_fault_tolerance.py
@@ -3,6 +3,8 @@
 import pytest
 
 import ray
+from ray._private.test_utils import wait_for_condition
+from ray.core.generated import autoscaler_pb2
 from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
 
 
@@ -44,5 +46,40 @@ def simple_task_2():
     assert ray.get([result_ref1, result_ref2]) == [0, 1]
 
+
+def test_drain_node_idempotent(monkeypatch, shutdown_only, ray_start_cluster):
+    # NOTE: not testing response failure since the node is already marked as draining and shuts down gracefully.
+    monkeypatch.setenv(
+        "RAY_testing_rpc_failure",
+        "NodeManagerService.grpc_client.DrainRaylet=1:100:0",
+    )
+
+    cluster = ray_start_cluster
+    worker_node = cluster.add_node(num_cpus=1)
+    ray.init(address=cluster.address)
+
+    worker_node_id = worker_node.node_id
+
+    gcs_client = ray._raylet.GcsClient(address=cluster.address)
+
+    is_accepted = gcs_client.drain_node(
+        worker_node_id,
+        autoscaler_pb2.DrainNodeReason.DRAIN_NODE_REASON_IDLE_TERMINATION,
+        "Test drain",
+        0,
+    )
+    assert is_accepted
+
+    # After the drain is accepted on an idle node (no tasks running and no primary
+    # objects kept on that raylet), it should be marked idle and gracefully shut down.
+    def node_is_dead():
+        nodes = ray.nodes()
+        for node in nodes:
+            if node["NodeID"] == worker_node_id:
+                return not node["Alive"]
+        return True
+
+    wait_for_condition(node_is_dead, timeout=30)
+
 
 if __name__ == "__main__":
     sys.exit(pytest.main(["-sv", __file__]))
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 64edba8825e4..9f320af968f5 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2060,6 +2060,12 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
                 << rpc::autoscaler::DrainNodeReason_Name(request.reason())
                 << ". Drain reason message: " << request.reason_message();
 
+  if (cluster_resource_scheduler_.GetLocalResourceManager().IsLocalNodeDraining()) {
+    reply->set_is_accepted(true);
+    send_reply_callback(Status::OK(), nullptr, nullptr);
+    return;
+  }
+
   if (request.reason() ==
       rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_IDLE_TERMINATION) {
     const bool is_idle =
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index a5a7de57de87..6ae79d694aff 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -302,6 +302,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
                               rpc::CancelWorkerLeaseReply *reply,
                               rpc::SendReplyCallback send_reply_callback) override;
 
+  void HandleDrainRaylet(rpc::DrainRayletRequest request,
+                         rpc::DrainRayletReply *reply,
+                         rpc::SendReplyCallback send_reply_callback) override;
+
  private:
   FRIEND_TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog);
 
@@ -589,11 +593,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
                            rpc::ShutdownRayletReply *reply,
                            rpc::SendReplyCallback send_reply_callback) override;
 
-  /// Handle a `DrainRaylet` request.
-  void HandleDrainRaylet(rpc::DrainRayletRequest request,
-                         rpc::DrainRayletReply *reply,
-                         rpc::SendReplyCallback send_reply_callback) override;
-
   void HandleIsLocalWorkerDead(rpc::IsLocalWorkerDeadRequest request,
                                rpc::IsLocalWorkerDeadReply *reply,
                                rpc::SendReplyCallback send_reply_callback) override;
diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc
index 70e7f0ed923a..5a7c8d8d4828 100644
--- a/src/ray/raylet/tests/node_manager_test.cc
+++ b/src/ray/raylet/tests/node_manager_test.cc
@@ -18,6 +18,7 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
@@ -1264,6 +1265,71 @@ INSTANTIATE_TEST_SUITE_P(NodeManagerDeathVariations,
                          NodeManagerDeathTest,
                          testing::Bool());
 
+class DrainRayletIdempotencyTest
+    : public NodeManagerTest,
+      public ::testing::WithParamInterface<
+          std::tuple<rpc::autoscaler::DrainNodeReason, bool>> {};
+
+TEST_P(DrainRayletIdempotencyTest, TestHandleDrainRayletIdempotency) {
+  // drain_reason: the reason for the drain request (PREEMPTION or IDLE_TERMINATION).
+  // is_node_idle: determines whether the node is idle.
+  // is_node_idle == true: the node is idle.
+  //   - drain_reason == PREEMPTION: DrainRaylet is expected to accept the request.
+  //   - drain_reason == IDLE_TERMINATION: DrainRaylet is expected to accept the request.
+  // is_node_idle == false: the node is not idle.
+  //   - drain_reason == PREEMPTION: DrainRaylet is expected to accept the request.
+  //   - drain_reason == IDLE_TERMINATION: DrainRaylet is expected to reject the request.
+
+  auto [drain_reason, is_node_idle] = GetParam();
+  if (!is_node_idle) {
+    cluster_resource_scheduler_->GetLocalResourceManager().SetBusyFootprint(
+        WorkFootprint::NODE_WORKERS);
+  }
+
+  // Whether the drain request is expected to be accepted. Note that for preemption we
+  // must always accept the request regardless of the node's idle state.
+  bool drain_request_accepted = false;
+  if (drain_reason == rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_PREEMPTION) {
+    drain_request_accepted = true;
+  } else {
+    drain_request_accepted = is_node_idle;
+  }
+
+  rpc::DrainRayletRequest request;
+  request.set_reason(drain_reason);
+  request.set_reason_message("Test drain");
+  request.set_deadline_timestamp_ms(std::numeric_limits<int64_t>::max());
+
+  rpc::DrainRayletReply reply1;
+  node_manager_->HandleDrainRaylet(
+      request, &reply1, [](Status s, std::function<void()>, std::function<void()>) {
+        ASSERT_TRUE(s.ok());
+      });
+
+  ASSERT_EQ(reply1.is_accepted(), drain_request_accepted);
+  ASSERT_EQ(cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining(),
+            drain_request_accepted);
+
+  rpc::DrainRayletReply reply2;
+  node_manager_->HandleDrainRaylet(
+      request, &reply2, [&](Status s, std::function<void()>, std::function<void()>) {
+        ASSERT_TRUE(s.ok());
+      });
+
+  ASSERT_EQ(reply2.is_accepted(), drain_request_accepted);
+  ASSERT_EQ(cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining(),
+            drain_request_accepted);
+}
+
+INSTANTIATE_TEST_SUITE_P(
+    DrainRayletIdempotencyVariations,
+    DrainRayletIdempotencyTest,
+    ::testing::Combine(
+        ::testing::Values(
+            rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_IDLE_TERMINATION,
+            rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_PREEMPTION),
+        ::testing::Bool()));
+
 }  // namespace ray::raylet
 
 int main(int argc, char **argv) {
diff --git a/src/ray/raylet_rpc_client/raylet_client.cc b/src/ray/raylet_rpc_client/raylet_client.cc
index f58f32adbe7a..0a004075f323 100644
--- a/src/ray/raylet_rpc_client/raylet_client.cc
+++ b/src/ray/raylet_rpc_client/raylet_client.cc
@@ -355,12 +355,13 @@ void RayletClient::ShutdownRaylet(
     const rpc::ClientCallback<rpc::ShutdownRayletReply> &callback) {
   rpc::ShutdownRayletRequest request;
   request.set_graceful(graceful);
-  INVOKE_RPC_CALL(NodeManagerService,
-                  ShutdownRaylet,
-                  request,
-                  callback,
-                  grpc_client_,
-                  /*method_timeout_ms*/ -1);
+  INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_,
+                            NodeManagerService,
+                            ShutdownRaylet,
+                            request,
+                            callback,
+                            grpc_client_,
+                            /*method_timeout_ms*/ -1);
 }
 
 void RayletClient::DrainRaylet(
@@ -372,12 +373,13 @@ void RayletClient::DrainRaylet(
   request.set_reason(reason);
   request.set_reason_message(reason_message);
   request.set_deadline_timestamp_ms(deadline_timestamp_ms);
-  INVOKE_RPC_CALL(NodeManagerService,
-                  DrainRaylet,
-                  request,
-                  callback,
-                  grpc_client_,
-                  /*method_timeout_ms*/ -1);
+  INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_,
+                            NodeManagerService,
+                            DrainRaylet,
+                            request,
+                            callback,
+                            grpc_client_,
+                            /*method_timeout_ms*/ -1);
 }
 
 void RayletClient::IsLocalWorkerDead(

From 323491ca6d672c9c73d424a77511191e169e5de4 Mon Sep 17 00:00:00 2001
From: joshlee
Date: Mon, 20 Oct 2025 17:20:13 +0000
Subject: [PATCH 2/2] reduce timeout

Signed-off-by: joshlee
---
 python/ray/tests/test_raylet_fault_tolerance.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py
index 2ea7f18eebfd..5e802094ed00 100644
--- a/python/ray/tests/test_raylet_fault_tolerance.py
+++ b/python/ray/tests/test_raylet_fault_tolerance.py
@@ -78,7 +78,7 @@ def node_is_dead():
             return not node["Alive"]
         return True
 
-    wait_for_condition(node_is_dead, timeout=30)
+    wait_for_condition(node_is_dead, timeout=1)
 
 
 if __name__ == "__main__":
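
A brief note on the failure-injection spec the new Python test passes via RAY_testing_rpc_failure (a sketch of one plausible reading; the field layout below is an assumption inferred from the test and its NOTE comment, not something this patch defines):

# Hypothetical reading of the injection spec used by test_drain_node_idempotent
# (assumed layout, not authoritative):
#   "<Service>.<client>.<Method>=<max_failures>:<request_failure_%>:<response_failure_%>"
# Under that reading, "NodeManagerService.grpc_client.DrainRaylet=1:100:0" injects at
# most one failure, always on the request path and never on the reply path, so the
# now-retryable client (INVOKE_RETRYABLE_RPC_CALL) has to resend DrainRaylet once while
# the reply path stays reliable, matching the NOTE that response failures are untested.
import os

os.environ["RAY_testing_rpc_failure"] = (
    "NodeManagerService.grpc_client.DrainRaylet=1:100:0"
)

The handler-level half of the guarantee, a second DrainRaylet arriving after the node is already draining, is covered separately by the DrainRayletIdempotencyTest added to node_manager_test.cc, which invokes HandleDrainRaylet twice and expects the same accepted/rejected outcome both times.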