37 changes: 37 additions & 0 deletions python/ray/tests/test_raylet_fault_tolerance.py
@@ -3,6 +3,8 @@
import pytest

import ray
from ray._private.test_utils import wait_for_condition
from ray.core.generated import autoscaler_pb2
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@@ -44,5 +46,40 @@ def simple_task_2():
assert ray.get([result_ref1, result_ref2]) == [0, 1]


def test_drain_node_idempotent(monkeypatch, shutdown_only, ray_start_cluster):
# NOTE: response failures are not tested because by then the node is already marked as draining and shuts down gracefully.
monkeypatch.setenv(
"RAY_testing_rpc_failure",
"NodeManagerService.grpc_client.DrainRaylet=1:100:0",
)

cluster = ray_start_cluster
worker_node = cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)

worker_node_id = worker_node.node_id

gcs_client = ray._raylet.GcsClient(address=cluster.address)

is_accepted = gcs_client.drain_node(
worker_node_id,
autoscaler_pb2.DrainNodeReason.DRAIN_NODE_REASON_IDLE_TERMINATION,
"Test drain",
0,
)
assert is_accepted

# The drain is accepted because the node is idle (no tasks running and no primary
# objects kept on that raylet); the raylet should then gracefully shut down and the
# node should be marked dead.
def node_is_dead():
nodes = ray.nodes()
for node in nodes:
if node["NodeID"] == worker_node_id:
return not node["Alive"]
return True

wait_for_condition(node_is_dead, timeout=1)
Bug: Node Drain Test Timeout Issues

The 1-second timeout in wait_for_condition within test_drain_node_idempotent is likely too aggressive. This integration test involves node draining, graceful shutdown, state propagation, and injected RPC failures, which could lead to test flakiness. This is especially concerning given prior PR discussions about even 30-second timeouts being insufficient.
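A minimal sketch of the suggested mitigation, assuming wait_for_condition's timeout (seconds) and retry_interval_ms keyword arguments; the 30-second value is illustrative, not a number taken from this PR:

# Hypothetical relaxation: give draining, graceful shutdown, and state propagation
# more headroom while still polling at a reasonable interval.
wait_for_condition(node_is_dead, timeout=30, retry_interval_ms=500)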




if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
6 changes: 6 additions & 0 deletions src/ray/raylet/node_manager.cc
@@ -2060,6 +2060,12 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
<< rpc::autoscaler::DrainNodeReason_Name(request.reason())
<< ". Drain reason message: " << request.reason_message();

if (cluster_resource_scheduler_.GetLocalResourceManager().IsLocalNodeDraining()) {
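// The local node is already draining (e.g. this is a retried DrainRaylet request),
// so report success without re-applying the drain to keep the handler idempotent.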
reply->set_is_accepted(true);
send_reply_callback(Status::OK(), nullptr, nullptr);
return;
}

if (request.reason() ==
rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_IDLE_TERMINATION) {
const bool is_idle =
9 changes: 4 additions & 5 deletions src/ray/raylet/node_manager.h
@@ -302,6 +302,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::CancelWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandleDrainRaylet(rpc::DrainRayletRequest request,
rpc::DrainRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

private:
FRIEND_TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog);

@@ -589,11 +593,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::ShutdownRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `DrainRaylet` request.
void HandleDrainRaylet(rpc::DrainRayletRequest request,
rpc::DrainRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandleIsLocalWorkerDead(rpc::IsLocalWorkerDeadRequest request,
rpc::IsLocalWorkerDeadReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
66 changes: 66 additions & 0 deletions src/ray/raylet/tests/node_manager_test.cc
@@ -18,6 +18,7 @@
#include <cstdint>
#include <memory>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -1264,6 +1265,71 @@ INSTANTIATE_TEST_SUITE_P(NodeManagerDeathVariations,
NodeManagerDeathTest,
testing::Bool());

class DrainRayletIdempotencyTest
: public NodeManagerTest,
public ::testing::WithParamInterface<
std::tuple<rpc::autoscaler::DrainNodeReason, bool>> {};

TEST_P(DrainRayletIdempotencyTest, TestHandleDrainRayletIdempotency) {
// drain_reason: the reason for the drain request (PREEMPTION or IDLE_TERMINATION).
// is_node_idle: determines whether the node is idle.
// is_node_idle == true: the node is idle.
// - drain_reason == PREEMPTION: DrainRaylet is expected to accept the request.
// - drain_reason == IDLE_TERMINATION: DrainRaylet is expected to accept the request.
// is_node_idle == false: the node is not idle.
// - drain_reason == PREEMPTION: DrainRaylet is expected to accept the request.
// - drain_reason == IDLE_TERMINATION: DrainRaylet is expected to reject the request.

auto [drain_reason, is_node_idle] = GetParam();
if (!is_node_idle) {
cluster_resource_scheduler_->GetLocalResourceManager().SetBusyFootprint(
WorkFootprint::NODE_WORKERS);
}

// Whether the drain request is expected to be accepted. Note that for preemption we
// must always accept the request regardless of the node's idle state.
bool drain_request_accepted = false;
if (drain_reason == rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_PREEMPTION) {
drain_request_accepted = true;
} else {
drain_request_accepted = is_node_idle;
}

rpc::DrainRayletRequest request;
request.set_reason(drain_reason);
request.set_reason_message("Test drain");
request.set_deadline_timestamp_ms(std::numeric_limits<int64_t>::max());

rpc::DrainRayletReply reply1;
node_manager_->HandleDrainRaylet(
request, &reply1, [](Status s, std::function<void()>, std::function<void()>) {
ASSERT_TRUE(s.ok());
});

ASSERT_EQ(reply1.is_accepted(), drain_request_accepted);
ASSERT_EQ(cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining(),
drain_request_accepted);

rpc::DrainRayletReply reply2;
node_manager_->HandleDrainRaylet(
request, &reply2, [&](Status s, std::function<void()>, std::function<void()>) {
ASSERT_TRUE(s.ok());
});

ASSERT_EQ(reply2.is_accepted(), drain_request_accepted);
ASSERT_EQ(cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining(),
drain_request_accepted);
}

INSTANTIATE_TEST_SUITE_P(
DrainRayletIdempotencyVariations,
DrainRayletIdempotencyTest,
::testing::Combine(
::testing::Values(
rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_IDLE_TERMINATION,
rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_PREEMPTION),
::testing::Bool()));

} // namespace ray::raylet

int main(int argc, char **argv) {
26 changes: 14 additions & 12 deletions src/ray/raylet_rpc_client/raylet_client.cc
@@ -355,12 +355,13 @@ void RayletClient::ShutdownRaylet(
const rpc::ClientCallback<rpc::ShutdownRayletReply> &callback) {
rpc::ShutdownRayletRequest request;
request.set_graceful(graceful);
INVOKE_RPC_CALL(NodeManagerService,
ShutdownRaylet,
request,
callback,
grpc_client_,
/*method_timeout_ms*/ -1);
INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_,
NodeManagerService,
ShutdownRaylet,
request,
callback,
grpc_client_,
/*method_timeout_ms*/ -1);
}

void RayletClient::DrainRaylet(
@@ -372,12 +373,13 @@ void RayletClient::DrainRaylet(
request.set_reason(reason);
request.set_reason_message(reason_message);
request.set_deadline_timestamp_ms(deadline_timestamp_ms);
INVOKE_RPC_CALL(NodeManagerService,
DrainRaylet,
request,
callback,
grpc_client_,
/*method_timeout_ms*/ -1);
INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_,
NodeManagerService,
DrainRaylet,
request,
callback,
grpc_client_,
/*method_timeout_ms*/ -1);
}

void RayletClient::IsLocalWorkerDead(