Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions python/ray/tests/test_raylet_fault_tolerance.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import sys

import pytest
Expand All @@ -11,6 +12,8 @@
PlacementGroupSchedulingStrategy,
)

import psutil


@pytest.mark.parametrize("deterministic_failure", ["request", "response"])
def test_request_worker_lease_idempotent(
Expand Down Expand Up @@ -138,5 +141,44 @@ def task():
assert result == "success"


def test_kill_local_actor_rpc_retry_and_idempotency(monkeypatch, shutdown_only):
    """Check that ``ray.kill`` still terminates the actor's worker process when
    a failure is injected into the KillLocalActor RPC.

    The injection is configured through the ``RAY_testing_rpc_failure``
    environment variable, which is set before ``ray.init()`` is called.
    The response path is deliberately not exercised, since the actor is
    killed either way.
    """
    # NOTE(review): the "1:100:0" suffix presumably configures how many
    # failures to inject and the request/response failure rates -- confirm
    # against the RAY_testing_rpc_failure documentation.
    failure_spec = "NodeManagerService.grpc_client.KillLocalActor=1:100:0"
    monkeypatch.setenv("RAY_testing_rpc_failure", failure_spec)

    ray.init()

    @ray.remote
    class SimpleActor:
        def ping(self):
            return "pong"

        def get_pid(self):
            return os.getpid()

    handle = SimpleActor.remote()

    # A successful round trip shows the actor is up before it gets killed.
    assert ray.get(handle.ping.remote()) == "pong"

    actor_pid = ray.get(handle.get_pid.remote())

    # Liveness is checked on the OS process rather than on the actor state in
    # the GCS: once the kill request has been sent, the GCS marks the actor as
    # dead even though its process may not actually have exited.
    assert psutil.pid_exists(actor_pid)

    ray.kill(handle)

    def process_is_gone():
        return not psutil.pid_exists(actor_pid)

    wait_for_condition(process_is_gone, timeout=30)


if __name__ == "__main__":
    # When executed directly, run pytest on this file (-s: no output capture,
    # -v: verbose) and use its result as the process exit status.
    exit_code = pytest.main(["-sv", __file__])
    sys.exit(exit_code)
2 changes: 2 additions & 0 deletions src/mock/ray/gcs/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MockGcsActorManager : public GcsActorManager {
public:
MockGcsActorManager(RuntimeEnvManager &runtime_env_manager,
GCSFunctionManager &function_manager,
rpc::RayletClientPool &raylet_client_pool,
rpc::CoreWorkerClientPool &worker_client_pool)
: GcsActorManager(
/*scheduler=*/
Expand All @@ -37,6 +38,7 @@ class MockGcsActorManager : public GcsActorManager {
runtime_env_manager,
function_manager,
[](const ActorID &) {},
raylet_client_pool,
worker_client_pool,
/*ray_event_recorder=*/fake_ray_event_recorder_,
/*session_name=*/"",
Expand Down
5 changes: 5 additions & 0 deletions src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ class MockRayletClientInterface : public RayletClientInterface {
(const rpc::GetNodeStatsRequest &request,
const rpc::ClientCallback<rpc::GetNodeStatsReply> &callback),
(override));
// gMock stub overriding KillLocalActor (request plus reply callback).
MOCK_METHOD(void,
KillLocalActor,
(const rpc::KillLocalActorRequest &request,
const rpc::ClientCallback<rpc::KillLocalActorReply> &callback),
(override));
MOCK_METHOD(void,
GlobalGC,
(const rpc::ClientCallback<rpc::GlobalGCReply> &callback),
Expand Down
5 changes: 4 additions & 1 deletion src/ray/core_worker_rpc_client/fake_core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ class FakeCoreWorkerClient : public CoreWorkerClientInterface {
const ClientCallback<ReportGeneratorItemReturnsReply> &callback) override {}

void KillActor(const KillActorRequest &request,
const ClientCallback<KillActorReply> &callback) override {}
const ClientCallback<KillActorReply> &callback) override {
num_kill_actor_requests++;
}

void CancelTask(const CancelTaskRequest &request,
const ClientCallback<CancelTaskReply> &callback) override {}
Expand Down Expand Up @@ -159,6 +161,7 @@ class FakeCoreWorkerClient : public CoreWorkerClientInterface {
}

std::list<ClientCallback<PushTaskReply>> callbacks_ ABSL_GUARDED_BY(mutex_);
size_t num_kill_actor_requests = 0;
absl::Mutex mutex_;
};

Expand Down
8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ const rpc::Address &GcsActor::GetOwnerAddress() const {
return actor_table_data_.owner_address();
}

// Returns the stored local raylet address by const reference. The optional is
// empty when no address is currently held, so callers must check it before
// dereferencing.
const std::optional<rpc::Address> &GcsActor::LocalRayletAddress() const {
return local_raylet_address_;
}

// Stores a copy of `address` as this actor's local raylet address, replacing
// any previously stored value.
void GcsActor::UpdateLocalRayletAddress(const rpc::Address &address) {
local_raylet_address_ = address;
}

void GcsActor::UpdateState(rpc::ActorTableData::ActorState state) {
actor_table_data_.set_state(state);
RefreshMetrics();
Expand Down
7 changes: 6 additions & 1 deletion src/ray/gcs/gcs_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ class GcsActor {
NodeID GetOwnerNodeID() const;
/// Get the address of the actor's owner.
const rpc::Address &GetOwnerAddress() const;

/// Get the address of the local raylet for this actor.
///
/// \return The stored raylet address, or an empty optional if none is held.
const std::optional<rpc::Address> &LocalRayletAddress() const;
/// Update the address of the local raylet for this actor.
///
/// \param address The raylet address to store for this actor.
void UpdateLocalRayletAddress(const rpc::Address &address);
/// Update the `Address` of this actor (see gcs.proto).
void UpdateAddress(const rpc::Address &address);
/// Get the `Address` of this actor.
Expand Down Expand Up @@ -307,6 +310,8 @@ class GcsActor {
/// Event recorder and session name for Ray events
observability::RayEventRecorderInterface &ray_event_recorder_;
std::string session_name_;
/// Address of the local raylet of the worker where this actor is running
std::optional<rpc::Address> local_raylet_address_;
};

using RestartActorForLineageReconstructionCallback =
Expand Down
38 changes: 24 additions & 14 deletions src/ray/gcs/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ GcsActorManager::GcsActorManager(
RuntimeEnvManager &runtime_env_manager,
GCSFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
rpc::RayletClientPool &raylet_client_pool,
rpc::CoreWorkerClientPool &worker_client_pool,
observability::RayEventRecorderInterface &ray_event_recorder,
const std::string &session_name,
Expand All @@ -236,6 +237,7 @@ GcsActorManager::GcsActorManager(
gcs_table_storage_(gcs_table_storage),
io_context_(io_context),
gcs_publisher_(gcs_publisher),
raylet_client_pool_(raylet_client_pool),
worker_client_pool_(worker_client_pool),
ray_event_recorder_(ray_event_recorder),
session_name_(session_name),
Expand Down Expand Up @@ -1001,7 +1003,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
// The actor has already been created. Destroy the process by force-killing
// it.
NotifyCoreWorkerToKillActor(actor, death_cause, force_kill);
NotifyRayletToKillActor(actor, death_cause, force_kill);
RAY_CHECK(node_it->second.erase(actor->GetWorkerID()));
if (node_it->second.empty()) {
created_actors_.erase(node_it);
Expand All @@ -1010,7 +1012,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
if (!worker_id.IsNil()) {
// The actor is in phase of creating, so we need to notify the core
// worker exit to avoid process and resource leak.
NotifyCoreWorkerToKillActor(actor, death_cause, force_kill);
NotifyRayletToKillActor(actor, death_cause, force_kill);
}
CancelActorInScheduling(actor);
}
Expand Down Expand Up @@ -1744,24 +1746,32 @@ void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr<GcsActor> &acto
}
}

void GcsActorManager::NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
const rpc::ActorDeathCause &death_cause,
bool force_kill) {
rpc::KillActorRequest request;
void GcsActorManager::NotifyRayletToKillActor(const std::shared_ptr<GcsActor> &actor,
const rpc::ActorDeathCause &death_cause,
bool force_kill) {
rpc::KillLocalActorRequest request;
request.set_intended_actor_id(actor->GetActorID().Binary());
request.set_worker_id(actor->GetWorkerID().Binary());
request.mutable_death_cause()->CopyFrom(death_cause);
request.set_force_kill(force_kill);
auto actor_client = worker_client_pool_.GetOrConnect(actor->GetAddress());
if (!actor->LocalRayletAddress()) {
RAY_LOG(DEBUG) << "Actor " << actor->GetActorID() << " has not been assigned a lease";
return;
}
auto actor_raylet_client =
raylet_client_pool_.GetOrConnectByAddress(actor->LocalRayletAddress().value());
RAY_LOG(DEBUG)
.WithField(actor->GetActorID())
.WithField(actor->GetWorkerID())
.WithField(actor->GetNodeID())
<< "Send request to kill actor to worker at node";
actor_client->KillActor(request,
[actor_id = actor->GetActorID()](auto &status, auto &&) {
RAY_LOG(DEBUG) << "Killing status: " << status.ToString()
<< ", actor_id: " << actor_id;
});
actor_raylet_client->KillLocalActor(
request,
[actor_id = actor->GetActorID()](const ray::Status &status,
rpc::KillLocalActorReply &&reply) {
RAY_LOG(DEBUG) << "Killing actor " << actor_id
<< " with return status: " << status.ToString();
});
}

void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill) {
Expand All @@ -1786,7 +1796,7 @@ void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill) {
if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
// The actor has already been created. Destroy the process by force-killing
// it.
NotifyCoreWorkerToKillActor(
NotifyRayletToKillActor(
actor, GenKilledByApplicationCause(GetActor(actor_id)), force_kill);
} else {
const auto &lease_id = actor->GetLeaseSpecification().LeaseId();
Expand All @@ -1795,7 +1805,7 @@ void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill) {
if (!worker_id.IsNil()) {
// The actor is in phase of creating, so we need to notify the core
// worker exit to avoid process and resource leak.
NotifyCoreWorkerToKillActor(
NotifyRayletToKillActor(
actor, GenKilledByApplicationCause(GetActor(actor_id)), force_kill);
}
CancelActorInScheduling(actor);
Expand Down
11 changes: 7 additions & 4 deletions src/ray/gcs/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
RuntimeEnvManager &runtime_env_manager,
GCSFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
rpc::RayletClientPool &raylet_client_pool,
rpc::CoreWorkerClientPool &worker_client_pool,
observability::RayEventRecorderInterface &ray_event_recorder,
const std::string &session_name,
Expand Down Expand Up @@ -353,14 +354,14 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
/// \param force_kill Whether to force kill an actor by killing the worker.
void KillActor(const ActorID &actor_id, bool force_kill);

/// Notify CoreWorker to kill the specified actor.
/// Notify Raylet to kill the specified actor.
///
/// \param actor The actor to be killed.
/// \param death_cause Context about why this actor is dead.
/// \param force_kill Whether to force kill an actor by killing the worker.
void NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
const rpc::ActorDeathCause &death_cause,
bool force_kill = true);
void NotifyRayletToKillActor(const std::shared_ptr<GcsActor> &actor,
const rpc::ActorDeathCause &death_cause,
bool force_kill = true);

/// Add the destroyed actor to the cache. If the cache is full, one actor is randomly
/// evicted.
Expand Down Expand Up @@ -479,6 +480,8 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
instrumented_io_context &io_context_;
/// A publisher for publishing gcs messages.
pubsub::GcsPublisher *gcs_publisher_;
/// This is used to communicate with raylets where actors are located.
rpc::RayletClientPool &raylet_client_pool_;
/// This is used to communicate with actors and their owners.
rpc::CoreWorkerClientPool &worker_client_pool_;
/// Event recorder for emitting actor events
Expand Down
Loading