64 changes: 34 additions & 30 deletions python/ray/tests/test_gcs_fault_tolerance.py
@@ -6,7 +6,7 @@
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from typing import Any, Tuple

import pytest
from filelock import FileLock
@@ -254,38 +254,36 @@ def condition():
ray.get_actor("abc")


def test_worker_raylet_resubscription(tmp_path, ray_start_regular_with_external_redis):
# This test is to make sure resubscription in raylet is working.
# When subscription failed, raylet will not get worker failure error
# and thus, it won't kill the worker which is fate sharing with the failed
# one.

@ray.remote
def blocking_child():
(tmp_path / "blocking_child.pid").write_text(str(os.getpid()))
time.sleep(10000)
def test_raylet_resubscribe_to_worker_death(
tmp_path, ray_start_regular_with_external_redis
):
"""Verify that the Raylet resubscribes to worker death notifications on GCS restart."""

@ray.remote
def bar():
return (
os.getpid(),
# Use runtime env to make sure task is running in a different
# ray worker
blocking_child.options(runtime_env={"env_vars": {"P": ""}}).remote(),
)
child_task_pid_path = tmp_path / "blocking_child.pid"

(parent_pid, obj_ref) = ray.get(bar.remote())
@ray.remote(num_cpus=0)
def child():
print("Child worker ID:", ray.get_runtime_context().get_worker_id())
child_task_pid_path.write_text(str(os.getpid()))
while True:
time.sleep(0.1)
print("Child still running...")

blocking_child_pid = None
@ray.remote(num_cpus=0)
def parent() -> Tuple[int, int, ray.ObjectRef]:
print("Parent worker ID:", ray.get_runtime_context().get_worker_id())
child_obj_ref = child.remote()

def condition():
nonlocal blocking_child_pid
blocking_child_pid = int((tmp_path / "blocking_child.pid").read_text())
return True
# Wait for the child to be running and report back its PID.
wait_for_condition(lambda: child_task_pid_path.exists(), timeout=10)
child_pid = int(child_task_pid_path.read_text())
return os.getpid(), child_pid, child_obj_ref

wait_for_condition(condition, timeout=10)
parent_pid, child_pid, child_obj_ref = ray.get(parent.remote())
print("Parent PID:", parent_pid, ", child PID:", child_pid)
assert parent_pid != child_pid

# Kill and restart the GCS to trigger resubscription.
# Kill and restart the GCS.
ray._private.worker._global_node.kill_gcs_server()
ray._private.worker._global_node.start_gcs_server()

@@ -296,14 +294,20 @@ def condition():
gcs_address = ray._private.worker.global_worker.gcs_client.address
gcs_client = ray._raylet.GcsClient(address=gcs_address)
gcs_client.internal_kv_put(b"a", b"b", True, None)
assert gcs_client.internal_kv_get(b"a") == b"b"

# Kill the parent task, which should cause the blocking child task to exit.
# Kill the parent task and verify that the child task is killed due to fate sharing
# with its parent.
print("Killing parent process.")
p = psutil.Process(parent_pid)
p.kill()
p.wait()
print("Parent process exited.")

# The blocking child task should exit.
wait_for_pid_to_exit(blocking_child_pid, 10)
# The child task should exit.
wait_for_pid_to_exit(child_pid, 20)
with pytest.raises(ray.exceptions.OwnerDiedError):
ray.get(child_obj_ref)


def test_core_worker_resubscription(tmp_path, ray_start_regular_with_external_redis):
1 change: 0 additions & 1 deletion src/ray/common/asio/periodical_runner.cc
@@ -27,7 +27,6 @@ PeriodicalRunner::PeriodicalRunner(instrumented_io_context &io_service)
: io_service_(io_service) {}

PeriodicalRunner::~PeriodicalRunner() {
RAY_LOG(DEBUG) << "PeriodicalRunner is destructed";
Collaborator Author: unnecessarily noisy when debug logs were on

absl::MutexLock lock(&mutex_);
for (const auto &timer : timers_) {
timer->cancel();
2 changes: 1 addition & 1 deletion src/ray/common/ray_syncer/ray_syncer.cc
@@ -101,7 +101,7 @@ void RaySyncer::Connect(const std::string &node_id,
io_context_,
[this, remote_node_id, channel]() {
RAY_LOG(INFO).WithField(NodeID::FromBinary(remote_node_id))
<< "Connection is broken. Reconnect to node.";
<< "Connection to the node was broken, reconnecting.";
Connect(remote_node_id, channel);
},
/* delay_microseconds = */ std::chrono::milliseconds(2000));
4 changes: 2 additions & 2 deletions src/ray/common/ray_syncer/ray_syncer_bidi_reactor_base.h
@@ -161,7 +161,7 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {
if (ok) {
SendNext();
} else {
RAY_LOG_EVERY_MS(INFO, 1000) << "Failed to send the message to: "
RAY_LOG_EVERY_MS(INFO, 1000) << "Failed to send a message to node: "
<< NodeID::FromBinary(GetRemoteNodeID());
Disconnect();
}
@@ -180,7 +180,7 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {
}

if (!ok) {
RAY_LOG_EVERY_MS(INFO, 1000) << "Failed to read the message from: "
RAY_LOG_EVERY_MS(INFO, 1000) << "Failed to read a message from node: "
<< NodeID::FromBinary(GetRemoteNodeID());
Disconnect();
return;
55 changes: 25 additions & 30 deletions src/ray/raylet/node_manager.cc
@@ -545,9 +545,8 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job
(worker->GetAssignedJobId() == job_id)) {
// Don't kill worker processes belonging to the detached actor
// since those are expected to outlive the job.
RAY_LOG(INFO).WithField(worker->WorkerId())
<< "The leased worker "
<< " is killed because the job " << job_id << " finished.";
RAY_LOG(INFO).WithField(worker->WorkerId()).WithField(job_id)
<< "Killing leased worker because its job finished.";
rpc::ExitRequest request;
request.set_force_exit(true);
worker->rpc_client()->Exit(
@@ -948,7 +947,7 @@ void NodeManager::NodeRemoved(const NodeID &node_id) {
// If the leased worker's owner was on the failed node, then kill the leased
// worker.
RAY_LOG(INFO).WithField(worker->WorkerId()).WithField(owner_node_id)
<< "The leased worker is killed because the owner node died.";
<< "Killing leased worker because its owner's node died.";
worker->KillAsync(io_service_);
}

@@ -989,9 +988,10 @@ void NodeManager::HandleUnexpectedWorkerFailure(const WorkerID &worker_id) {
continue;
}
// If the failed worker was a leased worker's owner, then kill the leased worker.
RAY_LOG(INFO) << "The leased worker " << worker->WorkerId()
<< " is killed because the owner process " << owner_worker_id
<< " died.";
RAY_LOG(INFO)
.WithField(worker->WorkerId())
.WithField("owner_worker_id", owner_worker_id)
<< "Killing leased worker because its owner died.";
worker->KillAsync(io_service_);
}
}
@@ -1090,10 +1090,9 @@ void NodeManager::HandleClientConnectionError(
error.value(),
". ",
error.message(),
". There are some potential root causes. (1) The process is killed by "
"SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is "
"called. (3) The worker is crashed unexpectedly due to SIGSEGV or other "
"unexpected errors.");
". Some common causes include: (1) the process was killed by the OOM killer "
"due to high memory usage, (2) ray stop --force was called, or (3) the worker "
"crashed unexpectedly due to SIGSEGV or another unexpected error.");

// Disconnect the client and don't process more messages.
DisconnectClient(
@@ -1413,31 +1412,27 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
rpc::WorkerExitType disconnect_type,
const std::string &disconnect_detail,
const rpc::RayException *creation_task_exception) {
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
bool is_worker = false, is_driver = false;
if (worker) {
// The client is a worker.
std::shared_ptr<WorkerInterface> worker;
if ((worker = worker_pool_.GetRegisteredWorker(client))) {
Collaborator Author: @dayshah is this frowned upon by c++ enjoyers?

Contributor: no, i kinda like the if value syntax, scopes it so it gets destroyed earlier + can't be accessed outside
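For readers unfamiliar with the idiom, here is a minimal standalone sketch (not Ray code; Lookup and its strings are invented for illustration) of the C++17 if-with-initializer form of this pattern, which is what provides the scoping benefit mentioned above. The diff itself keeps plain assignment in the condition because worker is still used after the if/else chain.

    // Sketch only: demonstrates scoping a value to the if/else chain (C++17).
    #include <iostream>
    #include <memory>
    #include <string>

    // Stand-in for a registry lookup that may return null.
    std::shared_ptr<std::string> Lookup(bool found) {
      return found ? std::make_shared<std::string>("worker") : nullptr;
    }

    int main() {
      // `w` is declared in the if's init-statement, so it only exists inside
      // the if/else chain and is destroyed as soon as the chain ends.
      if (auto w = Lookup(true); w) {
        std::cout << "found " << *w << "\n";
      } else {
        std::cout << "not registered\n";
      }
      // `w` cannot be referenced here; the shared_ptr has already been released.
      return 0;
    }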

is_worker = true;
RAY_LOG(INFO).WithField(worker->WorkerId()).WithField(worker->GetAssignedJobId())
<< "Disconnecting worker, graceful=" << std::boolalpha << graceful
<< ", disconnect_type=" << disconnect_type
<< ", has_creation_task_exception=" << std::boolalpha
<< (creation_task_exception != nullptr);
} else if ((worker = worker_pool_.GetRegisteredDriver(client))) {
is_driver = true;
RAY_LOG(INFO).WithField(worker->WorkerId()).WithField(worker->GetAssignedJobId())
<< "Disconnecting driver, graceful=" << std::boolalpha << graceful
<< ", disconnect_type=" << disconnect_type;
} else {
worker = worker_pool_.GetRegisteredDriver(client);
if (worker) {
// The client is a driver.
is_driver = true;
} else {
RAY_LOG(INFO)
<< "Not disconnecting client disconnect it has already been disconnected.";
return;
}
RAY_LOG(INFO) << "Got disconnect message from an unregistered client, ignoring.";
return;
}

RAY_LOG(INFO).WithField(worker->WorkerId())
<< "Disconnecting client, graceful=" << std::boolalpha << graceful
<< ", disconnect_type=" << disconnect_type
<< ", has_creation_task_exception=" << std::boolalpha
<< (creation_task_exception != nullptr);
RAY_CHECK(is_worker != is_driver) << "Client must be a registered worker or driver.";

RAY_CHECK(worker != nullptr);
RAY_CHECK(!(is_worker && is_driver));
// Clean up any open ray.get or ray.wait calls that the worker made.
lease_dependency_manager_.CancelGetRequest(worker->WorkerId());
lease_dependency_manager_.CancelWaitRequest(worker->WorkerId());