Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/mock/ray/raylet/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class MockWorkerInterface : public WorkerInterface {
MOCK_METHOD(int, GetRuntimeEnvHash, (), (const, override));
MOCK_METHOD(void, AssignActorId, (const ActorID &actor_id), (override));
MOCK_METHOD(const ActorID &, GetActorId, (), (const, override));
MOCK_METHOD(void, MarkDetachedActor, (), (override));
MOCK_METHOD(bool, IsDetachedActor, (), (const, override));
MOCK_METHOD(const std::shared_ptr<ClientConnection>, Connection, (), (const, override));
MOCK_METHOD(void, SetOwnerAddress, (const rpc::Address &address), (override));
Expand Down
1 change: 0 additions & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2270,7 +2270,6 @@ void NodeManager::FinishAssignedActorCreationTask(WorkerInterface &worker,
worker.AssignActorId(actor_id);

if (task_spec.IsDetachedActor()) {
worker.MarkDetachedActor();
auto job_id = task.GetTaskSpecification().JobId();
auto job_config = worker_pool_.GetJobConfig(job_id);
RAY_CHECK(job_config);
Expand Down
256 changes: 255 additions & 1 deletion src/ray/raylet/test/node_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,50 @@ TaskSpecification BuildTaskSpec(
return std::move(builder).ConsumeAndBuild();
}

TaskSpecBuilder DetachedActorCreationTaskBuilder(const rpc::Address &owner_address,
const ActorID &actor_id) {
rpc::JobConfig config;
const FunctionDescriptor function_descriptor =
FunctionDescriptorBuilder::BuildPython("x", "", "", "");
TaskSpecBuilder task_spec_builder;
task_spec_builder.SetCommonTaskSpec(TaskID::FromRandom(JobID::Nil()),
"dummy_task",
Language::PYTHON,
function_descriptor,
JobID::Nil(),
config,
TaskID::Nil(),
0,
TaskID::Nil(),
owner_address,
1,
false,
false,
-1,
{{"CPU", 0}},
{{"CPU", 0}},
"",
0,
TaskID::Nil(),
"");
task_spec_builder.SetActorCreationTaskSpec(actor_id,
/*serialized_actor_handle=*/"",
rpc::SchedulingStrategy(),
/*max_restarts=*/0,
/*max_task_retries=*/0,
/*dynamic_worker_options=*/{},
/*max_concurrency=*/1,
/*is_detached=*/true,
/*name=*/"",
/*ray_namespace=*/"",
/*is_asyncio=*/false,
/*concurrency_groups=*/{},
/*extension_data=*/"",
/*execute_out_of_order=*/false,
/*root_detached_actor_id=*/actor_id);
return task_spec_builder;
}

} // namespace

TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog) {
Expand Down Expand Up @@ -195,7 +239,8 @@ class NodeManagerTest : public ::testing::Test {
return std::make_shared<rpc::MockCoreWorkerClientInterface>();
}) {
RayConfig::instance().initialize(R"({
"raylet_liveness_self_check_interval_ms": 100
"raylet_liveness_self_check_interval_ms": 100,
"kill_worker_timeout_milliseconds": 10
})");

NodeManagerConfig node_manager_config{};
Expand Down Expand Up @@ -391,6 +436,215 @@ TEST_F(NodeManagerTest, TestRegisterGcsAndCheckSelfAlive) {
thread.join();
}

TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedWorker) {
EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncSubscribeToNodeChange(_, _))
.WillOnce(Return(Status::OK()));
EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _))
.WillOnce(Return(Status::OK()));
EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncCheckSelfAlive(_, _))
.WillRepeatedly(Return(Status::OK()));
EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _))
.WillRepeatedly(Return(std::vector<std::shared_ptr<WorkerInterface>>{}));
EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_))
.WillRepeatedly(Return(std::vector<std::shared_ptr<WorkerInterface>>{}));
EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling())
.WillRepeatedly(Return(false));
EXPECT_CALL(mock_worker_pool_, PrestartWorkers(_, _)).Times(1);
Comment on lines +440 to +452
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All unit tests should be written against an API and not an implementation. See discussion here for more context.

Are all of these EXPECT_CALLs strictly necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test will still pass without those assertions, but without any of them, there will be GMOCK WARNINGs like this:

GMOCK WARNING:
Uninteresting mock function call - returning default value.
    Function call: AsyncSubscribeToNodeChange(@0x16b1e21e8 32-byte object <88-85 68-07 01-00 00-00 00-B6 00-3A 01-00 00-00 10-93 81-38 01-00 00-00 E8-21 1E-6B 01-00 00-00>, @0x16b1e21c8 32-byte object <70-86 68-07 01-00 00-00 00-B6 00-3A 01-00 00-00 E8-28 1E-6B 01-00 00-00 C8-21 1E-6B 01-00 00-00>)
          Returns: OK
NOTE: You can safely ignore the above warning unless this call should not happen.  Do not suppress it by blindly adding an EXPECT_CALL() if you don't mean to enforce the call.  See https://github.com/google/googletest/blob/main/docs/gmock_cook_book.md#knowing-when-to-expect-useoncall for details.

Do you think we should still remove those assertions and keep those warnings?


std::promise<void> pop_worker_callback_promise;
PopWorkerCallback pop_worker_callback;
gcs::ItemCallback<rpc::WorkerDeltaData> publish_worker_failure_callback;

// Save the publish_worker_failure_callback for publishing a worker failure event later.
EXPECT_CALL(*mock_gcs_client_->mock_worker_accessor,
AsyncSubscribeToWorkerFailures(_, _))
.WillOnce([&](const gcs::ItemCallback<rpc::WorkerDeltaData> &subscribe,
const gcs::StatusCallback &done) {
publish_worker_failure_callback = subscribe;
return Status::OK();
});

// Save the pop_worker_callback for providing a mock worker later.
EXPECT_CALL(mock_worker_pool_, PopWorker(_, _))
.WillOnce(
[&](const TaskSpecification &task_spec, const PopWorkerCallback &callback) {
pop_worker_callback = callback;
pop_worker_callback_promise.set_value();
});

// Invoke RegisterGcs and io_service_.run() so that the above EXPECT_CALLs can be
// triggered.
RAY_CHECK_OK(node_manager_->RegisterGcs());
std::thread io_thread{[&] {
// Run the io_service in a separate thread to avoid blocking the main thread.
auto work_guard = boost::asio::make_work_guard(io_service_);
io_service_.run();
}};

std::thread grpc_client_thread{[&] {
// Run the grpc client in a separate thread to avoid blocking the main thread.

// Preparing a detached actor creation task spec for the later RequestWorkerLease rpc.
const auto owner_worker_id = WorkerID::FromRandom();
rpc::Address owner_address;
owner_address.set_worker_id(owner_worker_id.Binary());
const auto actor_id =
ActorID::Of(JobID::FromInt(1), TaskID::FromRandom(JobID::FromInt(1)), 0);
const auto task_spec_builder =
DetachedActorCreationTaskBuilder(owner_address, actor_id);

// Invoke RequestWorkerLease to request a leased worker for the task in the
// NodeManager.
grpc::ClientContext context;
rpc::RequestWorkerLeaseReply reply;
rpc::RequestWorkerLeaseRequest request;
request.mutable_resource_spec()->CopyFrom(task_spec_builder.GetMessage());
auto channel =
grpc::CreateChannel("localhost:" + std::to_string(node_manager_->GetServerPort()),
grpc::InsecureChannelCredentials());
auto stub = rpc::NodeManagerService::NewStub(channel);
auto status = stub->RequestWorkerLease(&context, request, &reply);
EXPECT_TRUE(status.ok());

// After RequestWorkerLease, a leased worker is ready in the NodeManager.
// Then use publish_worker_failure_callback to say owner_worker_id is dead.
// The leased worker should not be killed by this because it is a detached actor.
rpc::WorkerDeltaData delta_data;
delta_data.set_worker_id(owner_worker_id.Binary());
publish_worker_failure_callback(std::move(delta_data));
}};

// Prepare a mock worker with a real process so that we can check if the process is
// alive later.
const auto worker = std::make_shared<MockWorker>(WorkerID::FromRandom(), 10);
auto [proc, spawn_error] =
Process::Spawn(std::vector<std::string>{"sleep", "1000"}, true);
EXPECT_FALSE(spawn_error);
worker->SetProcess(proc);
// Complete the RequestWorkerLease rpc with the mock worker.
pop_worker_callback_promise.get_future().wait();
pop_worker_callback(worker, PopWorkerStatus::OK, "");

// Wait for the client thead to complete. This waits for the RequestWorkerLease call
// and publish_worker_failure_callback to finish.
grpc_client_thread.join();
// Wait for more than kill_worker_timeout_milliseconds.
std::this_thread::sleep_for(std::chrono::seconds(1));
// The process should still be alive because it should not be killed by
// publish_worker_failure_callback.
EXPECT_TRUE(proc.IsAlive());
// clean up.
proc.Kill();
io_service_.stop();
io_thread.join();
}

TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedNode) {
EXPECT_CALL(*mock_object_directory_, HandleNodeRemoved(_)).Times(1);
EXPECT_CALL(*mock_object_manager_, HandleNodeRemoved(_)).Times(1);
EXPECT_CALL(*mock_gcs_client_->mock_worker_accessor,
AsyncSubscribeToWorkerFailures(_, _))
.WillOnce(Return(Status::OK()));
EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _))
.WillOnce(Return(Status::OK()));
EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncCheckSelfAlive(_, _))
.WillRepeatedly(Return(Status::OK()));
EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _))
.WillRepeatedly(Return(std::vector<std::shared_ptr<WorkerInterface>>{}));
EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_))
.WillRepeatedly(Return(std::vector<std::shared_ptr<WorkerInterface>>{}));
EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling())
.WillRepeatedly(Return(false));
EXPECT_CALL(mock_worker_pool_, PrestartWorkers(_, _)).Times(1);

std::promise<void> pop_worker_callback_promise;
PopWorkerCallback pop_worker_callback;
std::function<void(const NodeID &id, rpc::GcsNodeInfo &&node_info)>
publish_node_change_callback;

// Save the publish_node_change_callback for publishing a node failure event later.
EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncSubscribeToNodeChange(_, _))
.WillOnce([&](const gcs::SubscribeCallback<NodeID, rpc::GcsNodeInfo> &subscribe,
const gcs::StatusCallback &done) {
publish_node_change_callback = subscribe;
return Status::OK();
});

// Save the pop_worker_callback for providing a mock worker later.
EXPECT_CALL(mock_worker_pool_, PopWorker(_, _))
.WillOnce(
[&](const TaskSpecification &task_spec, const PopWorkerCallback &callback) {
pop_worker_callback = callback;
pop_worker_callback_promise.set_value();
});

// Invoke RegisterGcs and io_service_.run() so that the above EXPECT_CALLs can be
// triggered.
RAY_CHECK_OK(node_manager_->RegisterGcs());
std::thread io_thread{[&] {
// Run the io_service in a separate thread to avoid blocking the main thread.
auto work_guard = boost::asio::make_work_guard(io_service_);
io_service_.run();
}};

std::thread grpc_client_thread{[&] {
// Run the grpc client in a separate thread to avoid blocking the main thread.

// Preparing a detached actor creation task spec for the later RequestWorkerLease rpc.
const auto owner_node_id = NodeID::FromRandom();
rpc::Address owner_address;
owner_address.set_raylet_id(owner_node_id.Binary());
const auto actor_id =
ActorID::Of(JobID::FromInt(1), TaskID::FromRandom(JobID::FromInt(1)), 0);
const auto task_spec_builder =
DetachedActorCreationTaskBuilder(owner_address, actor_id);

// Invoke RequestWorkerLease to request a leased worker for the task in the
// NodeManager.
grpc::ClientContext context;
rpc::RequestWorkerLeaseReply reply;
rpc::RequestWorkerLeaseRequest request;
request.mutable_resource_spec()->CopyFrom(task_spec_builder.GetMessage());
auto channel =
grpc::CreateChannel("localhost:" + std::to_string(node_manager_->GetServerPort()),
grpc::InsecureChannelCredentials());
auto stub = rpc::NodeManagerService::NewStub(channel);
auto status = stub->RequestWorkerLease(&context, request, &reply);
EXPECT_TRUE(status.ok());

// After RequestWorkerLease, a leased worker is ready in the NodeManager.
// Then use publish_node_change_callback to say owner_node_id is dead.
// The leased worker should not be killed by this because it is a detached actor.
GcsNodeInfo node_info;
node_info.set_state(GcsNodeInfo::DEAD);
publish_node_change_callback(owner_node_id, std::move(node_info));
}};

// Prepare a mock worker with a real process so that we can check if the process is
// alive later.
const auto worker = std::make_shared<MockWorker>(WorkerID::FromRandom(), 10);
auto [proc, spawn_error] =
Process::Spawn(std::vector<std::string>{"sleep", "1000"}, true);
EXPECT_FALSE(spawn_error);
worker->SetProcess(proc);
// Complete the RequestWorkerLease rpc with the mock worker.
pop_worker_callback_promise.get_future().wait();
pop_worker_callback(worker, PopWorkerStatus::OK, "");

// Wait for the client thead to complete. This waits for the RequestWorkerLease call
// and publish_worker_failure_callback to finish.
grpc_client_thread.join();
// Wait for more than kill_worker_timeout_milliseconds.
std::this_thread::sleep_for(std::chrono::seconds(1));
// The process should still be alive because it should not be killed by
// publish_worker_failure_callback.
EXPECT_TRUE(proc.IsAlive());
// clean up.
proc.Kill();
io_service_.stop();
io_thread.join();
}

} // namespace ray::raylet

int main(int argc, char **argv) {
Expand Down
28 changes: 17 additions & 11 deletions src/ray/raylet/test/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "ray/raylet/worker.h"
Expand All @@ -29,9 +30,9 @@ class MockWorker : public WorkerInterface {
MockWorker(WorkerID worker_id, int port, int runtime_env_hash = 0)
: worker_id_(worker_id),
port_(port),
is_detached_actor_(false),
runtime_env_hash_(runtime_env_hash),
job_id_(JobID::FromInt(859)) {}
job_id_(JobID::FromInt(859)),
proc_(Process::CreateNewDummy()) {}

WorkerID WorkerId() const override { return worker_id_; }

Expand All @@ -47,6 +48,11 @@ class MockWorker : public WorkerInterface {
task_ = assigned_task;
task_assign_time_ = absl::Now();
root_detached_actor_id_ = assigned_task.GetTaskSpecification().RootDetachedActorId();
const auto &task_spec = assigned_task.GetTaskSpecification();
SetJobId(task_spec.JobId());
SetBundleId(task_spec.PlacementGroupBundleId());
SetOwnerAddress(task_spec.CallerAddress());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the MockWorker::SetAssignedTask align with the real one.

AssignTaskId(task_spec.TaskId());
};

absl::Time GetAssignedTaskTime() const override { return task_assign_time_; };
Expand Down Expand Up @@ -85,9 +91,9 @@ class MockWorker : public WorkerInterface {
void MarkUnblocked() override { blocked_ = false; }
bool IsBlocked() const override { return blocked_; }

Process GetProcess() const override { return Process::CreateNewDummy(); }
Process GetProcess() const override { return proc_; }
StartupToken GetStartupToken() const override { return 0; }
void SetProcess(Process proc) override { RAY_CHECK(false) << "Method unused"; }
void SetProcess(Process proc) override { proc_ = std::move(proc); }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows us to inject a Process into a MockWorker.


Language GetLanguage() const override {
RAY_CHECK(false) << "Method unused";
Expand Down Expand Up @@ -119,16 +125,16 @@ class MockWorker : public WorkerInterface {
RAY_CHECK(false) << "Method unused";
return "";
}
void MarkDetachedActor() override { is_detached_actor_ = true; }
bool IsDetachedActor() const override { return is_detached_actor_; }

bool IsDetachedActor() const override {
return task_.GetTaskSpecification().IsDetachedActor();
}

const std::shared_ptr<ClientConnection> Connection() const override {
RAY_CHECK(false) << "Method unused";
return nullptr;
}
const rpc::Address &GetOwnerAddress() const override {
RAY_CHECK(false) << "Method unused";
return address_;
}
const rpc::Address &GetOwnerAddress() const override { return address_; }

void ActorCallArgWaitComplete(int64_t tag) override {
RAY_CHECK(false) << "Method unused";
Expand Down Expand Up @@ -184,7 +190,6 @@ class MockWorker : public WorkerInterface {
std::vector<double> borrowed_cpu_instances_;
std::optional<bool> is_gpu_;
std::optional<bool> is_actor_worker_;
bool is_detached_actor_;
BundleID bundle_id_;
bool blocked_ = false;
RayTask task_;
Expand All @@ -193,6 +198,7 @@ class MockWorker : public WorkerInterface {
TaskID task_id_;
JobID job_id_;
ActorID root_detached_actor_id_;
Process proc_;
};

} // namespace raylet
Expand Down
9 changes: 4 additions & 5 deletions src/ray/raylet/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ Worker::Worker(const JobID &job_id,
bundle_id_(std::make_pair(PlacementGroupID::Nil(), -1)),
dead_(false),
blocked_(false),
client_call_manager_(client_call_manager),
is_detached_actor_(false) {}
client_call_manager_(client_call_manager) {}

rpc::WorkerType Worker::GetWorkerType() const { return worker_type_; }

Expand Down Expand Up @@ -169,9 +168,9 @@ const std::string Worker::GetTaskOrActorIdAsDebugString() const {
return id_ss.str();
}

void Worker::MarkDetachedActor() { is_detached_actor_ = true; }

bool Worker::IsDetachedActor() const { return is_detached_actor_; }
bool Worker::IsDetachedActor() const {
return assigned_task_.GetTaskSpecification().IsDetachedActor();
}

const std::shared_ptr<ClientConnection> Worker::Connection() const { return connection_; }

Expand Down
Loading