Merged. Changes from 9 commits.
43 changes: 43 additions & 0 deletions python/ray/tests/test_raylet_fault_tolerance.py
@@ -1,10 +1,14 @@
import os
import sys

import pytest

import ray
from ray._common.test_utils import wait_for_condition
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

import psutil


@pytest.mark.parametrize("deterministic_failure", ["request", "response"])
def test_request_worker_lease_idempotent(
@@ -44,5 +48,44 @@ def simple_task_2():
    assert ray.get([result_ref1, result_ref2]) == [0, 1]


def test_kill_local_actor_rpc_retry_and_idempotency(monkeypatch, shutdown_only):
    """Test that KillLocalActor RPC retries work correctly and guarantee actor death.
    Not testing response since the actor is killed either way.
    """

    monkeypatch.setenv(
        "RAY_testing_rpc_failure",
        "NodeManagerService.grpc_client.KillLocalActor=1:100:0",
    )

    ray.init()

    @ray.remote
    class SimpleActor:
        def ping(self):
            return "pong"

        def get_pid(self):
            return os.getpid()

    actor = SimpleActor.remote()

    result = ray.get(actor.ping.remote())
    assert result == "pong"

    worker_pid = ray.get(actor.get_pid.remote())

    # NOTE: check that the process is still alive rather than checking the actor state in the GCS,
    # since the GCS marks the actor as dead as soon as KillActor is sent, even if the worker
    # process has not actually died yet.
    assert psutil.pid_exists(worker_pid)

    ray.kill(actor)

    def verify_process_killed():
        return not psutil.pid_exists(worker_pid)

    wait_for_condition(verify_process_killed, timeout=30)


if __name__ == "__main__":
    sys.exit(pytest.main(["-sv", __file__]))
2 changes: 2 additions & 0 deletions src/mock/ray/gcs/gcs_actor_manager.h
@@ -27,6 +27,7 @@ class MockGcsActorManager : public GcsActorManager {
public:
MockGcsActorManager(RuntimeEnvManager &runtime_env_manager,
GCSFunctionManager &function_manager,
rpc::RayletClientPool &raylet_client_pool,
rpc::CoreWorkerClientPool &worker_client_pool)
: GcsActorManager(
/*scheduler=*/
@@ -37,6 +38,7 @@ class MockGcsActorManager : public GcsActorManager {
runtime_env_manager,
function_manager,
[](const ActorID &) {},
raylet_client_pool,
worker_client_pool,
/*ray_event_recorder=*/fake_ray_event_recorder_,
/*session_name=*/"",
5 changes: 5 additions & 0 deletions src/mock/ray/raylet_client/raylet_client.h
@@ -146,6 +146,11 @@ class MockRayletClientInterface : public RayletClientInterface {
(const rpc::GetNodeStatsRequest &request,
const rpc::ClientCallback<rpc::GetNodeStatsReply> &callback),
(override));
MOCK_METHOD(void,
KillLocalActor,
(const rpc::KillLocalActorRequest &request,
const rpc::ClientCallback<rpc::KillLocalActorReply> &callback),
(override));
MOCK_METHOD(void,
GlobalGC,
(const rpc::ClientCallback<rpc::GlobalGCReply> &callback),
5 changes: 4 additions & 1 deletion src/ray/core_worker_rpc_client/fake_core_worker_client.h
@@ -75,7 +75,9 @@ class FakeCoreWorkerClient : public CoreWorkerClientInterface {
const ClientCallback<ReportGeneratorItemReturnsReply> &callback) override {}

void KillActor(const KillActorRequest &request,
const ClientCallback<KillActorReply> &callback) override {}
const ClientCallback<KillActorReply> &callback) override {
num_kill_actor_requests++;
}

void CancelTask(const CancelTaskRequest &request,
const ClientCallback<CancelTaskReply> &callback) override {}
@@ -159,6 +161,7 @@ class FakeCoreWorkerClient : public CoreWorkerClientInterface {
}

std::list<ClientCallback<PushTaskReply>> callbacks_ ABSL_GUARDED_BY(mutex_);
size_t num_kill_actor_requests = 0;
absl::Mutex mutex_;
};

8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_actor.cc
@@ -60,6 +60,14 @@ const rpc::Address &GcsActor::GetOwnerAddress() const {
return actor_table_data_.owner_address();
}

const std::optional<rpc::Address> &GcsActor::LocalRayletAddress() const {
return local_raylet_address_;
}

void GcsActor::UpdateLocalRayletAddress(const rpc::Address &address) {
local_raylet_address_ = address;
}

void GcsActor::UpdateState(rpc::ActorTableData::ActorState state) {
actor_table_data_.set_state(state);
RefreshMetrics();
7 changes: 6 additions & 1 deletion src/ray/gcs/gcs_actor.h
@@ -180,7 +180,10 @@ class GcsActor {
NodeID GetOwnerNodeID() const;
/// Get the address of the actor's owner.
const rpc::Address &GetOwnerAddress() const;

/// Get the address of the local raylet for this actor
const std::optional<rpc::Address> &LocalRayletAddress() const;
/// Update the address of the local raylet for this actor
void UpdateLocalRayletAddress(const rpc::Address &address);
/// Update the `Address` of this actor (see gcs.proto).
void UpdateAddress(const rpc::Address &address);
/// Get the `Address` of this actor.
@@ -307,6 +310,8 @@ class GcsActor {
/// Event recorder and session name for Ray events
observability::RayEventRecorderInterface &ray_event_recorder_;
std::string session_name_;
/// Address of the local raylet of the worker where this actor is running
std::optional<rpc::Address> local_raylet_address_;
};

using RestartActorForLineageReconstructionCallback =
24 changes: 17 additions & 7 deletions src/ray/gcs/gcs_actor_manager.cc
@@ -229,6 +229,7 @@ GcsActorManager::GcsActorManager(
RuntimeEnvManager &runtime_env_manager,
GCSFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
rpc::RayletClientPool &raylet_client_pool,
rpc::CoreWorkerClientPool &worker_client_pool,
observability::RayEventRecorderInterface &ray_event_recorder,
const std::string &session_name,
@@ -238,6 +239,7 @@
gcs_table_storage_(gcs_table_storage),
io_context_(io_context),
gcs_publisher_(gcs_publisher),
raylet_client_pool_(raylet_client_pool),
worker_client_pool_(worker_client_pool),
ray_event_recorder_(ray_event_recorder),
session_name_(session_name),
@@ -1749,21 +1751,29 @@ void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr<GcsActor> &acto
void GcsActorManager::NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
const rpc::ActorDeathCause &death_cause,
bool force_kill) {
rpc::KillActorRequest request;
rpc::KillLocalActorRequest request;
request.set_intended_actor_id(actor->GetActorID().Binary());
request.set_worker_id(actor->GetWorkerID().Binary());
request.mutable_death_cause()->CopyFrom(death_cause);
request.set_force_kill(force_kill);
auto actor_client = worker_client_pool_.GetOrConnect(actor->GetAddress());
if (!actor->LocalRayletAddress()) {
RAY_LOG(DEBUG) << "Actor " << actor->GetActorID() << " has not been assigned a lease";
return;
}
auto actor_raylet_client =
raylet_client_pool_.GetOrConnectByAddress(actor->LocalRayletAddress().value());
RAY_LOG(DEBUG)
.WithField(actor->GetActorID())
.WithField(actor->GetWorkerID())
.WithField(actor->GetNodeID())
<< "Send request to kill actor to worker at node";
actor_client->KillActor(request,
[actor_id = actor->GetActorID()](auto &status, auto &&) {
RAY_LOG(DEBUG) << "Killing status: " << status.ToString()
<< ", actor_id: " << actor_id;
});
actor_raylet_client->KillLocalActor(
request,
[actor_id = actor->GetActorID()](const ray::Status &status,
rpc::KillLocalActorReply &&reply) {
RAY_LOG(DEBUG) << "Killing actor " << actor_id
<< " with return status: " << status.ToString();
});
}

void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill) {
3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_actor_manager.h
@@ -105,6 +105,7 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
RuntimeEnvManager &runtime_env_manager,
GCSFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
rpc::RayletClientPool &raylet_client_pool,
rpc::CoreWorkerClientPool &worker_client_pool,
observability::RayEventRecorderInterface &ray_event_recorder,
const std::string &session_name,
@@ -479,6 +480,8 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
instrumented_io_context &io_context_;
/// A publisher for publishing gcs messages.
pubsub::GcsPublisher *gcs_publisher_;
/// This is used to communicate with raylets where actors are located.
rpc::RayletClientPool &raylet_client_pool_;
/// This is used to communicate with actors and their owners.
rpc::CoreWorkerClientPool &worker_client_pool_;
/// Event recorder for emitting actor events
58 changes: 36 additions & 22 deletions src/ray/gcs/gcs_actor_scheduler.cc
@@ -228,7 +228,7 @@ void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id,
address.set_node_id(node_info->node_id());
address.set_ip_address(node_info->node_manager_address());
address.set_port(node_info->node_manager_port());
auto raylet_client = GetOrConnectRayletClient(address);
auto raylet_client = raylet_client_pool_.GetOrConnectByAddress(address);
raylet_client->CancelWorkerLease(
lease_id, [](const Status &status, const rpc::CancelWorkerLeaseReply &reply) {});
}
@@ -270,7 +270,7 @@ void GcsActorScheduler::ReleaseUnusedActorWorkers(
address.set_node_id(alive_node.second->node_id());
address.set_ip_address(alive_node.second->node_manager_address());
address.set_port(alive_node.second->node_manager_port());
auto raylet_client = GetOrConnectRayletClient(address);
auto raylet_client = raylet_client_pool_.GetOrConnectByAddress(address);
auto release_unused_workers_callback =
[this, node_id](const Status &status,
const rpc::ReleaseUnusedActorWorkersReply &reply) {
@@ -309,7 +309,7 @@ void GcsActorScheduler::LeaseWorkerFromNode(
remote_address.set_node_id(node->node_id());
remote_address.set_ip_address(node->node_manager_address());
remote_address.set_port(node->node_manager_port());
auto raylet_client = GetOrConnectRayletClient(remote_address);
auto raylet_client = raylet_client_pool_.GetOrConnectByAddress(remote_address);
// Actor leases should be sent to the raylet immediately, so we should never build up a
// backlog in GCS.
// Counter for generating unique lease IDs.
@@ -350,7 +350,9 @@
}

void GcsActorScheduler::HandleWorkerLeaseGrantedReply(
std::shared_ptr<GcsActor> actor, const ray::rpc::RequestWorkerLeaseReply &reply) {
std::shared_ptr<GcsActor> actor,
const ray::rpc::RequestWorkerLeaseReply &reply,
std::shared_ptr<const rpc::GcsNodeInfo> node) {
const auto &retry_at_raylet_address = reply.retry_at_raylet_address();
const auto &worker_address = reply.worker_address();
if (worker_address.node_id().empty()) {
@@ -390,6 +392,11 @@
RAY_CHECK(node_to_workers_when_creating_[node_id]
.emplace(leased_worker->GetWorkerID(), leased_worker)
.second);
rpc::Address actor_local_raylet_address;
actor_local_raylet_address.set_node_id(node->node_id());
actor_local_raylet_address.set_ip_address(node->node_manager_address());
actor_local_raylet_address.set_port(node->node_manager_port());
actor->UpdateLocalRayletAddress(actor_local_raylet_address);
Contributor:
On actor death in between restarts, there's probably a point where the actor doesn't have a local raylet. The actor also doesn't have a local raylet at registration time and before creation completes.

local_raylet_address should probably be an optional, and we shouldn't make the RPC if it's nullopt.

Contributor Author:
Ahh, good catch, thanks. I modified it so that we don't make the RPC if it's nullopt.

actor->UpdateAddress(leased_worker->GetAddress());
actor->GetMutableActorTableData()->set_pid(reply.worker_pid());
actor->GetMutableTaskSpec()->set_lease_grant_timestamp_ms(current_sys_time_ms());
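A minimal sketch of the guard the thread above converged on, restating the pattern already used in NotifyCoreWorkerToKillActor earlier in this PR. Only LocalRayletAddress, GetOrConnectByAddress, and KillLocalActor come from the diff; the surrounding control flow, and the request/callback placeholders, are illustrative rather than the exact Ray code.

// Sketch only: callers check the optional before issuing the RPC, since the
// address is populated only once a lease has been granted and may be absent at
// registration time, before creation completes, or while the actor restarts.
if (!actor->LocalRayletAddress().has_value()) {
  return;  // No local raylet yet, so there is nothing to send KillLocalActor to.
}
auto raylet_client =
    raylet_client_pool_.GetOrConnectByAddress(actor->LocalRayletAddress().value());
raylet_client->KillLocalActor(request, callback);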
@@ -491,7 +498,10 @@ void GcsActorScheduler::CreateActorOnWorker(std::shared_ptr<GcsActor> actor,
<< " has been removed from creating map. Actor status "
<< actor->GetState();
auto actor_id = status.ok() ? actor->GetActorID() : ActorID::Nil();
KillActorOnWorker(worker->GetAddress(), actor_id);
if (actor->LocalRayletAddress().has_value()) {
KillActorOnWorker(
actor->LocalRayletAddress().value(), worker->GetAddress(), actor_id);
}
}
});
}
@@ -521,27 +531,27 @@
}
}

std::shared_ptr<RayletClientInterface> GcsActorScheduler::GetOrConnectRayletClient(
const rpc::Address &raylet_address) {
return raylet_client_pool_.GetOrConnectByAddress(raylet_address);
}

bool GcsActorScheduler::KillActorOnWorker(const rpc::Address &worker_address,
bool GcsActorScheduler::KillActorOnWorker(const rpc::Address &raylet_address,
const rpc::Address &worker_address,
ActorID actor_id) {
if (worker_address.node_id().empty()) {
RAY_LOG(DEBUG) << "Invalid worker address, skip the killing of actor " << actor_id;
if (raylet_address.node_id().empty() || worker_address.node_id().empty()) {
RAY_LOG(DEBUG) << "Invalid raylet or worker address, skip the killing of actor "
<< actor_id;
return false;
}

auto cli = worker_client_pool_.GetOrConnect(worker_address);
rpc::KillActorRequest request;
auto raylet_client = raylet_client_pool_.GetOrConnectByAddress(raylet_address);
rpc::KillLocalActorRequest request;
// Set it to be Nil() since it hasn't been setup yet.
request.set_intended_actor_id(actor_id.Binary());
request.set_worker_id(worker_address.worker_id());
request.set_force_kill(true);
cli->KillActor(request, [actor_id](auto &status, auto &&) {
RAY_LOG(DEBUG) << "Killing actor " << actor_id
<< " with return status: " << status.ToString();
});

raylet_client->KillLocalActor(
request, [actor_id](const Status &status, const rpc::KillLocalActorReply &) {
RAY_LOG(DEBUG) << "Killing actor " << actor_id
<< " with return status: " << status.ToString();
Collaborator:
Hm, shouldn't we be retrying here if we get a non-OK status? At a minimum we should have an ERROR log here.

Contributor Author:
We're already retrying implicitly via the retryable gRPC client for UNAVAILABLE/UNKNOWN statuses; for the other cases, like the actor ID mismatch in HandleKillActor, we don't want to retry. Added an ERROR log to report this, though.

});
return true;
}
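A sketch of the direction the thread above settles on: leave UNAVAILABLE/UNKNOWN retries to the retryable gRPC client and surface any other non-OK final status at ERROR instead of DEBUG. The follow-up commit is not part of this revision of the diff, so the exact log wording and placement below are assumptions.

raylet_client->KillLocalActor(
    request, [actor_id](const Status &status, const rpc::KillLocalActorReply &) {
      if (status.ok()) {
        RAY_LOG(DEBUG) << "Killed actor " << actor_id;
      } else {
        // UNAVAILABLE/UNKNOWN were already retried inside the retryable gRPC
        // client, so a non-OK status here is terminal and worth reporting.
        RAY_LOG(ERROR) << "Failed to kill actor " << actor_id
                       << ", status: " << status.ToString();
      }
    });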

@@ -579,7 +589,9 @@ void GcsActorScheduler::HandleWorkerLeaseReply(
// If the actor has been killed, we need to kill the worker too
// otherwise, the worker will be leaked.
RAY_LOG(DEBUG) << "Actor " << actor->GetActorID() << " is dead, kill the worker.";
KillActorOnWorker(reply.worker_address(), ActorID::Nil());
auto raylet_address = rpc::RayletClientPool::GenerateRayletAddress(
node_id, node->node_manager_address(), node->node_manager_port());
KillActorOnWorker(raylet_address, reply.worker_address(), ActorID::Nil());
}
return;
}
@@ -621,7 +633,7 @@ void GcsActorScheduler::HandleWorkerLeaseReply(
RAY_LOG(INFO) << "Finished leasing worker from " << node_id << " for actor "
<< actor->GetActorID()
<< ", job id = " << actor->GetActorID().JobId();
HandleWorkerLeaseGrantedReply(actor, reply);
HandleWorkerLeaseGrantedReply(actor, reply, node);
}
} else {
RetryLeasingWorkerFromNode(actor, node);
@@ -630,7 +642,9 @@
// If the actor has been killed, we need to kill the worker too
// otherwise, the worker will be leaked.
RAY_LOG(DEBUG) << "Actor " << actor->GetActorID() << " is dead, kill the worker.";
KillActorOnWorker(reply.worker_address(), ActorID::Nil());
auto raylet_address = rpc::RayletClientPool::GenerateRayletAddress(
node_id, node->node_manager_address(), node->node_manager_port());
KillActorOnWorker(raylet_address, reply.worker_address(), ActorID::Nil());
}
}

17 changes: 10 additions & 7 deletions src/ray/gcs/gcs_actor_scheduler.h
@@ -289,8 +289,10 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
///
/// \param actor Contains the resources needed to lease workers from the specified node.
/// \param reply The reply of `RequestWorkerLeaseRequest`.
/// \param node The node that the worker will be leased from.
void HandleWorkerLeaseGrantedReply(std::shared_ptr<GcsActor> actor,
const rpc::RequestWorkerLeaseReply &reply);
const rpc::RequestWorkerLeaseReply &reply,
std::shared_ptr<const rpc::GcsNodeInfo> node);

/// A rejected reply means resources were preempted by normal tasks. Then
/// update the cluster resource view and reschedule immediately.
@@ -332,12 +334,13 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
void DoRetryCreatingActorOnWorker(std::shared_ptr<GcsActor> actor,
std::shared_ptr<GcsLeasedWorker> worker);

/// Get an existing lease client or connect a new one.
std::shared_ptr<RayletClientInterface> GetOrConnectRayletClient(
const rpc::Address &raylet_address);

/// Kill the actor on a node
bool KillActorOnWorker(const rpc::Address &worker_address, ActorID actor_id);
/// Kill the actor on a worker.
/// \param raylet_address The address of the local raylet of the worker where the
/// actor is running.
/// \param worker_address The address of the worker where the actor is running.
/// \param actor_id ID of the actor to kill.
bool KillActorOnWorker(const rpc::Address &raylet_address,
const rpc::Address &worker_address,
ActorID actor_id);

/// Schedule the actor at GCS. The target Raylet is selected by hybrid_policy by
/// default.
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server.cc
@@ -531,6 +531,7 @@ void GcsServer::InitGcsActorManager(
[this](const ActorID &actor_id) {
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
},
raylet_client_pool_,
worker_client_pool_,
*ray_event_recorder_,
config_.session_name,