37 changes: 37 additions & 0 deletions python/ray/tests/test_raylet_fault_tolerance.py
@@ -3,6 +3,8 @@
import pytest

import ray
from ray._private.test_utils import wait_for_condition
from ray.core.generated import autoscaler_pb2
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@@ -44,5 +46,40 @@ def simple_task_2():
assert ray.get([result_ref1, result_ref2]) == [0, 1]


def test_drain_node_idempotent(monkeypatch, shutdown_only, ray_start_cluster):
# NOTE: not testing response failure since the node is already marked as draining and shuts down gracefully.
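# Spec format below is an assumption about the current failure-injection knob:
# "<method>=<max failures>:<request failure %>:<response failure %>", i.e. fail
# the first DrainRaylet request once and never drop the response, forcing the
# now-retryable client to resend the RPC.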
monkeypatch.setenv(
"RAY_testing_rpc_failure",
"NodeManagerService.grpc_client.DrainRaylet=1:100:0",
)

cluster = ray_start_cluster
worker_node = cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)

worker_node_id = worker_node.node_id

gcs_client = ray._raylet.GcsClient(address=cluster.address)

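# drain_node args: node id to drain, drain reason enum, human-readable
# reason message, and the drain deadline timestamp in ms.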
is_accepted = gcs_client.drain_node(
worker_node_id,
autoscaler_pb2.DrainNodeReason.DRAIN_NODE_REASON_IDLE_TERMINATION,
"Test drain",
0,
)
assert is_accepted

# After the drain request is accepted on an idle node (no tasks running and no
# primary objects held on that raylet), the raylet should gracefully shut down.
def node_is_dead():
nodes = ray.nodes()
for node in nodes:
if node["NodeID"] == worker_node_id:
return not node["Alive"]
return True

wait_for_condition(node_is_dead, timeout=30)
Contributor:
how long does this test take to finish?

Collaborator:
timeout=30 is no bueno! maybe need to tune some timeout env var?

Contributor Author (@Sparks0219, Oct 20, 2025):
Total test takes around 2.5 - 3 seconds on avg to complete? The wait_for_condition itself though is only a millisecond or two, looks like 30 sec even as a failsafe was a bit excessive 😅. I'll reduce it down to 1 sec.



if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
6 changes: 6 additions & 0 deletions src/ray/raylet/node_manager.cc
@@ -2060,6 +2060,12 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
<< rpc::autoscaler::DrainNodeReason_Name(request.reason())
<< ". Drain reason message: " << request.reason_message();

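  // DrainRaylet is now sent via a retryable RPC client (see raylet_client.cc
  // below), so this handler must be idempotent: if the node is already
  // draining, acknowledge the duplicate request without re-applying the drain.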
if (cluster_resource_scheduler_.GetLocalResourceManager().IsLocalNodeDraining()) {
reply->set_is_accepted(true);
send_reply_callback(Status::OK(), nullptr, nullptr);
return;
}

if (request.reason() ==
rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_IDLE_TERMINATION) {
const bool is_idle =
9 changes: 4 additions & 5 deletions src/ray/raylet/node_manager.h
@@ -302,6 +302,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::CancelWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandleDrainRaylet(rpc::DrainRayletRequest request,
rpc::DrainRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

private:
FRIEND_TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog);

@@ -589,11 +593,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::ShutdownRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `DrainRaylet` request.
void HandleDrainRaylet(rpc::DrainRayletRequest request,
rpc::DrainRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandleIsLocalWorkerDead(rpc::IsLocalWorkerDeadRequest request,
rpc::IsLocalWorkerDeadReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
66 changes: 66 additions & 0 deletions src/ray/raylet/tests/node_manager_test.cc
@@ -18,6 +18,7 @@
#include <cstdint>
#include <memory>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -1264,6 +1265,71 @@ INSTANTIATE_TEST_SUITE_P(NodeManagerDeathVariations,
NodeManagerDeathTest,
testing::Bool());

class DrainRayletIdempotencyTest
: public NodeManagerTest,
public ::testing::WithParamInterface<
std::tuple<rpc::autoscaler::DrainNodeReason, bool>> {};

TEST_P(DrainRayletIdempotencyTest, TestHandleDrainRayletIdempotency) {
// drain_reason: the reason for the drain request (PREEMPTION or IDLE_TERMINATION).
// is_node_idle: determines whether the node is idle.
// is_node_idle == true: the node is idle.
// - drain_reason == PREEMPTION: DrainRaylet is expected to accept the request.
// - drain_reason == IDLE_TERMINATION: DrainRaylet is expected to accept the request.
// is_node_idle == false: the node is not idle.
// - drain_reason == PREEMPTION: DrainRaylet is expected to accept the request.
// - drain_reason == IDLE_TERMINATION: DrainRaylet is expected to reject the request.

auto [drain_reason, is_node_idle] = GetParam();
if (!is_node_idle) {
cluster_resource_scheduler_->GetLocalResourceManager().SetBusyFootprint(
WorkFootprint::NODE_WORKERS);
}

// Whether the drain request is expected to be accepted. Note that for preemption we
// must always accept the request regardless of the node's idle state.
bool drain_request_accepted = false;
if (drain_reason == rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_PREEMPTION) {
drain_request_accepted = true;
} else {
drain_request_accepted = is_node_idle;
}

rpc::DrainRayletRequest request;
request.set_reason(drain_reason);
request.set_reason_message("Test drain");
request.set_deadline_timestamp_ms(std::numeric_limits<int64_t>::max());

rpc::DrainRayletReply reply1;
node_manager_->HandleDrainRaylet(
request, &reply1, [](Status s, std::function<void()>, std::function<void()>) {
ASSERT_TRUE(s.ok());
});

ASSERT_EQ(reply1.is_accepted(), drain_request_accepted);
ASSERT_EQ(cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining(),
drain_request_accepted);

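  // Resend the identical request: an idempotent handler must return the same
  // acceptance decision and leave the local draining state unchanged.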
rpc::DrainRayletReply reply2;
node_manager_->HandleDrainRaylet(
request, &reply2, [&](Status s, std::function<void()>, std::function<void()>) {
ASSERT_TRUE(s.ok());
});

ASSERT_EQ(reply2.is_accepted(), drain_request_accepted);
ASSERT_EQ(cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining(),
drain_request_accepted);
}

INSTANTIATE_TEST_SUITE_P(
DrainRayletIdempotencyVariations,
DrainRayletIdempotencyTest,
::testing::Combine(
::testing::Values(
rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_IDLE_TERMINATION,
rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_PREEMPTION),
::testing::Bool()));

} // namespace ray::raylet

int main(int argc, char **argv) {
26 changes: 14 additions & 12 deletions src/ray/raylet_rpc_client/raylet_client.cc
@@ -355,12 +355,13 @@ void RayletClient::ShutdownRaylet(
const rpc::ClientCallback<rpc::ShutdownRayletReply> &callback) {
rpc::ShutdownRayletRequest request;
request.set_graceful(graceful);
INVOKE_RPC_CALL(NodeManagerService,
ShutdownRaylet,
request,
callback,
grpc_client_,
/*method_timeout_ms*/ -1);
INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_,
NodeManagerService,
ShutdownRaylet,
request,
callback,
grpc_client_,
/*method_timeout_ms*/ -1);
}

void RayletClient::DrainRaylet(
@@ -372,12 +373,13 @@ void RayletClient::DrainRaylet(
request.set_reason(reason);
request.set_reason_message(reason_message);
request.set_deadline_timestamp_ms(deadline_timestamp_ms);
INVOKE_RPC_CALL(NodeManagerService,
DrainRaylet,
request,
callback,
grpc_client_,
/*method_timeout_ms*/ -1);
INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_,
NodeManagerService,
DrainRaylet,
request,
callback,
grpc_client_,
/*method_timeout_ms*/ -1);
}

void RayletClient::IsLocalWorkerDead(
Expand Down