diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py index a5e968c89453..560db13e0edb 100644 --- a/python/ray/tests/test_raylet_fault_tolerance.py +++ b/python/ray/tests/test_raylet_fault_tolerance.py @@ -92,7 +92,7 @@ def node_is_dead(): # propagated to all the raylets. Since this is inherently racy, we block CancelResourceReserve RPCs # from ever succeeding to make this test deterministic. @pytest.fixture -def inject_rpc_failures(monkeypatch, request): +def inject_release_unused_bundles_rpc_failure(monkeypatch, request): deterministic_failure = request.param monkeypatch.setenv( "RAY_testing_rpc_failure", @@ -102,14 +102,16 @@ def inject_rpc_failures(monkeypatch, request): ) -@pytest.mark.parametrize("inject_rpc_failures", ["request", "response"], indirect=True) +@pytest.mark.parametrize( + "inject_release_unused_bundles_rpc_failure", ["request", "response"], indirect=True +) @pytest.mark.parametrize( "ray_start_cluster_head_with_external_redis", [{"num_cpus": 1}], indirect=True, ) def test_release_unused_bundles_idempotent( - inject_rpc_failures, + inject_release_unused_bundles_rpc_failure, ray_start_cluster_head_with_external_redis, ): cluster = ray_start_cluster_head_with_external_redis @@ -141,6 +143,63 @@ def task(): assert result == "success" +@pytest.fixture +def inject_notify_gcs_restart_rpc_failure(monkeypatch, request): + deterministic_failure = request.param + monkeypatch.setenv( + "RAY_testing_rpc_failure", + "NodeManagerService.grpc_client.NotifyGCSRestart=1:" + + ("100:0" if deterministic_failure == "request" else "0:100"), + ) + + +@pytest.mark.parametrize( + "inject_notify_gcs_restart_rpc_failure", ["request", "response"], indirect=True +) +@pytest.mark.parametrize( + "ray_start_cluster_head_with_external_redis", + [ + { + "_system_config": { + # Extending the fallback timeout to focus on death + # notification received from GCS_ACTOR_CHANNEL pubsub + "timeout_ms_task_wait_for_death_info": 10000, + } + } + ], + indirect=True, +) +def test_notify_gcs_restart_idempotent( + inject_notify_gcs_restart_rpc_failure, + ray_start_cluster_head_with_external_redis, +): + cluster = ray_start_cluster_head_with_external_redis + + @ray.remote(num_cpus=1, max_restarts=0) + class DummyActor: + def get_pid(self): + return psutil.Process().pid + + def ping(self): + return "pong" + + actor = DummyActor.remote() + ray.get(actor.ping.remote()) + actor_pid = ray.get(actor.get_pid.remote()) + + cluster.head_node.kill_gcs_server() + cluster.head_node.start_gcs_server() + + p = psutil.Process(actor_pid) + p.kill() + + # If the actor death notification is not received from the GCS pubsub, this will timeout since + # the fallback via wait_for_death_info_tasks in the actor task submitter will never trigger + # since it's set to 10 seconds. + with pytest.raises(ray.exceptions.RayActorError): + ray.get(actor.ping.remote(), timeout=5) + + def test_kill_local_actor_rpc_retry_and_idempotency(monkeypatch, shutdown_only): """Test that KillLocalActor RPC retries work correctly and guarantee actor death. Not testing response since the actor is killed either way. diff --git a/src/mock/ray/gcs_client/accessor.h b/src/mock/ray/gcs_client/accessor.h index d2c30ba65754..819f384376d9 100644 --- a/src/mock/ray/gcs_client/accessor.h +++ b/src/mock/ray/gcs_client/accessor.h @@ -63,13 +63,13 @@ class MockActorInfoAccessor : public ActorInfoAccessor { (const TaskSpecification &task_spec, const rpc::ClientCallback &callback), (override)); - MOCK_METHOD(Status, + MOCK_METHOD(void, AsyncSubscribe, (const ActorID &actor_id, (const SubscribeCallback &subscribe), const StatusCallback &done), (override)); - MOCK_METHOD(Status, AsyncUnsubscribe, (const ActorID &actor_id), (override)); + MOCK_METHOD(void, AsyncUnsubscribe, (const ActorID &actor_id), (override)); MOCK_METHOD(void, AsyncResubscribe, (), (override)); MOCK_METHOD(bool, IsActorUnsubscribed, (const ActorID &actor_id), (override)); }; @@ -91,7 +91,7 @@ class MockJobInfoAccessor : public JobInfoAccessor { AsyncMarkFinished, (const JobID &job_id, const StatusCallback &callback), (override)); - MOCK_METHOD(Status, + MOCK_METHOD(void, AsyncSubscribeAll, ((const SubscribeCallback &subscribe), const StatusCallback &done), @@ -191,7 +191,6 @@ class MockNodeResourceInfoAccessor : public NodeResourceInfoAccessor { AsyncGetAllAvailableResources, (const MultiItemCallback &callback), (override)); - MOCK_METHOD(void, AsyncResubscribe, (), (override)); MOCK_METHOD(void, AsyncGetAllResourceUsage, (const ItemCallback &callback), @@ -231,7 +230,7 @@ namespace gcs { class MockWorkerInfoAccessor : public WorkerInfoAccessor { public: - MOCK_METHOD(Status, + MOCK_METHOD(void, AsyncSubscribeToWorkerFailures, (const ItemCallback &subscribe, const StatusCallback &done), diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 3d52817de3de..f8320c36b35b 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -308,7 +308,7 @@ void ActorManager::SubscribeActorState(const ActorID &actor_id) { this, std::placeholders::_1, std::placeholders::_2); - RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe( + gcs_client_->Actors().AsyncSubscribe( actor_id, actor_notification_callback, [this, actor_id, cached_actor_name](Status status) { @@ -323,7 +323,7 @@ void ActorManager::SubscribeActorState(const ActorID &actor_id) { cached_actor_name_to_ids_.emplace(cached_actor_name, actor_id); } } - })); + }); } void ActorManager::MarkActorKilledOrOutOfScope( diff --git a/src/ray/core_worker/tests/actor_manager_test.cc b/src/ray/core_worker/tests/actor_manager_test.cc index 6e8f6bf65395..792158f4790d 100644 --- a/src/ray/core_worker/tests/actor_manager_test.cc +++ b/src/ray/core_worker/tests/actor_manager_test.cc @@ -37,7 +37,7 @@ class MockActorInfoAccessor : public gcs::ActorInfoAccessor { ~MockActorInfoAccessor() {} - Status AsyncSubscribe( + void AsyncSubscribe( const ActorID &actor_id, const gcs::SubscribeCallback &subscribe, const gcs::StatusCallback &done) { @@ -45,7 +45,6 @@ class MockActorInfoAccessor : public gcs::ActorInfoAccessor { callback_map_.emplace(actor_id, subscribe); subscribe_finished_callback_map_[actor_id] = done; actor_subscribed_times_[actor_id]++; - return Status::OK(); } bool ActorStateNotificationPublished(const ActorID &actor_id, diff --git a/src/ray/gcs/gcs_node_manager.cc b/src/ray/gcs/gcs_node_manager.cc index 9d10271ac79c..5bdb9e49bf32 100644 --- a/src/ray/gcs/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_node_manager.cc @@ -690,7 +690,14 @@ void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) { auto remote_address = rpc::RayletClientPool::GenerateRayletAddress( node_id, node_info.node_manager_address(), node_info.node_manager_port()); auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(remote_address); - raylet_client->NotifyGCSRestart(nullptr); + raylet_client->NotifyGCSRestart( + [](const Status &status, const rpc::NotifyGCSRestartReply &reply) { + if (!status.ok()) { + RAY_LOG(WARNING) << "NotifyGCSRestart failed. This is expected if the " + "target node has died. Status: " + << status; + } + }); } else if (node_info.state() == rpc::GcsNodeInfo::DEAD) { dead_nodes_.emplace(node_id, std::make_shared(node_info)); sorted_dead_node_list_.emplace_back(node_id, node_info.end_time_ms()); diff --git a/src/ray/gcs_rpc_client/accessor.cc b/src/ray/gcs_rpc_client/accessor.cc index 31fe447da155..2447446e6da3 100644 --- a/src/ray/gcs_rpc_client/accessor.cc +++ b/src/ray/gcs_rpc_client/accessor.cc @@ -69,7 +69,7 @@ void JobInfoAccessor::AsyncMarkFinished(const JobID &job_id, }); } -Status JobInfoAccessor::AsyncSubscribeAll( +void JobInfoAccessor::AsyncSubscribeAll( const SubscribeCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); @@ -91,9 +91,9 @@ Status JobInfoAccessor::AsyncSubscribeAll( /*timeout_ms=*/-1); }; subscribe_operation_ = [this, subscribe](const StatusCallback &done_callback) { - return client_impl_->GetGcsSubscriber().SubscribeAllJobs(subscribe, done_callback); + client_impl_->GetGcsSubscriber().SubscribeAllJobs(subscribe, done_callback); }; - return subscribe_operation_( + subscribe_operation_( [this, done](const Status &status) { fetch_all_data_operation_(done); }); } @@ -105,9 +105,9 @@ void JobInfoAccessor::AsyncResubscribe() { }; if (subscribe_operation_ != nullptr) { - RAY_CHECK_OK(subscribe_operation_([this, fetch_all_done](const Status &) { + subscribe_operation_([this, fetch_all_done](const Status &) { fetch_all_data_operation_(fetch_all_done); - })); + }); } } @@ -391,7 +391,7 @@ void ActorInfoAccessor::AsyncReportActorOutOfScope( timeout_ms); } -Status ActorInfoAccessor::AsyncSubscribe( +void ActorInfoAccessor::AsyncSubscribe( const ActorID &actor_id, const SubscribeCallback &subscribe, const StatusCallback &done) { @@ -418,28 +418,27 @@ Status ActorInfoAccessor::AsyncSubscribe( absl::MutexLock lock(&mutex_); resubscribe_operations_[actor_id] = [this, actor_id, subscribe](const StatusCallback &subscribe_done) { - return client_impl_->GetGcsSubscriber().SubscribeActor( + client_impl_->GetGcsSubscriber().SubscribeActor( actor_id, subscribe, subscribe_done); }; fetch_data_operations_[actor_id] = fetch_data_operation; } - return client_impl_->GetGcsSubscriber().SubscribeActor( + client_impl_->GetGcsSubscriber().SubscribeActor( actor_id, subscribe, [fetch_data_operation, done](const Status &) { fetch_data_operation(done); }); } -Status ActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) { +void ActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) { RAY_LOG(DEBUG).WithField(actor_id).WithField(actor_id.JobId()) << "Cancelling subscription to an actor"; - auto status = client_impl_->GetGcsSubscriber().UnsubscribeActor(actor_id); + client_impl_->GetGcsSubscriber().UnsubscribeActor(actor_id); absl::MutexLock lock(&mutex_); resubscribe_operations_.erase(actor_id); fetch_data_operations_.erase(actor_id); RAY_LOG(DEBUG).WithField(actor_id).WithField(actor_id.JobId()) << "Finished cancelling subscription to an actor"; - return status; } void ActorInfoAccessor::AsyncResubscribe() { @@ -449,7 +448,7 @@ void ActorInfoAccessor::AsyncResubscribe() { // server first, then fetch data from the GCS server. absl::MutexLock lock(&mutex_); for (auto &[actor_id, resubscribe_op] : resubscribe_operations_) { - RAY_CHECK_OK(resubscribe_op([this, id = actor_id](const Status &status) { + resubscribe_op([this, id = actor_id](const Status &status) { absl::MutexLock callback_lock(&mutex_); auto fetch_data_operation = fetch_data_operations_[id]; // `fetch_data_operation` is called in the callback function of subscribe. @@ -458,7 +457,7 @@ void ActorInfoAccessor::AsyncResubscribe() { if (fetch_data_operation != nullptr) { fetch_data_operation(nullptr); } - })); + }); } } @@ -935,16 +934,6 @@ void NodeResourceInfoAccessor::AsyncGetDrainingNodes( }); } -void NodeResourceInfoAccessor::AsyncResubscribe() { - RAY_LOG(DEBUG) << "Reestablishing subscription for node resource info."; - if (subscribe_resource_operation_ != nullptr) { - RAY_CHECK_OK(subscribe_resource_operation_(nullptr)); - } - if (subscribe_batch_resource_usage_operation_ != nullptr) { - RAY_CHECK_OK(subscribe_batch_resource_usage_operation_(nullptr)); - } -} - void NodeResourceInfoAccessor::AsyncGetAllResourceUsage( const ItemCallback &callback) { rpc::GetAllResourceUsageRequest request; @@ -1009,14 +998,13 @@ void ErrorInfoAccessor::AsyncReportJobError(rpc::ErrorTableData data) { WorkerInfoAccessor::WorkerInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -Status WorkerInfoAccessor::AsyncSubscribeToWorkerFailures( +void WorkerInfoAccessor::AsyncSubscribeToWorkerFailures( const ItemCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); subscribe_operation_ = [this, subscribe](const StatusCallback &done_callback) { - return client_impl_->GetGcsSubscriber().SubscribeAllWorkerFailures(subscribe, - done_callback); + client_impl_->GetGcsSubscriber().SubscribeAllWorkerFailures(subscribe, done_callback); }; - return subscribe_operation_(done); + subscribe_operation_(done); } void WorkerInfoAccessor::AsyncResubscribe() { @@ -1025,7 +1013,7 @@ void WorkerInfoAccessor::AsyncResubscribe() { RAY_LOG(DEBUG) << "Reestablishing subscription for worker failures."; // The pub-sub server has restarted, we need to resubscribe to the pub-sub server. if (subscribe_operation_ != nullptr) { - RAY_CHECK_OK(subscribe_operation_(nullptr)); + subscribe_operation_(nullptr); } } diff --git a/src/ray/gcs_rpc_client/accessor.h b/src/ray/gcs_rpc_client/accessor.h index a193506c5ed8..eedb84be57ce 100644 --- a/src/ray/gcs_rpc_client/accessor.h +++ b/src/ray/gcs_rpc_client/accessor.h @@ -38,7 +38,7 @@ namespace gcs { // RAY_gcs_server_request_timeout_seconds int64_t GetGcsTimeoutMs(); -using SubscribeOperation = std::function; +using SubscribeOperation = std::function; using FetchDataOperation = std::function; class GcsClient; @@ -170,8 +170,7 @@ class ActorInfoAccessor { /// \param actor_id The ID of actor to be subscribed to. /// \param subscribe Callback that will be called each time when the actor is updated. /// \param done Callback that will be called when subscription is complete. - /// \return Status - virtual Status AsyncSubscribe( + virtual void AsyncSubscribe( const ActorID &actor_id, const SubscribeCallback &subscribe, const StatusCallback &done); @@ -179,8 +178,7 @@ class ActorInfoAccessor { /// Cancel subscription to an actor. /// /// \param actor_id The ID of the actor to be unsubscribed to. - /// \return Status - virtual Status AsyncUnsubscribe(const ActorID &actor_id); + virtual void AsyncUnsubscribe(const ActorID &actor_id); /// Reestablish subscription. /// This should be called when GCS server restarts from a failure. @@ -237,8 +235,7 @@ class JobInfoAccessor { /// /// \param subscribe Callback that will be called each time when a job updates. /// \param done Callback that will be called when subscription is complete. - /// \return Status - virtual Status AsyncSubscribeAll( + virtual void AsyncSubscribeAll( const SubscribeCallback &subscribe, const StatusCallback &done); @@ -511,13 +508,6 @@ class NodeResourceInfoAccessor { virtual void AsyncGetDrainingNodes( const ItemCallback> &callback); - /// Reestablish subscription. - /// This should be called when GCS server restarts from a failure. - /// PubSub server restart will cause GCS server restart. In this case, we need to - /// resubscribe from PubSub server, otherwise we only need to fetch data from GCS - /// server. - virtual void AsyncResubscribe(); - /// Get newest resource usage of all nodes from GCS asynchronously. /// /// \param callback Callback that will be called after lookup finishes. @@ -533,11 +523,6 @@ class NodeResourceInfoAccessor { rpc::GetAllResourceUsageReply &reply); private: - /// Save the subscribe operation in this function, so we can call it again when PubSub - /// server restarts from a failure. - SubscribeOperation subscribe_resource_operation_; - SubscribeOperation subscribe_batch_resource_usage_operation_; - GcsClient *client_impl_; Sequencer sequencer_; @@ -607,8 +592,7 @@ class WorkerInfoAccessor { /// /// \param subscribe Callback that will be called each time when a worker failed. /// \param done Callback that will be called when subscription is complete. - /// \return Status - virtual Status AsyncSubscribeToWorkerFailures( + virtual void AsyncSubscribeToWorkerFailures( const ItemCallback &subscribe, const StatusCallback &done); /// Report a worker failure to GCS asynchronously. diff --git a/src/ray/gcs_rpc_client/gcs_client.cc b/src/ray/gcs_rpc_client/gcs_client.cc index 884ae9012e18..6cc2dae8f44c 100644 --- a/src/ray/gcs_rpc_client/gcs_client.cc +++ b/src/ray/gcs_rpc_client/gcs_client.cc @@ -118,7 +118,6 @@ Status GcsClient::Connect(instrumented_io_context &io_service, int64_t timeout_m job_accessor_->AsyncResubscribe(); actor_accessor_->AsyncResubscribe(); node_accessor_->AsyncResubscribe(); - node_resource_accessor_->AsyncResubscribe(); worker_accessor_->AsyncResubscribe(); }; diff --git a/src/ray/gcs_rpc_client/tests/gcs_client_test.cc b/src/ray/gcs_rpc_client/tests/gcs_client_test.cc index 5729af6d76f6..b9842b7d2612 100644 --- a/src/ray/gcs_rpc_client/tests/gcs_client_test.cc +++ b/src/ray/gcs_rpc_client/tests/gcs_client_test.cc @@ -235,8 +235,8 @@ class GcsClientTest : public ::testing::TestWithParam { bool SubscribeToAllJobs( const gcs::SubscribeCallback &subscribe) { std::promise promise; - RAY_CHECK_OK(gcs_client_->Jobs().AsyncSubscribeAll( - subscribe, [&promise](Status status) { promise.set_value(status.ok()); })); + gcs_client_->Jobs().AsyncSubscribeAll( + subscribe, [&promise](Status status) { promise.set_value(status.ok()); }); return WaitReady(promise.get_future(), timeout_ms_); } @@ -271,15 +271,14 @@ class GcsClientTest : public ::testing::TestWithParam { const ActorID &actor_id, const gcs::SubscribeCallback &subscribe) { std::promise promise; - RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe( - actor_id, subscribe, [&promise](Status status) { - promise.set_value(status.ok()); - })); + gcs_client_->Actors().AsyncSubscribe(actor_id, subscribe, [&promise](Status status) { + promise.set_value(status.ok()); + }); return WaitReady(promise.get_future(), timeout_ms_); } void UnsubscribeActor(const ActorID &actor_id) { - RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(actor_id)); + gcs_client_->Actors().AsyncUnsubscribe(actor_id); } void WaitForActorUnsubscribed(const ActorID &actor_id) { @@ -426,8 +425,8 @@ class GcsClientTest : public ::testing::TestWithParam { bool SubscribeToWorkerFailures( const gcs::ItemCallback &subscribe) { std::promise promise; - RAY_CHECK_OK(gcs_client_->Workers().AsyncSubscribeToWorkerFailures( - subscribe, [&promise](Status status) { promise.set_value(status.ok()); })); + gcs_client_->Workers().AsyncSubscribeToWorkerFailures( + subscribe, [&promise](Status status) { promise.set_value(status.ok()); }); return WaitReady(promise.get_future(), timeout_ms_); } diff --git a/src/ray/pubsub/gcs_subscriber.cc b/src/ray/pubsub/gcs_subscriber.cc index eb99e8fa33f9..10437c6864fd 100644 --- a/src/ray/pubsub/gcs_subscriber.cc +++ b/src/ray/pubsub/gcs_subscriber.cc @@ -21,7 +21,7 @@ namespace ray { namespace pubsub { -Status GcsSubscriber::SubscribeAllJobs( +void GcsSubscriber::SubscribeAllJobs( const gcs::SubscribeCallback &subscribe, const gcs::StatusCallback &done) { auto subscribe_item_callback = [subscribe](rpc::PubMessage &&msg) { @@ -44,10 +44,9 @@ Status GcsSubscriber::SubscribeAllJobs( }, std::move(subscribe_item_callback), std::move(subscription_failure_callback)); - return Status::OK(); } -Status GcsSubscriber::SubscribeActor( +void GcsSubscriber::SubscribeActor( const ActorID &id, const gcs::SubscribeCallback &subscribe, const gcs::StatusCallback &done) { @@ -74,13 +73,11 @@ Status GcsSubscriber::SubscribeActor( }, std::move(subscription_callback), std::move(subscription_failure_callback)); - return Status::OK(); } -Status GcsSubscriber::UnsubscribeActor(const ActorID &id) { +void GcsSubscriber::UnsubscribeActor(const ActorID &id) { subscriber_->Unsubscribe( rpc::ChannelType::GCS_ACTOR_CHANNEL, gcs_address_, id.Binary()); - return Status::OK(); } bool GcsSubscriber::IsActorUnsubscribed(const ActorID &id) { @@ -138,7 +135,7 @@ void GcsSubscriber::SubscribeAllNodeAddressAndLiveness( std::move(subscription_failure_callback)); } -Status GcsSubscriber::SubscribeAllWorkerFailures( +void GcsSubscriber::SubscribeAllWorkerFailures( const gcs::ItemCallback &subscribe, const gcs::StatusCallback &done) { auto subscribe_item_callback = [subscribe](rpc::PubMessage &&msg) { @@ -163,7 +160,6 @@ Status GcsSubscriber::SubscribeAllWorkerFailures( }, std::move(subscribe_item_callback), std::move(subscription_failure_callback)); - return Status::OK(); } } // namespace pubsub diff --git a/src/ray/pubsub/gcs_subscriber.h b/src/ray/pubsub/gcs_subscriber.h index 7d4d5858d6a6..ce439b3657db 100644 --- a/src/ray/pubsub/gcs_subscriber.h +++ b/src/ray/pubsub/gcs_subscriber.h @@ -42,17 +42,16 @@ class GcsSubscriber { /// empty. /// Uses GCS pubsub when created with `subscriber`. - Status SubscribeActor( + void SubscribeActor( const ActorID &id, const gcs::SubscribeCallback &subscribe, const gcs::StatusCallback &done); - Status UnsubscribeActor(const ActorID &id); + void UnsubscribeActor(const ActorID &id); bool IsActorUnsubscribed(const ActorID &id); - Status SubscribeAllJobs( - const gcs::SubscribeCallback &subscribe, - const gcs::StatusCallback &done); + void SubscribeAllJobs(const gcs::SubscribeCallback &subscribe, + const gcs::StatusCallback &done); void SubscribeAllNodeInfo(const gcs::ItemCallback &subscribe, const gcs::StatusCallback &done); @@ -61,7 +60,7 @@ class GcsSubscriber { const gcs::ItemCallback &subscribe, const gcs::StatusCallback &done); - Status SubscribeAllWorkerFailures( + void SubscribeAllWorkerFailures( const gcs::ItemCallback &subscribe, const gcs::StatusCallback &done); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4145f9427094..4a2d97e49037 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -368,8 +368,7 @@ void NodeManager::RegisterGcs() { HandleUnexpectedWorkerFailure( WorkerID::FromBinary(worker_failure_data.worker_id())); }; - RAY_CHECK_OK(gcs_client_.Workers().AsyncSubscribeToWorkerFailures( - worker_failure_handler, nullptr)); + gcs_client_.Workers().AsyncSubscribeToWorkerFailures(worker_failure_handler, nullptr); // Subscribe to job updates. const auto job_subscribe_handler = [this](const JobID &job_id, @@ -386,7 +385,7 @@ void NodeManager::RegisterGcs() { HandleJobFinished(job_id, job_data); } }; - RAY_CHECK_OK(gcs_client_.Jobs().AsyncSubscribeAll(job_subscribe_handler, nullptr)); + gcs_client_.Jobs().AsyncSubscribeAll(job_subscribe_handler, nullptr); periodical_runner_->RunFnPeriodically( [this] { diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index 807c83a7b465..8999f013903c 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -476,10 +476,8 @@ TEST_F(NodeManagerTest, TestRegisterGcsAndCheckSelfAlive) { AsyncSubscribeToNodeAddressAndLivenessChange(_, _)) .Times(1); EXPECT_CALL(*mock_gcs_client_->mock_worker_accessor, - AsyncSubscribeToWorkerFailures(_, _)) - .WillOnce(Return(Status::OK())); - EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _)) - .WillOnce(Return(Status::OK())); + AsyncSubscribeToWorkerFailures(_, _)); + EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _)); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) .WillRepeatedly(Return(std::vector>{})); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_, _)) @@ -506,8 +504,7 @@ TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedWorker) { EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncSubscribeToNodeAddressAndLivenessChange(_, _)) .Times(1); - EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _)) - .WillOnce(Return(Status::OK())); + EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _)); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) .WillRepeatedly(Return(std::vector>{})); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_, _)) @@ -582,10 +579,8 @@ TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedNode) { EXPECT_CALL(*mock_object_directory_, HandleNodeRemoved(_)).Times(1); EXPECT_CALL(*mock_object_manager_, HandleNodeRemoved(_)).Times(1); EXPECT_CALL(*mock_gcs_client_->mock_worker_accessor, - AsyncSubscribeToWorkerFailures(_, _)) - .WillOnce(Return(Status::OK())); - EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _)) - .WillOnce(Return(Status::OK())); + AsyncSubscribeToWorkerFailures(_, _)); + EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _)); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) .WillRepeatedly(Return(std::vector>{})); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_, _)) diff --git a/src/ray/raylet_rpc_client/raylet_client.cc b/src/ray/raylet_rpc_client/raylet_client.cc index c3484c568dfd..eee711936a02 100644 --- a/src/ray/raylet_rpc_client/raylet_client.cc +++ b/src/ray/raylet_rpc_client/raylet_client.cc @@ -440,12 +440,13 @@ void RayletClient::CancelLeasesWithResourceShapes( void RayletClient::NotifyGCSRestart( const rpc::ClientCallback &callback) { rpc::NotifyGCSRestartRequest request; - INVOKE_RPC_CALL(NodeManagerService, - NotifyGCSRestart, - request, - callback, - grpc_client_, - /*method_timeout_ms*/ -1); + INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_, + NodeManagerService, + NotifyGCSRestart, + request, + callback, + grpc_client_, + /*method_timeout_ms*/ -1); } void RayletClient::GetSystemConfig(