
Commit 94ce173

Sparks0219 authored and SheldonTsen committed
[core] Make KillActor RPC Fault Tolerant (ray-project#57648)
The issue with the current implementation of the core worker's HandleKillActor is that it never sends a reply: the worker dies before the RPC completes. The GCS-side application code doesn't really care, since it only logs a response if one arrives, and a reply is only sent when the actor ID in the RPC doesn't match the actor on the worker, in which case the GCS logs it and moves on. As a result, after a transient network failure we can't tell whether the request was lost or the actor was successfully killed.

The most straightforward fix is to stop having the GCS call the core worker's KillActor directly. Instead, the GCS talks to the raylet via a new RPC, KillLocalActor, which in turn calls KillActor. Because the raylet that receives KillLocalActor is local to the worker hosting the actor, the actor is guaranteed to die at that point, either through KillActor or, if the worker hangs, by falling back to SIGKILL. The key intuition is that the GCS now talks to the raylet, and this layer implements retries. The layer between the raylet and the core worker does not have retries enabled, because RPCs between the local raylet and its worker run on the same machine and can be assumed not to fail. After routing the request to KillActor, the raylet checks on the worker after kill_worker_timeout_milliseconds (5 seconds); if the worker still hasn't exited, the raylet calls DestroyWorker, which sends the SIGKILL.

---------

Signed-off-by: joshlee <[email protected]>
1 parent fa9bf5a commit 94ce173

29 files changed (+387 additions, -75 deletions)
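To make the new control flow concrete, below is a minimal, self-contained C++ sketch of the kill path the commit message describes. It is a model, not Ray's actual code: the names Worker, GracefulExit, KillLocalActor, and KillWithRetries are invented for illustration, and the real implementation uses asynchronous gRPC clients rather than threads and sleeps.

```cpp
// Model of the fault-tolerant kill path (illustrative only, not Ray source).
// The "GCS" retries the raylet-level kill on transient failure; the "raylet",
// being local to the worker, guarantees death by escalating to a force kill
// after a timeout (standing in for kill_worker_timeout_milliseconds).
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>

struct Worker {
  std::atomic<bool> alive{true};
  // Stands in for the KillActor RPC from the local raylet to the core worker;
  // in the real system this can hang, which is why a fallback exists.
  void GracefulExit() { alive = false; }
  // Stands in for DestroyWorker, which sends SIGKILL.
  void ForceKill() { alive = false; }
};

// Raylet side: local to the worker, so no RPC retries are needed here.
void KillLocalActor(Worker &worker, std::chrono::milliseconds kill_worker_timeout) {
  worker.GracefulExit();
  std::this_thread::sleep_for(kill_worker_timeout);  // wait, then check on the worker
  if (worker.alive) {
    worker.ForceKill();  // still alive after the timeout: escalate to SIGKILL
  }
}

// GCS side: talks only to the raylet, retrying on transient RPC failure.
// Resending is safe because killing an already-dead worker is a no-op.
bool KillWithRetries(const std::function<bool()> &send_kill_local_actor,
                     int max_attempts) {
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    if (send_kill_local_actor()) {
      return true;  // the raylet acknowledged; the actor is dead
    }
  }
  return false;
}

int main() {
  Worker worker;
  int failures_to_inject = 1;  // simulate one dropped request, as the test below does
  bool acknowledged = KillWithRetries(
      [&] {
        if (failures_to_inject-- > 0) {
          return false;  // transient network failure: no reply received
        }
        KillLocalActor(worker, std::chrono::milliseconds(50));
        return true;
      },
      /*max_attempts=*/3);
  std::cout << "acknowledged=" << acknowledged << " alive=" << worker.alive << "\n";
  return 0;
}
```

The layering is the point: retries exist only at the GCS-to-raylet boundary, where the operation is idempotent, while the raylet enforces termination locally and therefore never needs to retry.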

python/ray/tests/test_raylet_fault_tolerance.py

Lines changed: 42 additions & 0 deletions
@@ -1,3 +1,4 @@
+import os
 import sys
 
 import pytest
@@ -11,6 +12,8 @@
     PlacementGroupSchedulingStrategy,
 )
 
+import psutil
+
 
 @pytest.mark.parametrize("deterministic_failure", ["request", "response"])
 def test_request_worker_lease_idempotent(
@@ -138,5 +141,44 @@ def task():
     assert result == "success"
 
 
+def test_kill_local_actor_rpc_retry_and_idempotency(monkeypatch, shutdown_only):
+    """Test that KillLocalActor RPC retries work correctly and guarantee actor death.
+    Not testing response since the actor is killed either way.
+    """
+
+    monkeypatch.setenv(
+        "RAY_testing_rpc_failure",
+        "NodeManagerService.grpc_client.KillLocalActor=1:100:0",
+    )
+
+    ray.init()
+
+    @ray.remote
+    class SimpleActor:
+        def ping(self):
+            return "pong"
+
+        def get_pid(self):
+            return os.getpid()
+
+    actor = SimpleActor.remote()
+
+    result = ray.get(actor.ping.remote())
+    assert result == "pong"
+
+    worker_pid = ray.get(actor.get_pid.remote())
+
+    # NOTE: checking the process is still alive rather than checking the actor state from the GCS
+    # since as long as KillActor is sent the GCS will mark the actor as dead even though it may not actually be
+    assert psutil.pid_exists(worker_pid)
+
+    ray.kill(actor)
+
+    def verify_process_killed():
+        return not psutil.pid_exists(worker_pid)
+
+    wait_for_condition(verify_process_killed, timeout=30)
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-sv", __file__]))
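A note on the fault-injection knob used above: RAY_testing_rpc_failure is Ray's internal testing hook for injecting RPC failures by fully qualified method name. Read alongside the parametrized test earlier in this file, the value NodeManagerService.grpc_client.KillLocalActor=1:100:0 appears to encode at most one injected failure, with a 100% probability of failing the request and 0% of failing the response; the exact format is an internal testing detail and may change. The test then relies on the GCS-to-raylet retry to ensure the worker process actually dies.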

src/mock/ray/gcs/gcs_actor_manager.h

Lines changed: 2 additions & 0 deletions
@@ -27,6 +27,7 @@ class MockGcsActorManager : public GcsActorManager {
  public:
   MockGcsActorManager(RuntimeEnvManager &runtime_env_manager,
                       GCSFunctionManager &function_manager,
+                      rpc::RayletClientPool &raylet_client_pool,
                       rpc::CoreWorkerClientPool &worker_client_pool)
       : GcsActorManager(
             /*scheduler=*/
@@ -37,6 +38,7 @@ class MockGcsActorManager : public GcsActorManager {
             runtime_env_manager,
             function_manager,
             [](const ActorID &) {},
+            raylet_client_pool,
             worker_client_pool,
             /*ray_event_recorder=*/fake_ray_event_recorder_,
             /*session_name=*/"",

src/mock/ray/raylet_client/raylet_client.h

Lines changed: 5 additions & 0 deletions
@@ -146,6 +146,11 @@ class MockRayletClientInterface : public RayletClientInterface {
               (const rpc::GetNodeStatsRequest &request,
                const rpc::ClientCallback<rpc::GetNodeStatsReply> &callback),
               (override));
+  MOCK_METHOD(void,
+              KillLocalActor,
+              (const rpc::KillLocalActorRequest &request,
+               const rpc::ClientCallback<rpc::KillLocalActorReply> &callback),
+              (override));
   MOCK_METHOD(void,
               GlobalGC,
               (const rpc::ClientCallback<rpc::GlobalGCReply> &callback),

src/ray/core_worker_rpc_client/fake_core_worker_client.h

Lines changed: 4 additions & 1 deletion
@@ -75,7 +75,9 @@ class FakeCoreWorkerClient : public CoreWorkerClientInterface {
       const ClientCallback<ReportGeneratorItemReturnsReply> &callback) override {}
 
   void KillActor(const KillActorRequest &request,
-                 const ClientCallback<KillActorReply> &callback) override {}
+                 const ClientCallback<KillActorReply> &callback) override {
+    num_kill_actor_requests++;
+  }
 
   void CancelTask(const CancelTaskRequest &request,
                   const ClientCallback<CancelTaskReply> &callback) override {}
@@ -159,6 +161,7 @@ class FakeCoreWorkerClient : public CoreWorkerClientInterface {
   }
 
   std::list<ClientCallback<PushTaskReply>> callbacks_ ABSL_GUARDED_BY(mutex_);
+  size_t num_kill_actor_requests = 0;
   absl::Mutex mutex_;
 };

src/ray/gcs/gcs_actor.cc

Lines changed: 8 additions & 0 deletions
@@ -60,6 +60,14 @@ const rpc::Address &GcsActor::GetOwnerAddress() const {
   return actor_table_data_.owner_address();
 }
 
+const std::optional<rpc::Address> &GcsActor::LocalRayletAddress() const {
+  return local_raylet_address_;
+}
+
+void GcsActor::UpdateLocalRayletAddress(const rpc::Address &address) {
+  local_raylet_address_ = address;
+}
+
 void GcsActor::UpdateState(rpc::ActorTableData::ActorState state) {
   actor_table_data_.set_state(state);
   RefreshMetrics();

src/ray/gcs/gcs_actor.h

Lines changed: 6 additions & 1 deletion
@@ -180,7 +180,10 @@ class GcsActor {
   NodeID GetOwnerNodeID() const;
   /// Get the address of the actor's owner.
   const rpc::Address &GetOwnerAddress() const;
-
+  /// Get the address of the local raylet for this actor
+  const std::optional<rpc::Address> &LocalRayletAddress() const;
+  /// Update the address of the local raylet for this actor
+  void UpdateLocalRayletAddress(const rpc::Address &address);
   /// Update the `Address` of this actor (see gcs.proto).
   void UpdateAddress(const rpc::Address &address);
   /// Get the `Address` of this actor.
@@ -307,6 +310,8 @@ class GcsActor {
   /// Event recorder and session name for Ray events
   observability::RayEventRecorderInterface &ray_event_recorder_;
   std::string session_name_;
+  /// Address of the local raylet of the worker where this actor is running
+  std::optional<rpc::Address> local_raylet_address_;
 };
 
 using RestartActorForLineageReconstructionCallback =

src/ray/gcs/gcs_actor_manager.cc

Lines changed: 28 additions & 14 deletions
@@ -227,6 +227,7 @@ GcsActorManager::GcsActorManager(
     RuntimeEnvManager &runtime_env_manager,
     GCSFunctionManager &function_manager,
     std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
+    rpc::RayletClientPool &raylet_client_pool,
     rpc::CoreWorkerClientPool &worker_client_pool,
     observability::RayEventRecorderInterface &ray_event_recorder,
     const std::string &session_name,
@@ -236,6 +237,7 @@ GcsActorManager::GcsActorManager(
       gcs_table_storage_(gcs_table_storage),
       io_context_(io_context),
       gcs_publisher_(gcs_publisher),
+      raylet_client_pool_(raylet_client_pool),
       worker_client_pool_(worker_client_pool),
       ray_event_recorder_(ray_event_recorder),
       session_name_(session_name),
@@ -1001,7 +1003,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
   if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
     // The actor has already been created. Destroy the process by force-killing
     // it.
-    NotifyCoreWorkerToKillActor(actor, death_cause, force_kill);
+    NotifyRayletToKillActor(actor, death_cause, force_kill);
     RAY_CHECK(node_it->second.erase(actor->GetWorkerID()));
     if (node_it->second.empty()) {
       created_actors_.erase(node_it);
@@ -1010,7 +1012,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
     if (!worker_id.IsNil()) {
       // The actor is in phase of creating, so we need to notify the core
       // worker exit to avoid process and resource leak.
-      NotifyCoreWorkerToKillActor(actor, death_cause, force_kill);
+      NotifyRayletToKillActor(actor, death_cause, force_kill);
     }
     CancelActorInScheduling(actor);
   }
@@ -1744,24 +1746,36 @@ void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr<GcsActor> &acto
   }
 }
 
-void GcsActorManager::NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
-                                                  const rpc::ActorDeathCause &death_cause,
-                                                  bool force_kill) {
-  rpc::KillActorRequest request;
+void GcsActorManager::NotifyRayletToKillActor(const std::shared_ptr<GcsActor> &actor,
+                                              const rpc::ActorDeathCause &death_cause,
+                                              bool force_kill) {
+  rpc::KillLocalActorRequest request;
   request.set_intended_actor_id(actor->GetActorID().Binary());
+  request.set_worker_id(actor->GetWorkerID().Binary());
   request.mutable_death_cause()->CopyFrom(death_cause);
   request.set_force_kill(force_kill);
-  auto actor_client = worker_client_pool_.GetOrConnect(actor->GetAddress());
+  if (!actor->LocalRayletAddress()) {
+    RAY_LOG(DEBUG) << "Actor " << actor->GetActorID() << " has not been assigned a lease";
+    return;
+  }
+  auto actor_raylet_client =
+      raylet_client_pool_.GetOrConnectByAddress(actor->LocalRayletAddress().value());
   RAY_LOG(DEBUG)
       .WithField(actor->GetActorID())
      .WithField(actor->GetWorkerID())
       .WithField(actor->GetNodeID())
       << "Send request to kill actor to worker at node";
-  actor_client->KillActor(request,
-                          [actor_id = actor->GetActorID()](auto &status, auto &&) {
-                            RAY_LOG(DEBUG) << "Killing status: " << status.ToString()
-                                           << ", actor_id: " << actor_id;
-                          });
+  actor_raylet_client->KillLocalActor(
+      request,
+      [actor_id = actor->GetActorID()](const ray::Status &status,
+                                       rpc::KillLocalActorReply &&reply) {
+        if (!status.ok()) {
+          RAY_LOG(ERROR) << "Failed to kill actor " << actor_id
+                         << ", return status: " << status.ToString();
+        } else {
+          RAY_LOG(INFO) << "Killed actor " << actor_id << " successfully.";
+        }
+      });
 }
 
 void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill) {
@@ -1786,7 +1800,7 @@ void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill) {
   if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
     // The actor has already been created. Destroy the process by force-killing
     // it.
-    NotifyCoreWorkerToKillActor(
+    NotifyRayletToKillActor(
         actor, GenKilledByApplicationCause(GetActor(actor_id)), force_kill);
   } else {
     const auto &lease_id = actor->GetLeaseSpecification().LeaseId();
@@ -1795,7 +1809,7 @@ void GcsActorManager::KillActor(const ActorID &actor_id, bool force_kill) {
     if (!worker_id.IsNil()) {
       // The actor is in phase of creating, so we need to notify the core
       // worker exit to avoid process and resource leak.
-      NotifyCoreWorkerToKillActor(
+      NotifyRayletToKillActor(
          actor, GenKilledByApplicationCause(GetActor(actor_id)), force_kill);
     }
     CancelActorInScheduling(actor);
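Two details in this file are worth noting. First, if the actor has no recorded local raylet address, the kill is skipped with a debug log: the actor was never assigned a lease, so there is no worker process to kill yet. Second, the callback only logs the final status; per the commit description, retries for KillLocalActor live in the GCS-to-raylet client layer, which is safe because resending a kill for an already-dead worker is a no-op.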

src/ray/gcs/gcs_actor_manager.h

Lines changed: 7 additions & 4 deletions
@@ -105,6 +105,7 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
       RuntimeEnvManager &runtime_env_manager,
       GCSFunctionManager &function_manager,
       std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
+      rpc::RayletClientPool &raylet_client_pool,
       rpc::CoreWorkerClientPool &worker_client_pool,
       observability::RayEventRecorderInterface &ray_event_recorder,
       const std::string &session_name,
@@ -353,14 +354,14 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
   /// \param force_kill Whether to force kill an actor by killing the worker.
   void KillActor(const ActorID &actor_id, bool force_kill);
 
-  /// Notify CoreWorker to kill the specified actor.
+  /// Notify Raylet to kill the specified actor.
   ///
   /// \param actor The actor to be killed.
   /// \param death_cause Context about why this actor is dead.
   /// \param force_kill Whether to force kill an actor by killing the worker.
-  void NotifyCoreWorkerToKillActor(const std::shared_ptr<GcsActor> &actor,
-                                   const rpc::ActorDeathCause &death_cause,
-                                   bool force_kill = true);
+  void NotifyRayletToKillActor(const std::shared_ptr<GcsActor> &actor,
+                               const rpc::ActorDeathCause &death_cause,
+                               bool force_kill = true);
 
   /// Add the destroyed actor to the cache. If the cache is full, one actor is randomly
   /// evicted.
@@ -479,6 +480,8 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
   instrumented_io_context &io_context_;
   /// A publisher for publishing gcs messages.
   pubsub::GcsPublisher *gcs_publisher_;
+  /// This is used to communicate with raylets where actors are located.
+  rpc::RayletClientPool &raylet_client_pool_;
   /// This is used to communicate with actors and their owners.
   rpc::CoreWorkerClientPool &worker_client_pool_;
   /// Event recorder for emitting actor events
