Commit fd1e404

[core] Clean up test_raylet_resubscribe_to_worker_death and relevant Raylet logs (#58244)
`test_gcs_fault_tolerance.py::test_worker_raylet_resubscription` is still flaky in CI despite bumping up the timeout. Making a few improvements here:

- Increasing the timeout to `20s` just in case it's a timeout issue (unlikely).
- Changing to scheduling an actor instead of using `internal_kv` for our signal that the GCS is back up. This should better indicate that the Raylet is resubscribed.
- Cleaning up some system logs.
- Modifying the `ObjectLostError` logs to avoid logging likely-irrelevant plasma usage on owner death.

It's likely that the underlying issue here is that we don't actually reliably resubscribe to all worker death notifications, as indicated in the TODO in the PR.

---------

Signed-off-by: Edward Oakes <[email protected]>
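For reference, a minimal sketch of the readiness signal described above (the `Probe` actor name is illustrative; the actual test change is in the `test_gcs_fault_tolerance.py` diff below). Placing an actor exercises the Raylet-to-GCS scheduling path, whereas an `internal_kv` request only checks that the GCS RPC endpoint is reachable:

```python
import ray

ray.init()


# Scheduling a trivial actor requires the Raylet to be (re)connected to the GCS,
# so waiting on `__ray_ready__` only returns once scheduling works end to end.
@ray.remote
class Probe:
    pass


ray.get(Probe.remote().__ray_ready__.remote())
```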
1 parent ed49a53 · commit fd1e404

8 files changed: +82, -77 lines changed

python/ray/_private/worker.py

Lines changed: 8 additions & 2 deletions
@@ -1023,7 +1023,9 @@ def get_objects(
         # Raise exceptions instead of returning them to the user.
         for i, value in enumerate(values):
             if isinstance(value, RayError):
-                if isinstance(value, ray.exceptions.ObjectLostError):
+                if isinstance(
+                    value, ray.exceptions.ObjectLostError
+                ) and not isinstance(value, ray.exceptions.OwnerDiedError):
                     global_worker.core_worker.log_plasma_usage()
                 if isinstance(value, RayTaskError):
                     raise value.as_instanceof_cause()
@@ -2972,7 +2974,11 @@ def get(
     )
     for i, value in enumerate(values):
         if isinstance(value, RayError):
-            if isinstance(value, ray.exceptions.ObjectLostError):
+            # If the object was lost and it wasn't due to owner death, it may be
+            # because the object store is full and objects needed to be evicted.
+            if isinstance(value, ray.exceptions.ObjectLostError) and not isinstance(
+                value, ray.exceptions.OwnerDiedError
+            ):
                 worker.core_worker.log_plasma_usage()
             if isinstance(value, RayTaskError):
                 raise value.as_instanceof_cause()
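The guard above relies on `ray.exceptions.OwnerDiedError` being a subclass of `ray.exceptions.ObjectLostError`, so a bare `isinstance(value, ObjectLostError)` check would also match owner-death failures. A minimal sketch of the intended behavior (the `should_log_plasma_usage` helper is illustrative, not part of the diff):

```python
from ray.exceptions import ObjectLostError, OwnerDiedError


def should_log_plasma_usage(err: Exception) -> bool:
    # Plasma usage is only relevant when the object may have been evicted due to
    # memory pressure; owner death is a different failure mode.
    return isinstance(err, ObjectLostError) and not isinstance(err, OwnerDiedError)
```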

python/ray/tests/test_gcs_fault_tolerance.py

Lines changed: 40 additions & 34 deletions
@@ -6,7 +6,7 @@
 import tempfile
 import time
 from concurrent.futures import ThreadPoolExecutor
-from typing import Any
+from typing import Any, Tuple
 
 import pytest
 from filelock import FileLock
@@ -254,56 +254,62 @@ def condition():
         ray.get_actor("abc")
 
 
-def test_worker_raylet_resubscription(tmp_path, ray_start_regular_with_external_redis):
-    # This test is to make sure resubscription in raylet is working.
-    # When subscription failed, raylet will not get worker failure error
-    # and thus, it won't kill the worker which is fate sharing with the failed
-    # one.
+def test_raylet_resubscribe_to_worker_death(
+    tmp_path, ray_start_regular_with_external_redis
+):
+    """Verify that the Raylet resubscribes to worker death notifications on GCS restart."""
 
-    @ray.remote
-    def blocking_child():
-        (tmp_path / "blocking_child.pid").write_text(str(os.getpid()))
-        time.sleep(10000)
+    child_task_pid_path = tmp_path / "blocking_child.pid"
 
-    @ray.remote
-    def bar():
-        return (
-            os.getpid(),
-            # Use runtime env to make sure task is running in a different
-            # ray worker
-            blocking_child.options(runtime_env={"env_vars": {"P": ""}}).remote(),
-        )
+    @ray.remote(num_cpus=0)
+    def child():
+        print("Child worker ID:", ray.get_runtime_context().get_worker_id())
+        child_task_pid_path.write_text(str(os.getpid()))
+        while True:
+            time.sleep(0.1)
+            print("Child still running...")
 
-    (parent_pid, obj_ref) = ray.get(bar.remote())
+    @ray.remote(num_cpus=0)
+    def parent() -> Tuple[int, int, ray.ObjectRef]:
+        print("Parent worker ID:", ray.get_runtime_context().get_worker_id())
+        child_obj_ref = child.remote()
 
-    blocking_child_pid = None
+        # Wait for the child to be running and report back its PID.
+        wait_for_condition(lambda: child_task_pid_path.exists(), timeout=10)
+        child_pid = int(child_task_pid_path.read_text())
+        return os.getpid(), child_pid, child_obj_ref
 
-    def condition():
-        nonlocal blocking_child_pid
-        blocking_child_pid = int((tmp_path / "blocking_child.pid").read_text())
-        return True
+    parent_pid, child_pid, child_obj_ref = ray.get(parent.remote())
+    print(f"Parent PID: {parent_pid}, child PID: {child_pid}")
+    assert parent_pid != child_pid
 
-    wait_for_condition(condition, timeout=10)
-
-    # Kill and restart the GCS to trigger resubscription.
+    # Kill and restart the GCS.
     ray._private.worker._global_node.kill_gcs_server()
     ray._private.worker._global_node.start_gcs_server()
 
-    # Make an internal KV request to ensure the GCS is back alive.
+    # Schedule an actor to ensure that the GCS is back alive and the Raylet is
+    # reconnected to it.
     # TODO(iycheng): this shouldn't be necessary, but the current resubscription
     # implementation can lose the worker failure message because we don't ask for
     # the snapshot of worker statuses.
-    gcs_address = ray._private.worker.global_worker.gcs_client.address
-    gcs_client = ray._raylet.GcsClient(address=gcs_address)
-    gcs_client.internal_kv_put(b"a", b"b", True, None)
+    @ray.remote
+    class A:
+        pass
+
+    ray.get(A.remote().__ray_ready__.remote())
 
-    # Kill the parent task, which should cause the blocking child task to exit.
+    # Kill the parent task and verify that the child task is killed due to fate sharing
+    # with its parent.
+    print("Killing parent process.")
     p = psutil.Process(parent_pid)
     p.kill()
     p.wait()
+    print("Parent process exited.")
 
-    # The blocking child task should exit.
-    wait_for_pid_to_exit(blocking_child_pid, 10)
+    # The child task should exit.
+    wait_for_pid_to_exit(child_pid, 20)
+    with pytest.raises(ray.exceptions.OwnerDiedError):
+        ray.get(child_obj_ref)
 
 
 def test_core_worker_resubscription(tmp_path, ray_start_regular_with_external_redis):

src/ray/common/asio/periodical_runner.cc

Lines changed: 0 additions & 1 deletion
@@ -27,7 +27,6 @@ PeriodicalRunner::PeriodicalRunner(instrumented_io_context &io_service)
     : io_service_(io_service) {}
 
 PeriodicalRunner::~PeriodicalRunner() {
-  RAY_LOG(DEBUG) << "PeriodicalRunner is destructed";
   absl::MutexLock lock(&mutex_);
   for (const auto &timer : timers_) {
     timer->cancel();

src/ray/gcs_rpc_client/accessor.cc

Lines changed: 3 additions & 7 deletions
@@ -874,9 +874,7 @@ void NodeInfoAccessor::AsyncResubscribe() {
       /*done=*/
       [this](const Status &) {
         fetch_node_data_operation_([](const Status &) {
-          RAY_LOG(INFO)
-              << "Finished fetching all node information from gcs server after gcs "
-                 "server or pub-sub server is restarted.";
+          RAY_LOG(INFO) << "Finished fetching all node information for resubscription.";
         });
       });
 }
@@ -887,10 +885,8 @@ void NodeInfoAccessor::AsyncResubscribe() {
       /*done=*/
       [this](const Status &) {
         fetch_node_address_and_liveness_data_operation_([](const Status &) {
-          RAY_LOG(INFO)
-              << "Finished fetching all node address and liveness information from gcs "
-                 "server after gcs "
-                 "server or pub-sub server is restarted.";
+          RAY_LOG(INFO) << "Finished fetching all node address and liveness "
+                           "information for resubscription.";
         });
       });
 }

src/ray/gcs_rpc_client/gcs_client.cc

Lines changed: 1 addition & 0 deletions
@@ -114,6 +114,7 @@ Status GcsClient::Connect(instrumented_io_context &io_service, int64_t timeout_m
       options_.gcs_address_, options_.gcs_port_, *client_call_manager_);
 
   resubscribe_func_ = [this]() {
+    RAY_LOG(INFO) << "Resubscribing to GCS tables.";
     job_accessor_->AsyncResubscribe();
     actor_accessor_->AsyncResubscribe();
     node_accessor_->AsyncResubscribe();

src/ray/ray_syncer/ray_syncer.cc

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ void RaySyncer::Connect(const std::string &node_id,
       io_context_,
       [this, remote_node_id, channel]() {
         RAY_LOG(INFO).WithField(NodeID::FromBinary(remote_node_id))
-            << "Connection is broken. Reconnect to node.";
+            << "Connection to the node was broken, reconnecting.";
         Connect(remote_node_id, channel);
       },
       /* delay_microseconds = */ std::chrono::milliseconds(2000));

src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h

Lines changed: 2 additions & 2 deletions
@@ -161,7 +161,7 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {
     if (ok) {
       SendNext();
     } else {
-      RAY_LOG_EVERY_MS(INFO, 1000) << "Failed to send the message to: "
+      RAY_LOG_EVERY_MS(INFO, 1000) << "Failed to send a message to node: "
                                    << NodeID::FromBinary(GetRemoteNodeID());
       Disconnect();
     }
@@ -180,7 +180,7 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {
     }
 
     if (!ok) {
-      RAY_LOG_EVERY_MS(INFO, 1000) << "Failed to read the message from: "
+      RAY_LOG_EVERY_MS(INFO, 1000) << "Failed to read a message from node: "
                                    << NodeID::FromBinary(GetRemoteNodeID());
       Disconnect();
       return;

src/ray/raylet/node_manager.cc

Lines changed: 27 additions & 30 deletions
@@ -545,9 +545,8 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job
         (worker->GetAssignedJobId() == job_id)) {
       // Don't kill worker processes belonging to the detached actor
       // since those are expected to outlive the job.
-      RAY_LOG(INFO).WithField(worker->WorkerId())
-          << "The leased worker "
-          << " is killed because the job " << job_id << " finished.";
+      RAY_LOG(INFO).WithField(worker->WorkerId()).WithField(job_id)
+          << "Killing leased worker because its job finished.";
       rpc::ExitRequest request;
       request.set_force_exit(true);
       worker->rpc_client()->Exit(
@@ -948,7 +947,7 @@ void NodeManager::NodeRemoved(const NodeID &node_id) {
       // If the leased worker's owner was on the failed node, then kill the leased
       // worker.
       RAY_LOG(INFO).WithField(worker->WorkerId()).WithField(owner_node_id)
-          << "The leased worker is killed because the owner node died.";
+          << "Killing leased worker because its owner's node died.";
       worker->KillAsync(io_service_);
     }
 
@@ -989,9 +988,10 @@ void NodeManager::HandleUnexpectedWorkerFailure(const WorkerID &worker_id) {
       continue;
     }
     // If the failed worker was a leased worker's owner, then kill the leased worker.
-    RAY_LOG(INFO) << "The leased worker " << worker->WorkerId()
-                  << " is killed because the owner process " << owner_worker_id
-                  << " died.";
+    RAY_LOG(INFO)
+        .WithField(worker->WorkerId())
+        .WithField("owner_worker_id", owner_worker_id)
+        << "Killing leased worker because its owner died.";
     worker->KillAsync(io_service_);
   }
 }
@@ -1053,6 +1053,8 @@ bool NodeManager::ResourceDeleted(const NodeID &node_id,
 void NodeManager::HandleNotifyGCSRestart(rpc::NotifyGCSRestartRequest request,
                                          rpc::NotifyGCSRestartReply *reply,
                                          rpc::SendReplyCallback send_reply_callback) {
+  RAY_LOG(INFO)
+      << "The GCS has restarted. Resubscribing to pubsub and notifying local workers.";
   // When GCS restarts, it'll notify raylet to do some initialization work
   // (resubscribing). Raylet will also notify all workers to do this job. Workers are
   // registered to raylet first (blocking call) and then connect to GCS, so there is no
@@ -1090,10 +1092,9 @@ void NodeManager::HandleClientConnectionError(
       error.value(),
       ". ",
       error.message(),
-      ". There are some potential root causes. (1) The process is killed by "
-      "SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is "
-      "called. (3) The worker is crashed unexpectedly due to SIGSEGV or other "
-      "unexpected errors.");
+      ". Some common causes include: (1) the process was killed by the OOM killer "
+      "due to high memory usage, (2) ray stop --force was called, or (3) the worker "
+      "crashed unexpectedly due to SIGSEGV or another unexpected error.");
 
   // Disconnect the client and don't process more messages.
   DisconnectClient(
@@ -1413,31 +1414,27 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
     rpc::WorkerExitType disconnect_type,
     const std::string &disconnect_detail,
     const rpc::RayException *creation_task_exception) {
-  std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
   bool is_worker = false, is_driver = false;
-  if (worker) {
-    // The client is a worker.
+  std::shared_ptr<WorkerInterface> worker;
+  if ((worker = worker_pool_.GetRegisteredWorker(client))) {
     is_worker = true;
+    RAY_LOG(INFO).WithField(worker->WorkerId()).WithField(worker->GetAssignedJobId())
+        << "Disconnecting worker, graceful=" << std::boolalpha << graceful
+        << ", disconnect_type=" << disconnect_type
+        << ", has_creation_task_exception=" << std::boolalpha
+        << (creation_task_exception != nullptr);
+  } else if ((worker = worker_pool_.GetRegisteredDriver(client))) {
+    is_driver = true;
+    RAY_LOG(INFO).WithField(worker->WorkerId()).WithField(worker->GetAssignedJobId())
+        << "Disconnecting driver, graceful=" << std::boolalpha << graceful
+        << ", disconnect_type=" << disconnect_type;
   } else {
-    worker = worker_pool_.GetRegisteredDriver(client);
-    if (worker) {
-      // The client is a driver.
-      is_driver = true;
-    } else {
-      RAY_LOG(INFO)
-          << "Not disconnecting client disconnect it has already been disconnected.";
-      return;
-    }
+    RAY_LOG(INFO) << "Got disconnect message from an unregistered client, ignoring.";
+    return;
   }
 
-  RAY_LOG(INFO).WithField(worker->WorkerId())
-      << "Disconnecting client, graceful=" << std::boolalpha << graceful
-      << ", disconnect_type=" << disconnect_type
-      << ", has_creation_task_exception=" << std::boolalpha
-      << (creation_task_exception != nullptr);
+  RAY_CHECK(is_worker != is_driver) << "Client must be a registered worker or driver.";
 
-  RAY_CHECK(worker != nullptr);
-  RAY_CHECK(!(is_worker && is_driver));
   // Clean up any open ray.get or ray.wait calls that the worker made.
   lease_dependency_manager_.CancelGetRequest(worker->WorkerId());
   lease_dependency_manager_.CancelWaitRequest(worker->WorkerId());
