43 changes: 43 additions & 0 deletions python/ray/tests/test_raylet_fault_tolerance.py
@@ -1,10 +1,14 @@
import os
import sys

import pytest

import ray
from ray._common.test_utils import wait_for_condition
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

import psutil


@pytest.mark.parametrize("deterministic_failure", ["request", "response"])
def test_request_worker_lease_idempotent(
@@ -44,5 +48,44 @@ def simple_task_2():
assert ray.get([result_ref1, result_ref2]) == [0, 1]


def test_kill_local_actor_rpc_retry_and_idempotency(monkeypatch, shutdown_only):
"""Test that KillLocalActor RPC retries work correctly and guarantee actor death.
Not testing response since the actor is killed either way.
"""

monkeypatch.setenv(
"RAY_testing_rpc_failure",
"NodeManagerService.grpc_client.KillLocalActor=1:100:0",
)
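# NOTE: the injection spec above (format assumed to be
# method=max_failures:request_failure_%:response_failure_%) fails the first
# KillLocalActor request once, exercising the GCS client's retry path.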

ray.init()

@ray.remote
class SimpleActor:
def ping(self):
return "pong"

def get_pid(self):
return os.getpid()

actor = SimpleActor.remote()

result = ray.get(actor.ping.remote())
assert result == "pong"

worker_pid = ray.get(actor.get_pid.remote())

# NOTE: check that the worker process is still alive rather than checking the
# actor state in the GCS: the GCS marks the actor as dead as soon as the kill
# RPC is sent, even though the process may not actually have exited yet.
assert psutil.pid_exists(worker_pid)

ray.kill(actor)

def verify_process_killed():
return not psutil.pid_exists(worker_pid)

wait_for_condition(verify_process_killed, timeout=30)


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
2 changes: 2 additions & 0 deletions src/mock/ray/gcs/gcs_actor_manager.h
@@ -26,6 +26,7 @@ class MockGcsActorManager : public GcsActorManager {
public:
MockGcsActorManager(RuntimeEnvManager &runtime_env_manager,
GCSFunctionManager &function_manager,
rpc::RayletClientPool &raylet_client_pool,
rpc::CoreWorkerClientPool &worker_client_pool)
: GcsActorManager(
/*scheduler=*/
@@ -36,6 +37,7 @@ class MockGcsActorManager : public GcsActorManager {
runtime_env_manager,
function_manager,
[](const ActorID &) {},
raylet_client_pool,
worker_client_pool,
/*ray_event_recorder=*/fake_ray_event_recorder_,
/*session_name=*/"") {}
5 changes: 5 additions & 0 deletions src/mock/ray/raylet_client/raylet_client.h
@@ -146,6 +146,11 @@ class MockRayletClientInterface : public RayletClientInterface {
(const rpc::GetNodeStatsRequest &request,
const rpc::ClientCallback<rpc::GetNodeStatsReply> &callback),
(override));
MOCK_METHOD(void,
KillLocalActor,
(const rpc::KillLocalActorRequest &request,
const rpc::ClientCallback<rpc::KillLocalActorReply> &callback),
(override));
MOCK_METHOD(void,
GlobalGC,
(const rpc::ClientCallback<rpc::GlobalGCReply> &callback),
5 changes: 4 additions & 1 deletion src/ray/core_worker_rpc_client/fake_core_worker_client.h
@@ -75,7 +75,9 @@ class FakeCoreWorkerClient : public CoreWorkerClientInterface {
const ClientCallback<ReportGeneratorItemReturnsReply> &callback) override {}

void KillActor(const KillActorRequest &request,
const ClientCallback<KillActorReply> &callback) override {}
const ClientCallback<KillActorReply> &callback) override {
num_kill_actor_requests++;
}

void CancelTask(const CancelTaskRequest &request,
const ClientCallback<CancelTaskReply> &callback) override {}
@@ -159,6 +161,7 @@ class FakeCoreWorkerClient : public CoreWorkerClientInterface {
}

std::list<ClientCallback<PushTaskReply>> callbacks_ ABSL_GUARDED_BY(mutex_);
size_t num_kill_actor_requests = 0;
absl::Mutex mutex_;
};

8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_actor.cc
@@ -60,6 +60,14 @@ const rpc::Address &GcsActor::GetOwnerAddress() const {
return actor_table_data_.owner_address();
}

const rpc::Address &GcsActor::GetLocalRayletAddress() const {
return local_raylet_address_;
}

void GcsActor::UpdateLocalRayletAddress(const rpc::Address &address) {
local_raylet_address_.CopyFrom(address);
}

Contributor: nit but usually cpp getters and setters look like this

Suggested change:
(remove)
const rpc::Address &GcsActor::GetLocalRayletAddress() const {
  return local_raylet_address_;
}
void GcsActor::UpdateLocalRayletAddress(const rpc::Address &address) {
  local_raylet_address_.CopyFrom(address);
}
(replace with)
const rpc::Address &GcsActor::LocalRayletAddress() const {
  return local_raylet_address_;
}
rpc::Address &GcsActor::LocalRayletAddress() {
  return local_raylet_address_;
}

Contributor Author: Updated the getter, but for the setter should I keep it as is?

Contributor: Yeah that looks like a typo

Contributor: oh not a typo, the "setter" usually just returns a non-const lvalue ref so you can more efficiently make changes however you want to the data member.

An UpdateMember func is bound to changing by replacing rather than modifying in place + extra move constructs or separate rvalue/lvalue overloads, etc. if you want to give flexibility on replacing

Contributor: oh wait but at this point why isn't the member public ☠️ it doesn't matter lol
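A minimal sketch of the accessor pattern described in this thread, on a hypothetical class (illustrative only, not code from this PR):

// Const overload gives read access; the non-const overload hands out a
// mutable reference, so callers can modify the member in place (e.g. via
// CopyFrom) instead of replacing it through an Update-style setter.
class ActorRecord {
 public:
  const rpc::Address &LocalRayletAddress() const { return local_raylet_address_; }
  rpc::Address &LocalRayletAddress() { return local_raylet_address_; }

 private:
  rpc::Address local_raylet_address_;
};

// Usage: record.LocalRayletAddress().CopyFrom(new_address);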

void GcsActor::UpdateState(rpc::ActorTableData::ActorState state) {
actor_table_data_.set_state(state);
RefreshMetrics();
7 changes: 6 additions & 1 deletion src/ray/gcs/gcs_actor.h
@@ -180,7 +180,10 @@ class GcsActor {
NodeID GetOwnerNodeID() const;
/// Get the address of the actor's owner.
const rpc::Address &GetOwnerAddress() const;

/// Get the address of the local raylet for this actor.
const rpc::Address &GetLocalRayletAddress() const;
/// Update the address of the local raylet for this actor.
void UpdateLocalRayletAddress(const rpc::Address &address);
/// Update the `Address` of this actor (see gcs.proto).
void UpdateAddress(const rpc::Address &address);
/// Get the `Address` of this actor.
@@ -307,6 +310,8 @@ class GcsActor {
/// Event recorder and session name for Ray events
observability::RayEventRecorderInterface &ray_event_recorder_;
std::string session_name_;
/// Address of the local raylet of the worker where this actor is running.
rpc::Address local_raylet_address_;
};

using RestartActorForLineageReconstructionCallback =
18 changes: 11 additions & 7 deletions src/ray/gcs/gcs_actor_manager.cc
@@ -227,13 +227,15 @@ GcsActorManager::GcsActorManager(
RuntimeEnvManager &runtime_env_manager,
GCSFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
rpc::RayletClientPool &raylet_client_pool,
rpc::CoreWorkerClientPool &worker_client_pool,
observability::RayEventRecorderInterface &ray_event_recorder,
const std::string &session_name)
: gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(gcs_table_storage),
io_context_(io_context),
gcs_publisher_(gcs_publisher),
raylet_client_pool_(raylet_client_pool),
worker_client_pool_(worker_client_pool),
ray_event_recorder_(ray_event_recorder),
session_name_(session_name),
@@ -1742,21 +1744,23 @@ void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr<GcsActor> &acto
void GcsActorManager::NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
const rpc::ActorDeathCause &death_cause,
bool force_kill) {
rpc::KillActorRequest request;
rpc::KillLocalActorRequest request;
request.set_intended_actor_id(actor->GetActorID().Binary());
request.set_worker_id(actor->GetWorkerID().Binary());
request.mutable_death_cause()->CopyFrom(death_cause);
request.set_force_kill(force_kill);
auto actor_client = worker_client_pool_.GetOrConnect(actor->GetAddress());
auto actor_raylet_client =
raylet_client_pool_.GetOrConnectByAddress(actor->GetLocalRayletAddress());
RAY_LOG(DEBUG)
.WithField(actor->GetActorID())
.WithField(actor->GetWorkerID())
.WithField(actor->GetNodeID())
<< "Send request to kill actor to worker at node";
actor_client->KillActor(request,
[actor_id = actor->GetActorID()](auto &status, auto &&) {
RAY_LOG(DEBUG) << "Killing status: " << status.ToString()
<< ", actor_id: " << actor_id;
});
actor_raylet_client->KillLocalActor(
request, [actor_id = actor->GetActorID()](auto &status, auto &&) {
RAY_LOG(DEBUG) << "Killing status: " << status.ToString()
<< ", actor_id: " << actor_id;
});
}

void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill) {
3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_actor_manager.h
@@ -105,6 +105,7 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
RuntimeEnvManager &runtime_env_manager,
GCSFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
rpc::RayletClientPool &raylet_client_pool,
rpc::CoreWorkerClientPool &worker_client_pool,
observability::RayEventRecorderInterface &ray_event_recorder,
const std::string &session_name);
@@ -477,6 +478,8 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
instrumented_io_context &io_context_;
/// A publisher for publishing gcs messages.
pubsub::GcsPublisher *gcs_publisher_;
/// This is used to communicate with raylets where actors are located.
rpc::RayletClientPool &raylet_client_pool_;
/// This is used to communicate with actors and their owners.
rpc::CoreWorkerClientPool &worker_client_pool_;
/// Event recorder for emitting actor events
11 changes: 9 additions & 2 deletions src/ray/gcs/gcs_actor_scheduler.cc
@@ -350,7 +350,9 @@ void GcsActorScheduler::DoRetryLeasingWorkerFromNode(
}

void GcsActorScheduler::HandleWorkerLeaseGrantedReply(
std::shared_ptr<GcsActor> actor, const ray::rpc::RequestWorkerLeaseReply &reply) {
std::shared_ptr<GcsActor> actor,
const ray::rpc::RequestWorkerLeaseReply &reply,
std::shared_ptr<const rpc::GcsNodeInfo> node) {
const auto &retry_at_raylet_address = reply.retry_at_raylet_address();
const auto &worker_address = reply.worker_address();
if (worker_address.node_id().empty()) {
@@ -390,6 +392,11 @@
RAY_CHECK(node_to_workers_when_creating_[node_id]
.emplace(leased_worker->GetWorkerID(), leased_worker)
.second);
rpc::Address actor_local_raylet_address;
actor_local_raylet_address.set_node_id(node->node_id());
actor_local_raylet_address.set_ip_address(node->node_manager_address());
actor_local_raylet_address.set_port(node->node_manager_port());
actor->UpdateLocalRayletAddress(actor_local_raylet_address);
Contributor: on actor death in between restarting, there's probably a point where the actor doesn't have a local raylet.
The actor also doesn't have a local raylet at registration time + before creation completion

local_raylet_address should probably be an optional and we shouldn't make the rpc if it's nullopt

Contributor Author: ahhh good catch thanks, yup I modified it so that we don't make the rpc if it's nullopt.
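A sketch of that guard, assuming local_raylet_address_ becomes a std::optional<rpc::Address> (names and placement are illustrative, not the PR's final code):

// Skip the KillLocalActor RPC when the actor has no local raylet, e.g. it was
// registered but not yet created, or it is between restarts.
std::optional<rpc::Address> local_raylet_address_;

void NotifyRayletToKillActor(const std::shared_ptr<GcsActor> &actor) {
  const auto &maybe_address = actor->GetLocalRayletAddress();
  if (!maybe_address.has_value()) {
    return;  // Nothing is running the actor right now; no raylet to contact.
  }
  auto raylet_client = raylet_client_pool_.GetOrConnectByAddress(*maybe_address);
  // ... build the KillLocalActorRequest and send it, as in
  // GcsActorManager::NotifyCoreWorkerToKillActor above ...
}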

actor->UpdateAddress(leased_worker->GetAddress());
actor->GetMutableActorTableData()->set_pid(reply.worker_pid());
actor->GetMutableTaskSpec()->set_lease_grant_timestamp_ms(current_sys_time_ms());
@@ -621,7 +628,7 @@ void GcsActorScheduler::HandleWorkerLeaseReply(
RAY_LOG(INFO) << "Finished leasing worker from " << node_id << " for actor "
<< actor->GetActorID()
<< ", job id = " << actor->GetActorID().JobId();
HandleWorkerLeaseGrantedReply(actor, reply);
HandleWorkerLeaseGrantedReply(actor, reply, node);
}
} else {
RetryLeasingWorkerFromNode(actor, node);
4 changes: 3 additions & 1 deletion src/ray/gcs/gcs_actor_scheduler.h
@@ -289,8 +289,10 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
///
/// \param actor Contains the resources needed to lease workers from the specified node.
/// \param reply The reply of `RequestWorkerLeaseRequest`.
/// \param node The node that the worker will be leased from.
void HandleWorkerLeaseGrantedReply(std::shared_ptr<GcsActor> actor,
const rpc::RequestWorkerLeaseReply &reply);
const rpc::RequestWorkerLeaseReply &reply,
std::shared_ptr<const rpc::GcsNodeInfo> node);

/// A rejected reply means resources were preempted by normal tasks. Then
/// update the cluster resource view and reschedule immediately.
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server.cc
@@ -514,6 +514,7 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
[this](const ActorID &actor_id) {
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
},
raylet_client_pool_,
worker_client_pool_,
*ray_event_recorder_,
config_.session_name);
2 changes: 2 additions & 0 deletions src/ray/gcs/tests/BUILD.bazel
@@ -263,6 +263,7 @@ ray_cc_test(
"//src/ray/gcs:gcs_function_manager",
"//src/ray/gcs/store_client:in_memory_store_client",
"//src/ray/pubsub:publisher",
"//src/ray/raylet_rpc_client:fake_raylet_client",
"@com_google_googletest//:gtest_main",
],
)
@@ -430,6 +431,7 @@ ray_cc_test(
"//src/ray/gcs:gcs_function_manager",
"//src/ray/gcs/store_client:in_memory_store_client",
"//src/ray/pubsub:publisher",
"//src/ray/raylet_rpc_client:fake_raylet_client",
"//src/ray/util:event",
"@com_google_googletest//:gtest_main",
],
@@ -35,6 +35,7 @@
#include "ray/gcs/store_client/in_memory_store_client.h"
#include "ray/observability/fake_ray_event_recorder.h"
#include "ray/pubsub/publisher.h"
#include "ray/raylet_rpc_client/fake_raylet_client.h"
#include "ray/util/event.h"

namespace ray {
@@ -163,6 +164,10 @@ class GcsActorManagerTest : public ::testing::Test {
function_manager_ = std::make_unique<gcs::GCSFunctionManager>(*kv_, io_service_);
auto actor_scheduler = std::make_unique<MockActorScheduler>();
mock_actor_scheduler_ = actor_scheduler.get();
raylet_client_pool_ =
std::make_unique<rpc::RayletClientPool>([](const rpc::Address &address) {
return std::make_shared<rpc::FakeRayletClient>();
});
worker_client_pool_ = std::make_unique<rpc::CoreWorkerClientPool>(
[this](const rpc::Address &address) { return worker_client_; });
gcs_actor_manager_ = std::make_unique<gcs::GcsActorManager>(
@@ -173,6 +178,7 @@
*runtime_env_mgr_,
*function_manager_,
[](const ActorID &actor_id) {},
*raylet_client_pool_,
*worker_client_pool_,
/*ray_event_recorder=*/fake_ray_event_recorder_,
/*session_name=*/"");
@@ -262,6 +268,7 @@ class GcsActorManagerTest : public ::testing::Test {
// Actor scheduler's ownership lies in actor manager.
MockActorScheduler *mock_actor_scheduler_ = nullptr;
std::shared_ptr<MockWorkerClient> worker_client_;
std::unique_ptr<rpc::RayletClientPool> raylet_client_pool_;
std::unique_ptr<rpc::CoreWorkerClientPool> worker_client_pool_;
absl::flat_hash_map<JobID, std::string> job_namespace_table_;
std::unique_ptr<gcs::GcsActorManager> gcs_actor_manager_;
7 changes: 7 additions & 0 deletions src/ray/gcs/tests/gcs_actor_manager_test.cc
@@ -33,6 +33,7 @@
#include "ray/gcs/gcs_function_manager.h"
#include "ray/gcs/store_client/in_memory_store_client.h"
#include "ray/pubsub/publisher.h"
#include "ray/raylet_rpc_client/fake_raylet_client.h"

namespace ray {
namespace gcs {
@@ -139,6 +140,10 @@ class GcsActorManagerTest : public ::testing::Test {
function_manager_ = std::make_unique<gcs::GCSFunctionManager>(*kv_, io_service_);
auto scheduler = std::make_unique<MockActorScheduler>();
mock_actor_scheduler_ = scheduler.get();
raylet_client_pool_ =
std::make_unique<rpc::RayletClientPool>([](const rpc::Address &address) {
return std::make_shared<rpc::FakeRayletClient>();
});
worker_client_pool_ = std::make_unique<rpc::CoreWorkerClientPool>(
[this](const rpc::Address &address) { return worker_client_; });
fake_ray_event_recorder_ = std::make_unique<observability::FakeRayEventRecorder>();
@@ -150,6 +155,7 @@
*runtime_env_mgr_,
*function_manager_,
[](const ActorID &actor_id) {},
*raylet_client_pool_,
*worker_client_pool_,
*fake_ray_event_recorder_,
"test_session_name");
@@ -225,6 +231,7 @@ class GcsActorManagerTest : public ::testing::Test {
// Actor scheduler's ownership lies in actor manager.
MockActorScheduler *mock_actor_scheduler_ = nullptr;
std::shared_ptr<MockWorkerClient> worker_client_;
std::unique_ptr<rpc::RayletClientPool> raylet_client_pool_;
std::unique_ptr<rpc::CoreWorkerClientPool> worker_client_pool_;
absl::flat_hash_map<JobID, std::string> job_namespace_table_;
std::unique_ptr<gcs::GcsActorManager> gcs_actor_manager_;
11 changes: 9 additions & 2 deletions src/ray/gcs/tests/gcs_autoscaler_state_manager_test.cc
@@ -67,6 +67,7 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
std::unique_ptr<GCSFunctionManager> function_manager_;
std::unique_ptr<RuntimeEnvManager> runtime_env_manager_;
std::unique_ptr<GcsInternalKVManager> kv_manager_;
std::unique_ptr<rpc::RayletClientPool> raylet_client_pool_;
std::unique_ptr<rpc::CoreWorkerClientPool> worker_client_pool_;

void SetUp() override {
@@ -83,12 +84,18 @@
std::make_unique<GCSFunctionManager>(kv_manager_->GetInstance(), io_service_);
runtime_env_manager_ = std::make_unique<RuntimeEnvManager>(
[](const std::string &, std::function<void(bool)>) {});
raylet_client_pool_ =
std::make_unique<rpc::RayletClientPool>([](const rpc::Address &address) {
return std::make_shared<rpc::FakeRayletClient>();
});
worker_client_pool_ =
std::make_unique<rpc::CoreWorkerClientPool>([](const rpc::Address &) {
return std::make_shared<rpc::MockCoreWorkerClientInterface>();
});
gcs_actor_manager_ = std::make_unique<MockGcsActorManager>(
*runtime_env_manager_, *function_manager_, *worker_client_pool_);
gcs_actor_manager_ = std::make_unique<MockGcsActorManager>(*runtime_env_manager_,
*function_manager_,
*raylet_client_pool_,
*worker_client_pool_);
gcs_resource_manager_ =
std::make_shared<GcsResourceManager>(io_service_,
*cluster_resource_manager_,
6 changes: 3 additions & 3 deletions src/ray/protobuf/core_worker.proto
@@ -522,9 +522,9 @@ service CoreWorkerService {
rpc GetObjectLocationsOwner(GetObjectLocationsOwnerRequest)
returns (GetObjectLocationsOwnerReply);

// Request from the GCS actor manager or actor scheduler that the worker shut down
// without completing outstanding work.
// Failure: TODO: Never retries
// Request from the Raylet that the actor shut down without completing
// outstanding work.
// Failure: Idempotent, does not retry. However, requests should only be sent
// via KillLocalActor from the raylet, which does implement retries.
rpc KillActor(KillActorRequest) returns (KillActorReply);

// Request from owner worker to executor worker to cancel a task.
2 changes: 1 addition & 1 deletion src/ray/protobuf/gcs_service.proto
@@ -151,7 +151,7 @@ message GetAllActorInfoReply {

// `KillActorViaGcsRequest` is sent to GCS Service to ask to kill an actor.
// `KillActorViaGcsRequest` is different from `KillActorRequest`.
// `KillActorRequest` is send to core worker to ask to kill an actor.
// `KillLocalActorRequest` is sent to raylet to ask to kill a local actor.
message KillActorViaGcsRequest {
// ID of this actor.
bytes actor_id = 1;