diff --git a/src/mock/ray/gcs_client/accessor.h b/src/mock/ray/gcs_client/accessor.h index 7448ead11a03..b54c43005c13 100644 --- a/src/mock/ray/gcs_client/accessor.h +++ b/src/mock/ray/gcs_client/accessor.h @@ -106,6 +106,7 @@ class MockNodeInfoAccessor : public NodeInfoAccessor { MOCK_METHOD(bool, IsNodeDead, (const NodeID &node_id), (const, override)); MOCK_METHOD(bool, IsNodeAlive, (const NodeID &node_id), (const, override)); MOCK_METHOD(void, AsyncResubscribe, (), (override)); + MOCK_METHOD(int, GetAliveNodeCount, (), (const, override)); }; } // namespace gcs diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 7fd094f0d7d8..8bbb33afa90d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -733,7 +733,6 @@ void CoreWorker::SubscribeToNodeChanges() { // fiasco between gcs_client, reference_counter_, raylet_client_pool_, and // core_worker_client_pool_. auto on_node_change = [reference_counter = reference_counter_, - rate_limiter = lease_request_rate_limiter_, raylet_client_pool = raylet_client_pool_, core_worker_client_pool = core_worker_client_pool_]( const NodeID &node_id, @@ -746,11 +745,6 @@ void CoreWorker::SubscribeToNodeChanges() { raylet_client_pool->Disconnect(node_id); core_worker_client_pool->Disconnect(node_id); } - auto cluster_size_based_rate_limiter = - dynamic_cast(rate_limiter.get()); - if (cluster_size_based_rate_limiter != nullptr) { - cluster_size_based_rate_limiter->OnNodeChanges(data); - } }; gcs_client_->Nodes().AsyncSubscribeToNodeAddressAndLivenessChange( diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 1e021929e8e2..937996ad10bf 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -343,7 +343,7 @@ std::shared_ptr CoreWorkerProcessImpl::CreateCoreWorker( << "max_pending_lease_requests_per_scheduling_category can't be 0"; lease_request_rate_limiter = std::make_shared( - /*min_concurrent_lease_cap_*/ 10); + /*min_concurrent_lease_cap_*/ 10, gcs_client.get()); } // We can turn on exit_on_connection_failure on for the core worker plasma diff --git a/src/ray/core_worker/task_submission/normal_task_submitter.cc b/src/ray/core_worker/task_submission/normal_task_submitter.cc index ebe984ea3b41..d27b42f448c0 100644 --- a/src/ray/core_worker/task_submission/normal_task_submitter.cc +++ b/src/ray/core_worker/task_submission/normal_task_submitter.cc @@ -26,6 +26,7 @@ #include "ray/common/lease/lease_spec.h" #include "ray/common/protobuf_utils.h" #include "ray/core_worker/task_submission/task_submission_util.h" +#include "ray/gcs_rpc_client/gcs_client.h" #include "ray/util/time.h" namespace ray { @@ -817,27 +818,13 @@ bool NormalTaskSubmitter::QueueGeneratorForResubmit(const TaskSpecification &spe } ClusterSizeBasedLeaseRequestRateLimiter::ClusterSizeBasedLeaseRequestRateLimiter( - size_t min_concurrent_lease_limit) - : min_concurrent_lease_cap_(min_concurrent_lease_limit), num_alive_nodes_(0) {} + size_t min_concurrent_lease_limit, gcs::GcsClient *gcs_client) + : min_concurrent_lease_cap_(min_concurrent_lease_limit), gcs_client_(gcs_client) {} size_t ClusterSizeBasedLeaseRequestRateLimiter:: GetMaxPendingLeaseRequestsPerSchedulingCategory() { - return std::max(min_concurrent_lease_cap_, num_alive_nodes_.load()); -} - -void ClusterSizeBasedLeaseRequestRateLimiter::OnNodeChanges( - const rpc::GcsNodeAddressAndLiveness &data) { - if (data.state() == rpc::GcsNodeInfo::DEAD) { - if (num_alive_nodes_ != 0) { - num_alive_nodes_--; - } else { - RAY_LOG(WARNING) << "Node" << data.node_manager_address() - << " change state to DEAD but num_alive_node is 0."; - } - } else { - num_alive_nodes_++; - } - RAY_LOG_EVERY_MS(INFO, 60000) << "Number of alive nodes:" << num_alive_nodes_.load(); + size_t alive_nodes = gcs_client_ ? gcs_client_->Nodes().GetAliveNodeCount() : 0; + return std::max(min_concurrent_lease_cap_, alive_nodes); } } // namespace core diff --git a/src/ray/core_worker/task_submission/normal_task_submitter.h b/src/ray/core_worker/task_submission/normal_task_submitter.h index d8090de144bc..0bc920774942 100644 --- a/src/ray/core_worker/task_submission/normal_task_submitter.h +++ b/src/ray/core_worker/task_submission/normal_task_submitter.h @@ -73,13 +73,13 @@ class StaticLeaseRequestRateLimiter : public LeaseRequestRateLimiter { // It returns max(num_nodes_in_cluster, min_concurrent_lease_limit) class ClusterSizeBasedLeaseRequestRateLimiter : public LeaseRequestRateLimiter { public: - explicit ClusterSizeBasedLeaseRequestRateLimiter(size_t min_concurrent_lease_limit); + ClusterSizeBasedLeaseRequestRateLimiter(size_t min_concurrent_lease_limit, + gcs::GcsClient *gcs_client); size_t GetMaxPendingLeaseRequestsPerSchedulingCategory() override; - void OnNodeChanges(const rpc::GcsNodeAddressAndLiveness &data); private: const size_t min_concurrent_lease_cap_; - std::atomic num_alive_nodes_; + gcs::GcsClient *gcs_client_; }; // This class is thread-safe. @@ -365,14 +365,14 @@ class NormalTaskSubmitter { // Generators that are currently running and need to be resubmitted. absl::flat_hash_set generators_to_resubmit_ ABSL_GUARDED_BY(mu_); - // Tasks that have failed but we are waiting for their error cause to decide if they + // Tasks that have failed but were waiting for their error cause to decide if they // should be retried or permanently failed. absl::flat_hash_set failed_tasks_pending_failure_cause_ ABSL_GUARDED_BY(mu_); // Ratelimiter controls the num of pending lease requests. std::shared_ptr lease_request_rate_limiter_; - // Retries cancelation requests if they were not successful. + // Retries cancellation requests if they were not successful. instrumented_io_context &io_service_; ray::observability::MetricInterface &scheduler_placement_time_ms_histogram_; diff --git a/src/ray/core_worker/task_submission/tests/normal_task_submitter_test.cc b/src/ray/core_worker/task_submission/tests/normal_task_submitter_test.cc index 2294bc51afe2..cda7d93e26e5 100644 --- a/src/ray/core_worker/task_submission/tests/normal_task_submitter_test.cc +++ b/src/ray/core_worker/task_submission/tests/normal_task_submitter_test.cc @@ -1934,34 +1934,44 @@ TEST(LeaseRequestRateLimiterTest, StaticLeaseRequestRateLimiter) { } TEST(LeaseRequestRateLimiterTest, ClusterSizeBasedLeaseRequestRateLimiter) { - rpc::GcsNodeAddressAndLiveness dead_node; - dead_node.set_state(rpc::GcsNodeInfo::DEAD); - rpc::GcsNodeAddressAndLiveness alive_node; - alive_node.set_state(rpc::GcsNodeInfo::ALIVE); + // Test that the rate limiter uses gcs_client to get alive node count { - ClusterSizeBasedLeaseRequestRateLimiter limiter(1); - ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 1); - limiter.OnNodeChanges(alive_node); - ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 1); - limiter.OnNodeChanges(alive_node); - ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 2); - limiter.OnNodeChanges(dead_node); - ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 1); - limiter.OnNodeChanges(dead_node); - ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 1); + gcs::MockGcsClient mock_gcs_client; + ClusterSizeBasedLeaseRequestRateLimiter limiter(1, &mock_gcs_client); + + // Expect GetAliveNodeCount to return 5 nodes + EXPECT_CALL(*mock_gcs_client.mock_node_accessor, GetAliveNodeCount()) + .WillOnce(::testing::Return(5)); + ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 5); + } + + { + gcs::MockGcsClient mock_gcs_client; + ClusterSizeBasedLeaseRequestRateLimiter limiter(10, &mock_gcs_client); + + // min_concurrent_lease_cap is 10, but only 3 nodes + EXPECT_CALL(*mock_gcs_client.mock_node_accessor, GetAliveNodeCount()) + .WillOnce(::testing::Return(3)); + // Should return max(10, 3) = 10 + ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 10); + } + + { + gcs::MockGcsClient mock_gcs_client; + ClusterSizeBasedLeaseRequestRateLimiter limiter(10, &mock_gcs_client); + + // Now with more nodes than the cap + EXPECT_CALL(*mock_gcs_client.mock_node_accessor, GetAliveNodeCount()) + .WillOnce(::testing::Return(15)); + // Should return max(10, 15) = 15 + ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 15); } + // Test with nullptr gcs_client { - ClusterSizeBasedLeaseRequestRateLimiter limiter(0); - ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 0); - limiter.OnNodeChanges(alive_node); - ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 1); - limiter.OnNodeChanges(dead_node); - ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 0); - limiter.OnNodeChanges(dead_node); - ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 0); - limiter.OnNodeChanges(alive_node); - ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 1); + ClusterSizeBasedLeaseRequestRateLimiter limiter(5, nullptr); + // With nullptr, should return min_concurrent_lease_cap + ASSERT_EQ(limiter.GetMaxPendingLeaseRequestsPerSchedulingCategory(), 5); } } diff --git a/src/ray/gcs_rpc_client/accessor.cc b/src/ray/gcs_rpc_client/accessor.cc index bd21bf4c38bd..e409975c12d1 100644 --- a/src/ray/gcs_rpc_client/accessor.cc +++ b/src/ray/gcs_rpc_client/accessor.cc @@ -359,6 +359,11 @@ NodeInfoAccessor::GetAllNodeAddressAndLiveness() const { return node_cache_address_and_liveness_; } +int NodeInfoAccessor::GetAliveNodeCount() const { + absl::MutexLock lock(&node_cache_address_and_liveness_mutex_); + return alive_node_count_; +} + StatusOr> NodeInfoAccessor::GetAllNoCache( int64_t timeout_ms, std::optional state_filter, @@ -416,6 +421,10 @@ void NodeInfoAccessor::HandleNotification(rpc::GcsNodeAddressAndLiveness &&node_ if (entry == node_cache_address_and_liveness_.end()) { // If the entry is not in the cache, then the notification is new. is_notif_new = true; + // New node being added - increment counter if alive + if (is_alive) { + alive_node_count_++; + } } else { // If the entry is in the cache, then the notification is new if the node // was alive and is now dead. @@ -430,6 +439,11 @@ void NodeInfoAccessor::HandleNotification(rpc::GcsNodeAddressAndLiveness &&node_ << node_id; return; } + + // Node transitioning from alive to dead - decrement counter + if (is_notif_new) { + alive_node_count_--; + } } // Add the notification to our address and liveness cache. diff --git a/src/ray/gcs_rpc_client/accessor.h b/src/ray/gcs_rpc_client/accessor.h index 9711e5f3965a..8091bbf7219f 100644 --- a/src/ray/gcs_rpc_client/accessor.h +++ b/src/ray/gcs_rpc_client/accessor.h @@ -194,6 +194,14 @@ class NodeInfoAccessor { virtual absl::flat_hash_map GetAllNodeAddressAndLiveness() const; + /// Get the number of alive nodes from the local cache. + /// Thread-safe. + /// Note, the local cache is only available if + /// `AsyncSubscribeToNodeAddressAndLivenessChange` is called before. + /// + /// \return The number of alive nodes in the local cache. + virtual int GetAliveNodeCount() const; + /// Get information of all nodes from an RPC to GCS synchronously with optional filters. /// /// \return All nodes that match the given filters from the gcs without the cache. @@ -287,6 +295,9 @@ class NodeInfoAccessor { node_cache_address_and_liveness_ ABSL_GUARDED_BY(node_cache_address_and_liveness_mutex_); + /// Counter tracking the number of alive nodes in the cache + int alive_node_count_ ABSL_GUARDED_BY(node_cache_address_and_liveness_mutex_) = 0; + // TODO(dayshah): Need to refactor gcs client / accessor to avoid this. // https://github.com/ray-project/ray/issues/54805 FRIEND_TEST(NodeInfoAccessorTest, TestHandleNotification); diff --git a/src/ray/gcs_rpc_client/tests/accessor_test.cc b/src/ray/gcs_rpc_client/tests/accessor_test.cc index 5dc81cb6f72f..1c4fe025ac8c 100644 --- a/src/ray/gcs_rpc_client/tests/accessor_test.cc +++ b/src/ray/gcs_rpc_client/tests/accessor_test.cc @@ -85,6 +85,279 @@ TEST(NodeInfoAccessorTest, TestHandleNotificationDeathInfo) { ASSERT_EQ(cached_node->death_info().reason_message(), "Test termination reason"); } +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountInitiallyZero) { + // Test that the alive node count is 0 initially. + NodeInfoAccessor accessor; + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); +} + +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountIncrementsOnAliveNode) { + // Test that adding an alive node increments the count. + NodeInfoAccessor accessor; + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); + + NodeID node_id = NodeID::FromRandom(); + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_id.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + + accessor.HandleNotification(std::move(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 1); +} + +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountMultipleNodes) { + // Test that adding multiple alive nodes increments the count correctly. + NodeInfoAccessor accessor; + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); + + // Add 3 alive nodes + for (int i = 0; i < 3; i++) { + NodeID node_id = NodeID::FromRandom(); + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_id.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(std::move(node_info)); + } + + ASSERT_EQ(accessor.GetAliveNodeCount(), 3); +} + +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountDecrementsOnDeath) { + // Test that a node going from alive to dead decrements the count. + NodeInfoAccessor accessor; + NodeID node_id = NodeID::FromRandom(); + + // First add an alive node + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_id.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 1); + + // Then mark it as dead + node_info.set_state(rpc::GcsNodeInfo::DEAD); + accessor.HandleNotification(std::move(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); +} + +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountDeadNodeDirectlyAdded) { + // Test that adding a dead node directly doesn't increment the count. + NodeInfoAccessor accessor; + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); + + NodeID node_id = NodeID::FromRandom(); + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_id.Binary()); + node_info.set_state(rpc::GcsNodeInfo::DEAD); + + accessor.HandleNotification(std::move(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); +} + +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountResurrectionIgnored) { + // Test that attempting to resurrect a dead node doesn't affect the count. + // This corresponds to the TestHandleNotification test which verifies that + // a dead node cannot become alive again. + NodeInfoAccessor accessor; + NodeID node_id = NodeID::FromRandom(); + + // Add alive node + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_id.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 1); + + // Mark as dead + node_info.set_state(rpc::GcsNodeInfo::DEAD); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); + + // Try to resurrect - should be ignored + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(std::move(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); // Still 0, resurrection is ignored +} + +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountMultipleNodesWithDeaths) { + // Test the count with multiple nodes where some die. + NodeInfoAccessor accessor; + + // Add 5 alive nodes and track their IDs + std::vector node_ids; + for (int i = 0; i < 5; i++) { + NodeID node_id = NodeID::FromRandom(); + node_ids.push_back(node_id); + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_id.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(std::move(node_info)); + } + ASSERT_EQ(accessor.GetAliveNodeCount(), 5); + + // Kill 2 nodes + for (int i = 0; i < 2; i++) { + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_ids[i].Binary()); + node_info.set_state(rpc::GcsNodeInfo::DEAD); + accessor.HandleNotification(std::move(node_info)); + } + ASSERT_EQ(accessor.GetAliveNodeCount(), 3); + + // Add 2 more alive nodes + for (int i = 0; i < 2; i++) { + NodeID node_id = NodeID::FromRandom(); + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_id.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(std::move(node_info)); + } + ASSERT_EQ(accessor.GetAliveNodeCount(), 5); +} + +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountDuplicateAliveNotification) { + // Test that receiving duplicate alive notifications for the same node + // doesn't double-count. + NodeInfoAccessor accessor; + NodeID node_id = NodeID::FromRandom(); + + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_id.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + + // Send the same alive notification twice + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 1); + + accessor.HandleNotification(std::move(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 1); // Still 1, not double-counted +} + +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountDuplicateDeadNotification) { + // Test that receiving duplicate dead notifications for the same node + // doesn't incorrectly decrement the count. + // This can happen during bootstrap when pubsub messages arrive out of order + // relative to the initial state query. + NodeInfoAccessor accessor; + NodeID node_id = NodeID::FromRandom(); + + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_id.Binary()); + node_info.set_state(rpc::GcsNodeInfo::DEAD); + + // First dead notification (e.g., from pubsub during bootstrap) + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); + + // Second dead notification (e.g., from initial state query) + accessor.HandleNotification(std::move(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); // Still 0, not decremented below 0 +} + +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountResurrectionOfInitiallyDeadNode) { + // Test that a node that was first seen as DEAD cannot be resurrected to ALIVE. + // This can happen during bootstrap when: + // 1. Initial state query returns a node as DEAD (node died before we started) + // 2. A late/out-of-order pubsub message arrives saying the node is ALIVE + // The resurrection should be ignored. + NodeInfoAccessor accessor; + NodeID node_id = NodeID::FromRandom(); + + rpc::GcsNodeAddressAndLiveness node_info; + node_info.set_node_id(node_id.Binary()); + + // First notification: node is DEAD (e.g., from initial state bootstrap) + node_info.set_state(rpc::GcsNodeInfo::DEAD); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); + ASSERT_TRUE(accessor.IsNodeDead(node_id)); + + // Second notification: late ALIVE message arrives (should be ignored) + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(std::move(node_info)); + ASSERT_EQ(accessor.GetAliveNodeCount(), 0); // Still 0, resurrection ignored + ASSERT_TRUE(accessor.IsNodeDead(node_id)); // Node is still dead +} + +TEST(NodeInfoAccessorTest, TestGetAliveNodeCountBootstrapRaceCondition) { + // Comprehensive test simulating bootstrap race conditions. + // During bootstrap, pubsub subscription starts before initial state is fetched, + // so messages can arrive in any order. + NodeInfoAccessor accessor; + + // Scenario: Multiple nodes with various race conditions + NodeID node1 = NodeID::FromRandom(); // Will be: pubsub ALIVE -> initial ALIVE + NodeID node2 = NodeID::FromRandom(); // Will be: pubsub ALIVE -> initial DEAD + NodeID node3 = NodeID::FromRandom(); // Will be: pubsub DEAD -> initial ALIVE + NodeID node4 = NodeID::FromRandom(); // Will be: pubsub DEAD -> initial DEAD + NodeID node5 = NodeID::FromRandom(); // Will be: only in initial state as ALIVE + + // Simulate pubsub messages arriving first + rpc::GcsNodeAddressAndLiveness node_info; + + // Node1: pubsub says ALIVE + node_info.set_node_id(node1.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + + // Node2: pubsub says ALIVE + node_info.set_node_id(node2.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + + // Node3: pubsub says DEAD + node_info.set_node_id(node3.Binary()); + node_info.set_state(rpc::GcsNodeInfo::DEAD); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + + // Node4: pubsub says DEAD + node_info.set_node_id(node4.Binary()); + node_info.set_state(rpc::GcsNodeInfo::DEAD); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + + ASSERT_EQ(accessor.GetAliveNodeCount(), 2); // node1 and node2 are alive + + // Now simulate initial state bootstrap arriving + // Node1: initial state says ALIVE (duplicate, should be ignored) + node_info.set_node_id(node1.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + + // Node2: initial state says DEAD (node died, count should decrement) + node_info.set_node_id(node2.Binary()); + node_info.set_state(rpc::GcsNodeInfo::DEAD); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + + // Node3: initial state says ALIVE (resurrection attempt, should be ignored) + node_info.set_node_id(node3.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + + // Node4: initial state says DEAD (duplicate dead, should be ignored) + node_info.set_node_id(node4.Binary()); + node_info.set_state(rpc::GcsNodeInfo::DEAD); + accessor.HandleNotification(rpc::GcsNodeAddressAndLiveness(node_info)); + + // Node5: only appears in initial state as ALIVE + node_info.set_node_id(node5.Binary()); + node_info.set_state(rpc::GcsNodeInfo::ALIVE); + accessor.HandleNotification(std::move(node_info)); + + // Final state: + // node1: ALIVE (was alive, got duplicate alive) + // node2: DEAD (was alive, then died) + // node3: DEAD (was dead, resurrection ignored) + // node4: DEAD (was dead, duplicate dead) + // node5: ALIVE (newly added) + ASSERT_EQ(accessor.GetAliveNodeCount(), 2); // node1 and node5 + + ASSERT_TRUE(accessor.IsNodeAlive(node1)); + ASSERT_TRUE(accessor.IsNodeDead(node2)); + ASSERT_TRUE(accessor.IsNodeDead(node3)); + ASSERT_TRUE(accessor.IsNodeDead(node4)); + ASSERT_TRUE(accessor.IsNodeAlive(node5)); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index 884e710ead16..5b415ea893b0 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -624,11 +624,12 @@ TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedNode) { publish_node_change_callback; EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncSubscribeToNodeAddressAndLivenessChange(_, _)) - .WillOnce([&](const rpc::SubscribeCallback - &subscribe, - const rpc::StatusCallback &done) { - publish_node_change_callback = subscribe; - }); + .WillOnce( + [&](const std::function + &subscribe, + const rpc::StatusCallback &done) { + publish_node_change_callback = subscribe; + }); node_manager_->RegisterGcs(); // Preparing a detached actor creation task spec for the later RequestWorkerLease rpc. @@ -1348,15 +1349,16 @@ TEST_P(NodeManagerDeathTest, TestGcsPublishesSelfDead) { // started const bool shutting_down_during_death_publish = GetParam(); - rpc::SubscribeCallback + std::function publish_node_change_callback; EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncSubscribeToNodeAddressAndLivenessChange(_, _)) - .WillOnce([&](const rpc::SubscribeCallback - &subscribe, - const rpc::StatusCallback &done) { - publish_node_change_callback = subscribe; - }); + .WillOnce( + [&](const std::function + &subscribe, + const rpc::StatusCallback &done) { + publish_node_change_callback = subscribe; + }); node_manager_->RegisterGcs(); shutting_down_ = shutting_down_during_death_publish;