3 changes: 2 additions & 1 deletion src/mock/ray/gcs_client/accessor.h
@@ -83,7 +83,8 @@ class MockNodeInfoAccessor : public NodeInfoAccessor {
MOCK_METHOD(
void,
AsyncSubscribeToNodeAddressAndLivenessChange,
(std::function<void(NodeID, const rpc::GcsNodeAddressAndLiveness &)> subscribe,
(std::function<void(NodeID, const rpc::GcsNodeAddressAndLiveness &, const bool)>
subscribe,
StatusCallback done),
(override));
MOCK_METHOD(std::optional<rpc::GcsNodeAddressAndLiveness>,
5 changes: 3 additions & 2 deletions src/ray/core_worker/core_worker.cc
@@ -735,7 +735,8 @@ void CoreWorker::SubscribeToNodeChanges() {
raylet_client_pool = raylet_client_pool_,
core_worker_client_pool = core_worker_client_pool_](
const NodeID &node_id,
const rpc::GcsNodeAddressAndLiveness &data) {
const rpc::GcsNodeAddressAndLiveness &data,
const bool is_initializing) {
if (data.state() == rpc::GcsNodeInfo::DEAD) {
RAY_LOG(INFO).WithField(node_id)
<< "Node failure. All objects pinned on that node will be lost if object "
@@ -747,7 +748,7 @@
auto cluster_size_based_rate_limiter =
dynamic_cast<ClusterSizeBasedLeaseRequestRateLimiter *>(rate_limiter.get());
if (cluster_size_based_rate_limiter != nullptr) {
cluster_size_based_rate_limiter->OnNodeChanges(data);
cluster_size_based_rate_limiter->OnNodeChanges(data, is_initializing);
}
};

@@ -826,8 +826,14 @@ size_t ClusterSizeBasedLeaseRequestRateLimiter::
}

void ClusterSizeBasedLeaseRequestRateLimiter::OnNodeChanges(
const rpc::GcsNodeAddressAndLiveness &data) {
const rpc::GcsNodeAddressAndLiveness &data, const bool is_initializing) {
if (data.state() == rpc::GcsNodeInfo::DEAD) {
if (is_initializing) {
// N.B.: We don't decrement for dead node notifications received at the time of
// initial subscription, since we won't have received an equivalent 'alive'
// notification beforehand.
return;
}
if (num_alive_nodes_ != 0) {
num_alive_nodes_--;
} else {
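For reviewers, a minimal standalone sketch of the hazard this guard addresses (illustrative only; NodeState and the two counter structs are hypothetical stand-ins for rpc::GcsNodeInfo states and the limiter's internal counter):

#include <cassert>
#include <cstddef>

// Hypothetical stand-in for rpc::GcsNodeInfo's node state.
enum class NodeState { ALIVE, DEAD };

// Naive counter: decrements on every DEAD notification.
struct NaiveCounter {
  std::size_t alive = 0;
  void OnNode(NodeState s) {
    if (s == NodeState::ALIVE) {
      alive++;
    } else if (alive > 0) {
      alive--;
    }
  }
};

// Guarded counter: skips DEAD notifications delivered while bootstrapping the
// subscription, since no matching ALIVE notification can have preceded them.
struct GuardedCounter {
  std::size_t alive = 0;
  void OnNode(NodeState s, bool is_initializing) {
    if (s == NodeState::DEAD) {
      if (is_initializing) return;  // no prior ALIVE to undo
      if (alive > 0) alive--;
    } else {
      alive++;
    }
  }
};

int main() {
  // Bootstrap delivers: node1 ALIVE, node2 DEAD (never seen alive), node3 ALIVE.
  NaiveCounter naive;
  naive.OnNode(NodeState::ALIVE);
  naive.OnNode(NodeState::DEAD);
  naive.OnNode(NodeState::ALIVE);
  assert(naive.alive == 1);  // wrong: two nodes are actually alive

  GuardedCounter guarded;
  guarded.OnNode(NodeState::ALIVE, /*is_initializing=*/true);
  guarded.OnNode(NodeState::DEAD, /*is_initializing=*/true);
  guarded.OnNode(NodeState::ALIVE, /*is_initializing=*/true);
  assert(guarded.alive == 2);  // correct
  return 0;
}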
7 changes: 4 additions & 3 deletions src/ray/core_worker/task_submission/normal_task_submitter.h
@@ -75,7 +75,8 @@ class ClusterSizeBasedLeaseRequestRateLimiter : public LeaseRequestRateLimiter {
public:
explicit ClusterSizeBasedLeaseRequestRateLimiter(size_t min_concurrent_lease_limit);
size_t GetMaxPendingLeaseRequestsPerSchedulingCategory() override;
void OnNodeChanges(const rpc::GcsNodeAddressAndLiveness &data);
void OnNodeChanges(const rpc::GcsNodeAddressAndLiveness &data,
const bool is_initializing = false);

private:
const size_t min_concurrent_lease_cap_;
@@ -365,14 +366,14 @@ class NormalTaskSubmitter {
// Generators that are currently running and need to be resubmitted.
absl::flat_hash_set<TaskID> generators_to_resubmit_ ABSL_GUARDED_BY(mu_);

// Tasks that have failed but we are waiting for their error cause to decide if they
// Tasks that have failed but are waiting for their error cause to decide if they
// should be retried or permanently failed.
absl::flat_hash_set<TaskID> failed_tasks_pending_failure_cause_ ABSL_GUARDED_BY(mu_);

// Ratelimiter controls the num of pending lease requests.
std::shared_ptr<LeaseRequestRateLimiter> lease_request_rate_limiter_;

// Retries cancelation requests if they were not successful.
// Retries cancellation requests if they were not successful.
instrumented_io_context &io_service_;

ray::observability::MetricInterface &scheduler_placement_time_ms_histogram_;
@@ -1965,6 +1965,69 @@ TEST(LeaseRequestRateLimiterTest, ClusterSizeBasedLeaseRequestRateLimiter) {
}
}

TEST(LeaseRequestRateLimiterTest,
ClusterSizeBasedLeaseRequestRateLimiterSubscriptionBug) {
// This test examines the behavior of AsyncSubscribeToNodeAddressAndLivenessChange
// relative to the ClusterSizeBasedLeaseRequestRateLimiter.
// On initial subscription, AsyncSubscribeToNodeAddressAndLivenessChange queries the
// GCS twice: once to set up the subscription and once to bootstrap the initial
// state. When bootstrapping the initial state, a callback is triggered for every node
// in the response, which contains all nodes currently alive in the cluster plus a
// limited number of nodes that have died. Consumers of the API need to be defensive
// about this: for a given subscription, it is not guaranteed that every dead node
// notification is preceded by an 'alive' notification for that node. An example
// scenario where failing to account for this causes a bug:
//
// Scenario:
// 1. Cluster has 3 nodes: node1 (alive), node2 (dead), node3 (alive)
// 2. Client subscribes to node changes
// 3. GetAllNodeAddressAndLiveness returns all 3 nodes (including node2 which is dead)
// 4. Subscription triggers OnNodeChanges for all 3 nodes
// 5. Bug: OnNodeChanges(node2=DEAD) decrements counter even though we never saw
// node2 as ALIVE
// 6. The counter will now say there is only 1 node in the cluster instead of the
//    correct number, which is 2.

rpc::GcsNodeAddressAndLiveness alive_node1;
alive_node1.set_state(rpc::GcsNodeInfo::ALIVE);

rpc::GcsNodeAddressAndLiveness alive_node3;
alive_node3.set_state(rpc::GcsNodeInfo::ALIVE);

rpc::GcsNodeAddressAndLiveness dead_node2;
dead_node2.set_state(rpc::GcsNodeInfo::DEAD);

ClusterSizeBasedLeaseRequestRateLimiter limiter(0);

// Initially the limiter has min_concurrent_lease_limit = 0 and no nodes
ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 0);

// First, we get node1 (alive) - counter should increment to 1
limiter.OnNodeChanges(alive_node1, true);
ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 1);

// Then we get node2 (dead) during initialization - a DEAD notification for a
// node we never saw as ALIVE. Without the is_initializing guard, the counter
// would incorrectly decrement from 1 to 0.
limiter.OnNodeChanges(dead_node2, true);

// The counter should still be 1 (we have 1 alive node: node1). Before the
// is_initializing guard, the implementation decremented it to 0 on the DEAD
// notification and this assertion failed, demonstrating the bug:
ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(),
1);  // 1 alive node (node1)

// Continue with node3 (alive)
limiter.OnNodeChanges(alive_node3, true);

// Now we should have 2 alive nodes (node1, node3). Without the guard, the
// counter would read 1 (incorrectly decremented, then incremented once):
ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(),
2);  // 2 alive nodes (node1, node3)
}

} // namespace core
} // namespace ray

20 changes: 16 additions & 4 deletions src/ray/gcs_rpc_client/accessor.cc
@@ -296,7 +296,9 @@ void NodeInfoAccessor::AsyncGetAll(const MultiItemCallback<rpc::GcsNodeInfo> &ca
}

void NodeInfoAccessor::AsyncSubscribeToNodeAddressAndLivenessChange(
std::function<void(NodeID, const rpc::GcsNodeAddressAndLiveness &)> subscribe,
std::function<void(NodeID,
const rpc::GcsNodeAddressAndLiveness &,
const bool is_initializing)> subscribe,
StatusCallback done) {
/**
1. Subscribe to node info
@@ -319,7 +321,7 @@ void NodeInfoAccessor::AsyncSubscribeToNodeAddressAndLivenessChange(
const Status &status,
std::vector<rpc::GcsNodeAddressAndLiveness> &&node_info_list) {
for (auto &node_info : node_info_list) {
HandleNotification(std::move(node_info));
HandleNotification(std::move(node_info), true);
}
if (done_callback) {
done_callback(status);
@@ -401,9 +403,11 @@ bool NodeInfoAccessor::IsNodeAlive(const NodeID &node_id) const {
node_iter->second.state() == rpc::GcsNodeInfo::ALIVE;
}

void NodeInfoAccessor::HandleNotification(rpc::GcsNodeAddressAndLiveness &&node_info) {
void NodeInfoAccessor::HandleNotification(rpc::GcsNodeAddressAndLiveness &&node_info,
const bool is_initializing) {
NodeID node_id = NodeID::FromBinary(node_info.node_id());
bool is_alive = (node_info.state() == rpc::GcsNodeInfo::ALIVE);
bool is_alive_to_dead_transition = false;
std::optional<rpc::GcsNodeAddressAndLiveness> node_info_copy_for_callback;
{
absl::MutexLock lock(&node_cache_address_and_liveness_mutex_);
@@ -418,6 +422,7 @@ void NodeInfoAccessor::HandleNotification(rpc::GcsNodeAddressAndLiveness &&node_
// was alive and is now dead.
bool was_alive = (entry->second.state() == rpc::GcsNodeInfo::ALIVE);
is_notif_new = was_alive && !is_alive;
is_alive_to_dead_transition = is_notif_new;

// Handle the same logic as in HandleNotification for preventing re-adding removed
// nodes
@@ -451,7 +456,14 @@ void NodeInfoAccessor::HandleNotification(rpc::GcsNodeAddressAndLiveness &&node_

// If the notification is new, call registered callback.
if (node_info_copy_for_callback) {
node_change_callback_address_and_liveness_(node_id, *node_info_copy_for_callback);
// If we're seeing a DEAD notification during initialization, but the cache
// already had this node as ALIVE (meaning polling saw it first), pass
// is_initializing=false. The intent is to signal that the node's state has
// advanced. However, this logic should be removed once initialization
// callbacks are guaranteed to be triggered before streaming callbacks.
bool effective_is_initializing = is_initializing && !is_alive_to_dead_transition;
node_change_callback_address_and_liveness_(
node_id, *node_info_copy_for_callback, effective_is_initializing);
}
}

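As a quick reference for the override above, a tiny runnable sketch (illustrative only, not part of the change) enumerating how effective_is_initializing resolves for each input combination:

#include <initializer_list>
#include <iostream>

// The is_initializing flag is preserved only when no ALIVE -> DEAD transition
// was observed against the cache; a transition forces it to false.
int main() {
  for (bool is_initializing : {false, true}) {
    for (bool alive_to_dead_transition : {false, true}) {
      const bool effective = is_initializing && !alive_to_dead_transition;
      std::cout << "is_initializing=" << is_initializing
                << " alive_to_dead_transition=" << alive_to_dead_transition
                << " -> effective=" << effective << "\n";
    }
  }
  return 0;
}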
22 changes: 17 additions & 5 deletions src/ray/gcs_rpc_client/accessor.h
@@ -206,11 +206,19 @@ class NodeInfoAccessor {
/// and liveness information for each node, excluding other node metadata.
///
/// \param subscribe Callback that will be called if a node is
/// added or a node is removed. The callback needs to be idempotent because it will also
/// be called for existing nodes.
/// added or a node is removed. Callbacks are triggered from two sources, an
/// initialization stage and a steady-state stage, and the two may interleave
/// (there is no ordering guarantee between them). is_initializing denotes the
/// source of the callback. Initialization callbacks are triggered from a
/// compacted view of events that includes only a limited set of dead nodes
/// (not all of them). As such, over the lifetime of a cluster and a given
/// subscription, it is not guaranteed that a dead node triggers the callback
/// twice (once for the alive state and once for the dead state) or even once
/// (a dead node may have been trimmed from the list).
/// \param done Callback that will be called when subscription is complete.
virtual void AsyncSubscribeToNodeAddressAndLivenessChange(
std::function<void(NodeID, const rpc::GcsNodeAddressAndLiveness &)> subscribe,
std::function<void(NodeID,
const rpc::GcsNodeAddressAndLiveness &,
const bool is_initializing)> subscribe,
StatusCallback done);

/// Send a check alive request to GCS for the liveness of some nodes.
@@ -261,7 +269,8 @@
virtual void AsyncResubscribe();

/// Add rpc::GcsNodeAddressAndLiveness information to accessor cache.
virtual void HandleNotification(rpc::GcsNodeAddressAndLiveness &&node_info);
virtual void HandleNotification(rpc::GcsNodeAddressAndLiveness &&node_info,
const bool is_initializing = false);

virtual bool IsSubscribedToNodeChange() const {
return node_change_callback_address_and_liveness_ != nullptr;
@@ -276,7 +285,8 @@

/// The callback to call when a new node is added or a node is removed, when
/// using the GcsNodeAddressAndLiveness version of the node API.
std::function<void(NodeID, const rpc::GcsNodeAddressAndLiveness &)>
std::function<void(
NodeID, const rpc::GcsNodeAddressAndLiveness &, const bool is_initializing)>
node_change_callback_address_and_liveness_ = nullptr;

/// Mutex to protect node_cache_address_and_liveness_ for thread-safe access
Expand All @@ -290,6 +300,8 @@ class NodeInfoAccessor {
// TODO(dayshah): Need to refactor gcs client / accessor to avoid this.
// https://github.com/ray-project/ray/issues/54805
FRIEND_TEST(NodeInfoAccessorTest, TestHandleNotification);
FRIEND_TEST(NodeInfoAccessorTest, TestHandleNotificationIsInitializingOverride);
FRIEND_TEST(NodeInfoAccessorTest, TestHandleNotificationIsInitializingPreserved);
};

/// \class NodeResourceInfoAccessor
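A hedged usage sketch of the new callback signature (the accessor instance, the alive_count counter, and the handler body are hypothetical; only the three-argument signature comes from this change). It shows a subscriber that stays defensive against initialization-time DEAD notifications:

size_t alive_count = 0;
accessor.AsyncSubscribeToNodeAddressAndLivenessChange(
    [&alive_count](NodeID node_id,
                   const rpc::GcsNodeAddressAndLiveness &data,
                   const bool is_initializing) {
      if (data.state() == rpc::GcsNodeInfo::ALIVE) {
        ++alive_count;
      } else if (!is_initializing && alive_count > 0) {
        // A DEAD notification delivered during initialization has no
        // guaranteed matching ALIVE notification; skip the decrement.
        --alive_count;
      }
    },
    /*done=*/[](Status status) { /* subscription established */ });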
64 changes: 63 additions & 1 deletion src/ray/gcs_rpc_client/tests/accessor_test.cc
@@ -29,7 +29,9 @@ TEST(NodeInfoAccessorTest, TestHandleNotification) {
NodeInfoAccessor accessor;
int num_notifications = 0;
accessor.node_change_callback_address_and_liveness_ =
[&](NodeID, const rpc::GcsNodeAddressAndLiveness &) { num_notifications++; };
[&](NodeID, const rpc::GcsNodeAddressAndLiveness &, const bool) {
num_notifications++;
};
NodeID node_id = NodeID::FromRandom();

rpc::GcsNodeAddressAndLiveness node_info;
@@ -61,6 +63,66 @@ TEST(NodeInfoAccessorTest, TestHandleNotification) {
ASSERT_EQ(num_notifications, 2);
}

TEST(NodeInfoAccessorTest, TestHandleNotificationIsInitializingOverride) {
// Test that when a DEAD notification comes during initialization, but the cache
// already has the node as ALIVE (meaning polling saw it first), the callback
// receives is_initializing=false so that counters get decremented correctly.
//
// This fixes a bug where:
// 1. Poll returns ALIVE → counter increments
// 2. Init returns DEAD with is_initializing=true → counter skips decrement (BUG)
// 3. Later poll DEAD is filtered by cache
// 4. Counter stuck at wrong value

NodeInfoAccessor accessor;
std::vector<bool> is_initializing_values;
accessor.node_change_callback_address_and_liveness_ =
[&](NodeID, const rpc::GcsNodeAddressAndLiveness &, const bool is_initializing) {
is_initializing_values.push_back(is_initializing);
};
NodeID node_id = NodeID::FromRandom();

rpc::GcsNodeAddressAndLiveness node_info;
node_info.set_node_id(node_id.Binary());

// Simulate poll seeing ALIVE first (is_initializing=false)
node_info.set_state(rpc::GcsNodeInfo::ALIVE);
accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info), false);
ASSERT_EQ(is_initializing_values.size(), 1);
ASSERT_EQ(is_initializing_values[0], false);

// Simulate init seeing DEAD (passed is_initializing=true, but should be overridden)
node_info.set_state(rpc::GcsNodeInfo::DEAD);
accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info), true);
ASSERT_EQ(is_initializing_values.size(), 2);
// Key assertion: even though we passed is_initializing=true, the callback should
// receive false because the cache had ALIVE and we're transitioning to DEAD
ASSERT_EQ(is_initializing_values[1], false);
}

TEST(NodeInfoAccessorTest, TestHandleNotificationIsInitializingPreserved) {
// Test that when a DEAD notification comes during initialization and the cache
// does NOT have the node (no prior ALIVE), is_initializing=true is preserved.

NodeInfoAccessor accessor;
std::vector<bool> is_initializing_values;
accessor.node_change_callback_address_and_liveness_ =
[&](NodeID, const rpc::GcsNodeAddressAndLiveness &, const bool is_initializing) {
is_initializing_values.push_back(is_initializing);
};
NodeID node_id = NodeID::FromRandom();

rpc::GcsNodeAddressAndLiveness node_info;
node_info.set_node_id(node_id.Binary());

// Simulate init seeing DEAD first (no prior ALIVE in cache)
node_info.set_state(rpc::GcsNodeInfo::DEAD);
accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info), true);
ASSERT_EQ(is_initializing_values.size(), 1);
// is_initializing should be preserved as true since there was no ALIVE→DEAD transition
ASSERT_EQ(is_initializing_values[0], true);
}

TEST(NodeInfoAccessorTest, TestHandleNotificationDeathInfo) {
NodeInfoAccessor accessor;
rpc::GcsNodeAddressAndLiveness node_info;
13 changes: 8 additions & 5 deletions src/ray/gcs_rpc_client/tests/gcs_client_test.cc
@@ -375,7 +375,9 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
}

bool SubscribeToNodeAddressAndLivenessChange(
std::function<void(NodeID, const rpc::GcsNodeAddressAndLiveness &)> subscribe) {
std::function<void(NodeID,
const rpc::GcsNodeAddressAndLiveness &,
const bool is_initializing)> subscribe) {
std::promise<bool> promise;
gcs_client_->Nodes().AsyncSubscribeToNodeAddressAndLivenessChange(
subscribe, [&promise](Status status) { promise.set_value(status.ok()); });
@@ -618,7 +620,8 @@ TEST_P(GcsClientTest, TestNodeInfo) {
std::atomic<int> unregister_count(0);
auto on_subscribe = [&register_count, &unregister_count](
const NodeID &node_id,
const rpc::GcsNodeAddressAndLiveness &data) {
const rpc::GcsNodeAddressAndLiveness &data,
const bool is_initializing) {
if (data.state() == rpc::GcsNodeInfo::ALIVE) {
++register_count;
} else if (data.state() == rpc::GcsNodeInfo::DEAD) {
@@ -810,9 +813,9 @@ TEST_P(GcsClientTest, TestNodeTableResubscribe) {
// Test that subscription of the node table can still work when GCS server restarts.
// Subscribe to node addition and removal events from GCS and cache that information.
std::atomic<int> node_change_count(0);
auto node_subscribe = [&node_change_count](
const NodeID &id,
const rpc::GcsNodeAddressAndLiveness &result) {
auto node_subscribe = [&node_change_count](const NodeID &id,
const rpc::GcsNodeAddressAndLiveness &result,
const bool is_initializing) {
++node_change_count;
};
ASSERT_TRUE(SubscribeToNodeAddressAndLivenessChange(node_subscribe));
3 changes: 2 additions & 1 deletion src/ray/raylet/node_manager.cc
@@ -311,7 +311,8 @@ void NodeManager::Start(rpc::GcsNodeInfo &&self_node_info) {

void NodeManager::RegisterGcs() {
auto on_node_change = [this](const NodeID &node_id,
const rpc::GcsNodeAddressAndLiveness &data) {
const rpc::GcsNodeAddressAndLiveness &data,
const bool is_initializing) {
if (data.state() == GcsNodeInfo::ALIVE) {
NodeAdded(data);
} else {