diff --git a/src/mock/ray/raylet/worker.h b/src/mock/ray/raylet/worker.h index ea2c594830e2..1f0a04837b50 100644 --- a/src/mock/ray/raylet/worker.h +++ b/src/mock/ray/raylet/worker.h @@ -51,7 +51,6 @@ class MockWorkerInterface : public WorkerInterface { MOCK_METHOD(int, GetRuntimeEnvHash, (), (const, override)); MOCK_METHOD(void, AssignActorId, (const ActorID &actor_id), (override)); MOCK_METHOD(const ActorID &, GetActorId, (), (const, override)); - MOCK_METHOD(void, MarkDetachedActor, (), (override)); MOCK_METHOD(bool, IsDetachedActor, (), (const, override)); MOCK_METHOD(const std::shared_ptr, Connection, (), (const, override)); MOCK_METHOD(void, SetOwnerAddress, (const rpc::Address &address), (override)); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b20b6ef1eb4a..3e3dd03d57b1 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2270,7 +2270,6 @@ void NodeManager::FinishAssignedActorCreationTask(WorkerInterface &worker, worker.AssignActorId(actor_id); if (task_spec.IsDetachedActor()) { - worker.MarkDetachedActor(); auto job_id = task.GetTaskSpecification().JobId(); auto job_config = worker_pool_.GetJobConfig(job_id); RAY_CHECK(job_config); diff --git a/src/ray/raylet/test/node_manager_test.cc b/src/ray/raylet/test/node_manager_test.cc index 0b6c258b14a4..cf0bedd598e6 100644 --- a/src/ray/raylet/test/node_manager_test.cc +++ b/src/ray/raylet/test/node_manager_test.cc @@ -69,6 +69,50 @@ TaskSpecification BuildTaskSpec( return std::move(builder).ConsumeAndBuild(); } +TaskSpecBuilder DetachedActorCreationTaskBuilder(const rpc::Address &owner_address, + const ActorID &actor_id) { + rpc::JobConfig config; + const FunctionDescriptor function_descriptor = + FunctionDescriptorBuilder::BuildPython("x", "", "", ""); + TaskSpecBuilder task_spec_builder; + task_spec_builder.SetCommonTaskSpec(TaskID::FromRandom(JobID::Nil()), + "dummy_task", + Language::PYTHON, + function_descriptor, + JobID::Nil(), + config, + TaskID::Nil(), + 0, + TaskID::Nil(), + owner_address, + 1, + false, + false, + -1, + {{"CPU", 0}}, + {{"CPU", 0}}, + "", + 0, + TaskID::Nil(), + ""); + task_spec_builder.SetActorCreationTaskSpec(actor_id, + /*serialized_actor_handle=*/"", + rpc::SchedulingStrategy(), + /*max_restarts=*/0, + /*max_task_retries=*/0, + /*dynamic_worker_options=*/{}, + /*max_concurrency=*/1, + /*is_detached=*/true, + /*name=*/"", + /*ray_namespace=*/"", + /*is_asyncio=*/false, + /*concurrency_groups=*/{}, + /*extension_data=*/"", + /*execute_out_of_order=*/false, + /*root_detached_actor_id=*/actor_id); + return task_spec_builder; +} + } // namespace TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog) { @@ -195,7 +239,8 @@ class NodeManagerTest : public ::testing::Test { return std::make_shared(); }) { RayConfig::instance().initialize(R"({ - "raylet_liveness_self_check_interval_ms": 100 + "raylet_liveness_self_check_interval_ms": 100, + "kill_worker_timeout_milliseconds": 10 })"); NodeManagerConfig node_manager_config{}; @@ -391,6 +436,215 @@ TEST_F(NodeManagerTest, TestRegisterGcsAndCheckSelfAlive) { thread.join(); } +TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedWorker) { + EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncSubscribeToNodeChange(_, _)) + .WillOnce(Return(Status::OK())); + EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _)) + .WillOnce(Return(Status::OK())); + EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncCheckSelfAlive(_, _)) + .WillRepeatedly(Return(Status::OK())); + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) + .WillRepeatedly(Return(std::vector>{})); + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_)) + .WillRepeatedly(Return(std::vector>{})); + EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling()) + .WillRepeatedly(Return(false)); + EXPECT_CALL(mock_worker_pool_, PrestartWorkers(_, _)).Times(1); + + std::promise pop_worker_callback_promise; + PopWorkerCallback pop_worker_callback; + gcs::ItemCallback publish_worker_failure_callback; + + // Save the publish_worker_failure_callback for publishing a worker failure event later. + EXPECT_CALL(*mock_gcs_client_->mock_worker_accessor, + AsyncSubscribeToWorkerFailures(_, _)) + .WillOnce([&](const gcs::ItemCallback &subscribe, + const gcs::StatusCallback &done) { + publish_worker_failure_callback = subscribe; + return Status::OK(); + }); + + // Save the pop_worker_callback for providing a mock worker later. + EXPECT_CALL(mock_worker_pool_, PopWorker(_, _)) + .WillOnce( + [&](const TaskSpecification &task_spec, const PopWorkerCallback &callback) { + pop_worker_callback = callback; + pop_worker_callback_promise.set_value(); + }); + + // Invoke RegisterGcs and io_service_.run() so that the above EXPECT_CALLs can be + // triggered. + RAY_CHECK_OK(node_manager_->RegisterGcs()); + std::thread io_thread{[&] { + // Run the io_service in a separate thread to avoid blocking the main thread. + auto work_guard = boost::asio::make_work_guard(io_service_); + io_service_.run(); + }}; + + std::thread grpc_client_thread{[&] { + // Run the grpc client in a separate thread to avoid blocking the main thread. + + // Preparing a detached actor creation task spec for the later RequestWorkerLease rpc. + const auto owner_worker_id = WorkerID::FromRandom(); + rpc::Address owner_address; + owner_address.set_worker_id(owner_worker_id.Binary()); + const auto actor_id = + ActorID::Of(JobID::FromInt(1), TaskID::FromRandom(JobID::FromInt(1)), 0); + const auto task_spec_builder = + DetachedActorCreationTaskBuilder(owner_address, actor_id); + + // Invoke RequestWorkerLease to request a leased worker for the task in the + // NodeManager. + grpc::ClientContext context; + rpc::RequestWorkerLeaseReply reply; + rpc::RequestWorkerLeaseRequest request; + request.mutable_resource_spec()->CopyFrom(task_spec_builder.GetMessage()); + auto channel = + grpc::CreateChannel("localhost:" + std::to_string(node_manager_->GetServerPort()), + grpc::InsecureChannelCredentials()); + auto stub = rpc::NodeManagerService::NewStub(channel); + auto status = stub->RequestWorkerLease(&context, request, &reply); + EXPECT_TRUE(status.ok()); + + // After RequestWorkerLease, a leased worker is ready in the NodeManager. + // Then use publish_worker_failure_callback to say owner_worker_id is dead. + // The leased worker should not be killed by this because it is a detached actor. + rpc::WorkerDeltaData delta_data; + delta_data.set_worker_id(owner_worker_id.Binary()); + publish_worker_failure_callback(std::move(delta_data)); + }}; + + // Prepare a mock worker with a real process so that we can check if the process is + // alive later. + const auto worker = std::make_shared(WorkerID::FromRandom(), 10); + auto [proc, spawn_error] = + Process::Spawn(std::vector{"sleep", "1000"}, true); + EXPECT_FALSE(spawn_error); + worker->SetProcess(proc); + // Complete the RequestWorkerLease rpc with the mock worker. + pop_worker_callback_promise.get_future().wait(); + pop_worker_callback(worker, PopWorkerStatus::OK, ""); + + // Wait for the client thead to complete. This waits for the RequestWorkerLease call + // and publish_worker_failure_callback to finish. + grpc_client_thread.join(); + // Wait for more than kill_worker_timeout_milliseconds. + std::this_thread::sleep_for(std::chrono::seconds(1)); + // The process should still be alive because it should not be killed by + // publish_worker_failure_callback. + EXPECT_TRUE(proc.IsAlive()); + // clean up. + proc.Kill(); + io_service_.stop(); + io_thread.join(); +} + +TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedNode) { + EXPECT_CALL(*mock_object_directory_, HandleNodeRemoved(_)).Times(1); + EXPECT_CALL(*mock_object_manager_, HandleNodeRemoved(_)).Times(1); + EXPECT_CALL(*mock_gcs_client_->mock_worker_accessor, + AsyncSubscribeToWorkerFailures(_, _)) + .WillOnce(Return(Status::OK())); + EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _)) + .WillOnce(Return(Status::OK())); + EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncCheckSelfAlive(_, _)) + .WillRepeatedly(Return(Status::OK())); + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) + .WillRepeatedly(Return(std::vector>{})); + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_)) + .WillRepeatedly(Return(std::vector>{})); + EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling()) + .WillRepeatedly(Return(false)); + EXPECT_CALL(mock_worker_pool_, PrestartWorkers(_, _)).Times(1); + + std::promise pop_worker_callback_promise; + PopWorkerCallback pop_worker_callback; + std::function + publish_node_change_callback; + + // Save the publish_node_change_callback for publishing a node failure event later. + EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncSubscribeToNodeChange(_, _)) + .WillOnce([&](const gcs::SubscribeCallback &subscribe, + const gcs::StatusCallback &done) { + publish_node_change_callback = subscribe; + return Status::OK(); + }); + + // Save the pop_worker_callback for providing a mock worker later. + EXPECT_CALL(mock_worker_pool_, PopWorker(_, _)) + .WillOnce( + [&](const TaskSpecification &task_spec, const PopWorkerCallback &callback) { + pop_worker_callback = callback; + pop_worker_callback_promise.set_value(); + }); + + // Invoke RegisterGcs and io_service_.run() so that the above EXPECT_CALLs can be + // triggered. + RAY_CHECK_OK(node_manager_->RegisterGcs()); + std::thread io_thread{[&] { + // Run the io_service in a separate thread to avoid blocking the main thread. + auto work_guard = boost::asio::make_work_guard(io_service_); + io_service_.run(); + }}; + + std::thread grpc_client_thread{[&] { + // Run the grpc client in a separate thread to avoid blocking the main thread. + + // Preparing a detached actor creation task spec for the later RequestWorkerLease rpc. + const auto owner_node_id = NodeID::FromRandom(); + rpc::Address owner_address; + owner_address.set_raylet_id(owner_node_id.Binary()); + const auto actor_id = + ActorID::Of(JobID::FromInt(1), TaskID::FromRandom(JobID::FromInt(1)), 0); + const auto task_spec_builder = + DetachedActorCreationTaskBuilder(owner_address, actor_id); + + // Invoke RequestWorkerLease to request a leased worker for the task in the + // NodeManager. + grpc::ClientContext context; + rpc::RequestWorkerLeaseReply reply; + rpc::RequestWorkerLeaseRequest request; + request.mutable_resource_spec()->CopyFrom(task_spec_builder.GetMessage()); + auto channel = + grpc::CreateChannel("localhost:" + std::to_string(node_manager_->GetServerPort()), + grpc::InsecureChannelCredentials()); + auto stub = rpc::NodeManagerService::NewStub(channel); + auto status = stub->RequestWorkerLease(&context, request, &reply); + EXPECT_TRUE(status.ok()); + + // After RequestWorkerLease, a leased worker is ready in the NodeManager. + // Then use publish_node_change_callback to say owner_node_id is dead. + // The leased worker should not be killed by this because it is a detached actor. + GcsNodeInfo node_info; + node_info.set_state(GcsNodeInfo::DEAD); + publish_node_change_callback(owner_node_id, std::move(node_info)); + }}; + + // Prepare a mock worker with a real process so that we can check if the process is + // alive later. + const auto worker = std::make_shared(WorkerID::FromRandom(), 10); + auto [proc, spawn_error] = + Process::Spawn(std::vector{"sleep", "1000"}, true); + EXPECT_FALSE(spawn_error); + worker->SetProcess(proc); + // Complete the RequestWorkerLease rpc with the mock worker. + pop_worker_callback_promise.get_future().wait(); + pop_worker_callback(worker, PopWorkerStatus::OK, ""); + + // Wait for the client thead to complete. This waits for the RequestWorkerLease call + // and publish_worker_failure_callback to finish. + grpc_client_thread.join(); + // Wait for more than kill_worker_timeout_milliseconds. + std::this_thread::sleep_for(std::chrono::seconds(1)); + // The process should still be alive because it should not be killed by + // publish_worker_failure_callback. + EXPECT_TRUE(proc.IsAlive()); + // clean up. + proc.Kill(); + io_service_.stop(); + io_thread.join(); +} + } // namespace ray::raylet int main(int argc, char **argv) { diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index dbc4b739ba50..55a350af9287 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -16,6 +16,7 @@ #include #include +#include #include #include "ray/raylet/worker.h" @@ -29,9 +30,9 @@ class MockWorker : public WorkerInterface { MockWorker(WorkerID worker_id, int port, int runtime_env_hash = 0) : worker_id_(worker_id), port_(port), - is_detached_actor_(false), runtime_env_hash_(runtime_env_hash), - job_id_(JobID::FromInt(859)) {} + job_id_(JobID::FromInt(859)), + proc_(Process::CreateNewDummy()) {} WorkerID WorkerId() const override { return worker_id_; } @@ -47,6 +48,11 @@ class MockWorker : public WorkerInterface { task_ = assigned_task; task_assign_time_ = absl::Now(); root_detached_actor_id_ = assigned_task.GetTaskSpecification().RootDetachedActorId(); + const auto &task_spec = assigned_task.GetTaskSpecification(); + SetJobId(task_spec.JobId()); + SetBundleId(task_spec.PlacementGroupBundleId()); + SetOwnerAddress(task_spec.CallerAddress()); + AssignTaskId(task_spec.TaskId()); }; absl::Time GetAssignedTaskTime() const override { return task_assign_time_; }; @@ -85,9 +91,9 @@ class MockWorker : public WorkerInterface { void MarkUnblocked() override { blocked_ = false; } bool IsBlocked() const override { return blocked_; } - Process GetProcess() const override { return Process::CreateNewDummy(); } + Process GetProcess() const override { return proc_; } StartupToken GetStartupToken() const override { return 0; } - void SetProcess(Process proc) override { RAY_CHECK(false) << "Method unused"; } + void SetProcess(Process proc) override { proc_ = std::move(proc); } Language GetLanguage() const override { RAY_CHECK(false) << "Method unused"; @@ -119,16 +125,16 @@ class MockWorker : public WorkerInterface { RAY_CHECK(false) << "Method unused"; return ""; } - void MarkDetachedActor() override { is_detached_actor_ = true; } - bool IsDetachedActor() const override { return is_detached_actor_; } + + bool IsDetachedActor() const override { + return task_.GetTaskSpecification().IsDetachedActor(); + } + const std::shared_ptr Connection() const override { RAY_CHECK(false) << "Method unused"; return nullptr; } - const rpc::Address &GetOwnerAddress() const override { - RAY_CHECK(false) << "Method unused"; - return address_; - } + const rpc::Address &GetOwnerAddress() const override { return address_; } void ActorCallArgWaitComplete(int64_t tag) override { RAY_CHECK(false) << "Method unused"; @@ -184,7 +190,6 @@ class MockWorker : public WorkerInterface { std::vector borrowed_cpu_instances_; std::optional is_gpu_; std::optional is_actor_worker_; - bool is_detached_actor_; BundleID bundle_id_; bool blocked_ = false; RayTask task_; @@ -193,6 +198,7 @@ class MockWorker : public WorkerInterface { TaskID task_id_; JobID job_id_; ActorID root_detached_actor_id_; + Process proc_; }; } // namespace raylet diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index ab41e082e891..4ce55df2f3dc 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -50,8 +50,7 @@ Worker::Worker(const JobID &job_id, bundle_id_(std::make_pair(PlacementGroupID::Nil(), -1)), dead_(false), blocked_(false), - client_call_manager_(client_call_manager), - is_detached_actor_(false) {} + client_call_manager_(client_call_manager) {} rpc::WorkerType Worker::GetWorkerType() const { return worker_type_; } @@ -169,9 +168,9 @@ const std::string Worker::GetTaskOrActorIdAsDebugString() const { return id_ss.str(); } -void Worker::MarkDetachedActor() { is_detached_actor_ = true; } - -bool Worker::IsDetachedActor() const { return is_detached_actor_; } +bool Worker::IsDetachedActor() const { + return assigned_task_.GetTaskSpecification().IsDetachedActor(); +} const std::shared_ptr Worker::Connection() const { return connection_; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index dab0ac560d7b..4da65e9fa30b 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -75,7 +75,6 @@ class WorkerInterface { virtual void AssignActorId(const ActorID &actor_id) = 0; virtual const ActorID &GetActorId() const = 0; virtual const std::string GetTaskOrActorIdAsDebugString() const = 0; - virtual void MarkDetachedActor() = 0; virtual bool IsDetachedActor() const = 0; virtual const std::shared_ptr Connection() const = 0; virtual void SetOwnerAddress(const rpc::Address &address) = 0; @@ -186,7 +185,6 @@ class Worker : public WorkerInterface { // Creates the debug string for the ID of the task or actor depending on which is // running. const std::string GetTaskOrActorIdAsDebugString() const; - void MarkDetachedActor(); bool IsDetachedActor() const; const std::shared_ptr Connection() const; void SetOwnerAddress(const rpc::Address &address); @@ -307,9 +305,6 @@ class Worker : public WorkerInterface { rpc::ClientCallManager &client_call_manager_; /// The rpc client to send tasks to this worker. std::shared_ptr rpc_client_; - /// Whether the worker is detached. This is applies when the worker is actor. - /// Detached actor means the actor's creator can exit without killing this actor. - bool is_detached_actor_; /// The address of this worker's owner. The owner is the worker that /// currently holds the lease on this worker, if any. rpc::Address owner_address_;