@@ -1,6 +1,8 @@
import os
import platform
import subprocess
import sys
import textwrap
from pathlib import Path
from typing import Set

@@ -11,6 +13,7 @@
import ray._common.utils as utils
import ray._private.ray_constants as ray_constants
import ray.scripts.scripts as scripts
from ray._common.test_utils import wait_for_condition
from ray._private.resource_isolation_config import ResourceIsolationConfig

# These tests are intended to run in CI inside a container.
@@ -21,7 +24,6 @@
# Run these commands locally before running the test suite:
#
# sudo mkdir -p /sys/fs/cgroup/resource_isolation_test
# echo "+cpu +memory" | sudo tee -a /sys/fs/cgroup/resource_isolation_test/cgroup.subtree_control
# sudo chown -R $(whoami):$(whoami) /sys/fs/cgroup/resource_isolation_test/
# sudo chmod -R u+rwx /sys/fs/cgroup/resource_isolation_test/
# echo $$ | sudo tee /sys/fs/cgroup/resource_isolation_test/cgroup.procs
@@ -337,6 +339,35 @@ def assert_cgroup_hierarchy_exists_for_node(
)


def assert_process_is_not_moved_into_ray_cgroups(
Collaborator: did you forget a word in the name?

node_id: str,
resource_isolation_config: ResourceIsolationConfig,
pid: str,
):
"""Asserts that the system processes were created in the correct cgroup.

Args:
node_id: used to construct the path of the cgroup subtree
resource_isolation_config: used to construct the path of the cgroup
subtree
pid: the pid of the process that should not be in any ray cgroup
Collaborator: missing

"""
base_cgroup_for_node = resource_isolation_config.cgroup_path
node_cgroup = Path(base_cgroup_for_node) / f"ray-node_{node_id}"
cgroup_procs_file_paths = [
node_cgroup / "system" / "leaf" / "cgroup.procs",
node_cgroup / "user" / "non-ray" / "cgroup.procs",
node_cgroup / "user" / "workers" / "cgroup.procs",
]
found_pid = False
for file_path in cgroup_procs_file_paths:
with open(file_path, "r") as cgroup_procs_file:
lines = cgroup_procs_file.readlines()
for line in lines:
found_pid = found_pid or (line.strip() == pid)
assert not found_pid


def assert_system_processes_are_in_system_cgroup(
node_id: str,
resource_isolation_config: ResourceIsolationConfig,
@@ -407,6 +438,30 @@ def assert_cgroup_hierarchy_cleaned_up_for_node(
), f"Root cgroup node at {node_cgroup} was not deleted. Cgroup cleanup failed. You may have to manually delete the cgroup subtree."


def create_driver_in_internal_namespace():
"""
Returns a driver process that is a part of the '_ray_internal_' namespace.
If the driver is part of the '_ray_internal_' namespace, it will NOT
be moved into the workers cgroup by the raylet when it registers.
The Dashboard ServeHead and JobHead modules are drivers that are
technically system processes and use the '_ray_internal_' namespace and therefore
must not be moved into the workers cgroup on registration.
"""

driver_code = textwrap.dedent(
"""
import ray
import time
ray.init(namespace='_ray_internal_')
Collaborator: there's a constant for it somewhere you could use, but hard coding is also fine (it shouldn't change)

time.sleep(3600)
"""
).strip()

second_driver_proc = subprocess.Popen([sys.executable, "-c", driver_code])

return second_driver_proc


# The following tests check for cgroup setup and cleanup with the
# ray cli.
def test_ray_cli_start_invalid_resource_isolation_config(cleanup_ray):
@@ -465,13 +520,16 @@ def __init__(self):
def get_pid(self):
return os.getpid()

second_driver_proc = create_driver_in_internal_namespace()
Contributor (Author): I only added the test to ray start because in ray.init, the testing process is a driver that is already in the workers cgroup. Therefore, the newly spawned second driver will also be in the workers cgroup.


actor_refs = []
for _ in range(num_cpus):
actor_refs.append(Actor.remote())
worker_pids = set()
worker_pids.add(str(os.getpid()))
for actor in actor_refs:
worker_pids.add(str(ray.get(actor.get_pid.remote())))

assert_system_processes_are_in_system_cgroup(
node_id,
resource_isolation_config,
@@ -480,8 +538,13 @@ def get_pid(self):
assert_worker_processes_are_in_workers_cgroup(
node_id, resource_isolation_config, worker_pids
)
runner.invoke(scripts.stop)
assert_process_is_not_moved_into_ray_cgroups(
node_id, resource_isolation_config, str(second_driver_proc.pid)
)
Bug: Test Assertion Fails Due to PID Type Mismatch

The assert_process_in_not_moved_into_ray_cgroups function expects a string PID but receives an integer from second_driver_proc.pid. This type mismatch causes the internal PID comparison to always fail, making the test assertion ineffective and always pass.


second_driver_proc.kill()
wait_for_condition(lambda: second_driver_proc.wait(), timeout=5)
runner.invoke(scripts.stop)
assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config)


4 changes: 0 additions & 4 deletions src/ray/raylet/node_manager.cc
@@ -1278,10 +1278,6 @@ Status NodeManager::RegisterForNewDriver(
worker->SetProcess(Process::FromPid(pid));
rpc::JobConfig job_config;
job_config.ParseFromString(message->serialized_job_config()->str());
Status s = cgroup_manager_->AddProcessToWorkersCgroup(std::to_string(pid));
RAY_CHECK(s.ok()) << absl::StrFormat(
"Failed to move the driver process into the workers cgroup with error %s",
s.ToString());
return worker_pool_.RegisterDriver(worker, job_config, send_reply_callback);
}

15 changes: 11 additions & 4 deletions src/ray/raylet/worker_pool.cc
@@ -846,6 +846,10 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
return Status::OK();
}

bool IsInternalNamespace(const std::string &ray_namespace) {
return absl::StartsWith(ray_namespace, kRayInternalNamespacePrefix);
}

void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker) {
auto &state = GetStateForLanguage(worker->GetLanguage());
const StartupToken worker_startup_token = worker->GetStartupToken();
@@ -907,6 +911,13 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
auto &state = GetStateForLanguage(driver->GetLanguage());
state.registered_drivers.insert(std::move(driver));
const auto job_id = driver->GetAssignedJobId();
// A subset of the Ray Dashboard modules are registered as drivers under an
// internal namespace. These are system processes and therefore do not need
// to be moved into the workers cgroup.
if (!IsInternalNamespace(job_config.ray_namespace())) {
add_to_cgroup_hook_(std::to_string(driver->GetProcess().GetId()));
}

HandleJobStarted(job_id, job_config);

if (driver->GetLanguage() == Language::JAVA) {
@@ -1657,10 +1668,6 @@ bool WorkerPool::IsWorkerAvailableForScheduling() const {
return false;
}

bool IsInternalNamespace(const std::string &ray_namespace) {
return absl::StartsWith(ray_namespace, kRayInternalNamespacePrefix);
}

std::vector<std::shared_ptr<WorkerInterface>> WorkerPool::GetAllRegisteredDrivers(
bool filter_dead_drivers, bool filter_system_drivers) const {
std::vector<std::shared_ptr<WorkerInterface>> drivers;