diff --git a/python/ray/tests/resource_isolation/test_resource_isolation_integration.py b/python/ray/tests/resource_isolation/test_resource_isolation_integration.py
index 666d021dd4d7..a40158ea98fc 100644
--- a/python/ray/tests/resource_isolation/test_resource_isolation_integration.py
+++ b/python/ray/tests/resource_isolation/test_resource_isolation_integration.py
@@ -1,6 +1,8 @@
 import os
 import platform
+import subprocess
 import sys
+import textwrap
 from pathlib import Path
 from typing import Set
 
@@ -11,6 +13,7 @@
 import ray._common.utils as utils
 import ray._private.ray_constants as ray_constants
 import ray.scripts.scripts as scripts
+from ray._common.test_utils import wait_for_condition
 from ray._private.resource_isolation_config import ResourceIsolationConfig
 
 # These tests are intended to run in CI inside a container.
@@ -21,7 +24,6 @@
 # Run these commands locally before running the test suite:
 #
 # sudo mkdir -p /sys/fs/cgroup/resource_isolation_test
-# echo "+cpu +memory" | sudo tee -a /sys/fs/cgroup/resource_isolation_test/cgroup.subtree_control
 # sudo chown -R $(whoami):$(whoami) /sys/fs/cgroup/resource_isolation_test/
 # sudo chmod -R u+rwx /sys/fs/cgroup/resource_isolation_test/
 # echo $$ | sudo tee /sys/fs/cgroup/resource_isolation_test/cgroup.procs
@@ -337,6 +339,35 @@ def assert_cgroup_hierarchy_exists_for_node(
     )
 
 
+def assert_process_is_not_moved_into_ray_cgroups(
+    node_id: str,
+    resource_isolation_config: ResourceIsolationConfig,
+    pid: str,
+):
+    """Asserts that the given process was not moved into any ray-managed cgroup.
+
+    Args:
+        node_id: used to construct the path of the cgroup subtree
+        resource_isolation_config: used to construct the path of the cgroup
+            subtree
+        pid: the pid of the process, as a string
+    """
+    base_cgroup_for_node = resource_isolation_config.cgroup_path
+    node_cgroup = Path(base_cgroup_for_node) / f"ray-node_{node_id}"
+    cgroup_procs_file_paths = [
+        node_cgroup / "system" / "leaf" / "cgroup.procs",
+        node_cgroup / "user" / "non-ray" / "cgroup.procs",
+        node_cgroup / "user" / "workers" / "cgroup.procs",
+    ]
+    found_pid = False
+    for file_path in cgroup_procs_file_paths:
+        with open(file_path, "r") as cgroup_procs_file:
+            lines = cgroup_procs_file.readlines()
+            for line in lines:
+                found_pid = found_pid or (line.strip() == pid)
+    assert not found_pid
+
+
 def assert_system_processes_are_in_system_cgroup(
     node_id: str,
     resource_isolation_config: ResourceIsolationConfig,
@@ -407,6 +438,30 @@ def assert_cgroup_hierarchy_cleaned_up_for_node(
     ), f"Root cgroup node at {node_cgroup} was not deleted. Cgroup cleanup failed. You may have to manually delete the cgroup subtree."
 
 
+def create_driver_in_internal_namespace():
+    """Returns a driver process that runs in the '_ray_internal_' namespace.
+
+    Drivers in the '_ray_internal_' namespace are NOT moved into the workers
+    cgroup by the raylet when they register. The Dashboard ServeHead and
+    JobHead modules are drivers that are technically system processes; they
+    use the '_ray_internal_' namespace and therefore must not be moved into
+    the workers cgroup on registration.
+    """
+
+    driver_code = textwrap.dedent(
+        """
+        import ray
+        import time
+        ray.init(namespace='_ray_internal_')
+        time.sleep(3600)
+        """
+    ).strip()
+
+    second_driver_proc = subprocess.Popen([sys.executable, "-c", driver_code])
+
+    return second_driver_proc
+
+
 # The following tests check for cgroup setup and cleanup with the
 # ray cli.
 def test_ray_cli_start_invalid_resource_isolation_config(cleanup_ray):
@@ -465,6 +520,8 @@ def __init__(self):
         def get_pid(self):
             return os.getpid()
 
+    second_driver_proc = create_driver_in_internal_namespace()
+
     actor_refs = []
     for _ in range(num_cpus):
         actor_refs.append(Actor.remote())
@@ -472,6 +529,7 @@ def get_pid(self):
     worker_pids.add(str(os.getpid()))
     for actor in actor_refs:
         worker_pids.add(str(ray.get(actor.get_pid.remote())))
+
     assert_system_processes_are_in_system_cgroup(
         node_id,
         resource_isolation_config,
@@ -480,8 +538,13 @@ def get_pid(self):
     assert_worker_processes_are_in_workers_cgroup(
         node_id, resource_isolation_config, worker_pids
     )
-    runner.invoke(scripts.stop)
+    assert_process_is_not_moved_into_ray_cgroups(
+        node_id, resource_isolation_config, str(second_driver_proc.pid)
+    )
+    second_driver_proc.kill()
+    wait_for_condition(lambda: second_driver_proc.poll() is not None, timeout=5)
 
+    runner.invoke(scripts.stop)
     assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config)
 
 
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index bc6bd476d799..90eaa40d4985 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -1278,10 +1278,6 @@ Status NodeManager::RegisterForNewDriver(
   worker->SetProcess(Process::FromPid(pid));
   rpc::JobConfig job_config;
   job_config.ParseFromString(message->serialized_job_config()->str());
-  Status s = cgroup_manager_->AddProcessToWorkersCgroup(std::to_string(pid));
-  RAY_CHECK(s.ok()) << absl::StrFormat(
-      "Failed to move the driver process into the workers cgroup with error %s",
-      s.ToString());
   return worker_pool_.RegisterDriver(worker, job_config, send_reply_callback);
 }
 
diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc
index 5520aade4c74..48c8cb0d384d 100644
--- a/src/ray/raylet/worker_pool.cc
+++ b/src/ray/raylet/worker_pool.cc
@@ -846,6 +846,10 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
   return Status::OK();
 }
 
+bool IsInternalNamespace(const std::string &ray_namespace) {
+  return absl::StartsWith(ray_namespace, kRayInternalNamespacePrefix);
+}
+
 void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker) {
   auto &state = GetStateForLanguage(worker->GetLanguage());
   const StartupToken worker_startup_token = worker->GetStartupToken();
@@ -907,6 +911,13 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
   auto &state = GetStateForLanguage(driver->GetLanguage());
   state.registered_drivers.insert(std::move(driver));
   const auto job_id = driver->GetAssignedJobId();
+  // A subset of the Ray Dashboard modules are registered as drivers under an
+  // internal namespace. These are system processes and therefore do not need
+  // to be moved into the workers cgroup.
+  if (!IsInternalNamespace(job_config.ray_namespace())) {
+    add_to_cgroup_hook_(std::to_string(driver->GetProcess().GetId()));
+  }
+
   HandleJobStarted(job_id, job_config);
 
   if (driver->GetLanguage() == Language::JAVA) {
@@ -1657,10 +1668,6 @@ bool WorkerPool::IsWorkerAvailableForScheduling() const {
   return false;
 }
 
-bool IsInternalNamespace(const std::string &ray_namespace) {
-  return absl::StartsWith(ray_namespace, kRayInternalNamespacePrefix);
-}
-
 std::vector<std::shared_ptr<WorkerInterface>> WorkerPool::GetAllRegisteredDrivers(
     bool filter_dead_drivers, bool filter_system_drivers) const {
   std::vector<std::shared_ptr<WorkerInterface>> drivers;
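
For a quick manual check of the gating behaviour outside the integration test, the sketch below launches a driver in the '_ray_internal_' namespace and inspects /proc/<pid>/cgroup to confirm the raylet left it out of the workers cgroup. It is only a sketch, not part of this change: it assumes a Linux host with cgroup v2, Ray installed, and a local node already started via `ray start` with resource isolation enabled; the "/user/workers" path fragment mirrors the cgroup layout used by the test helpers above.

# Manual sanity check (assumptions: Linux, cgroup v2, a local node already
# started with resource isolation enabled). Not part of this PR; it mirrors
# create_driver_in_internal_namespace() and the cgroup layout of the tests.
import subprocess
import sys
import textwrap
import time

driver_code = textwrap.dedent(
    """
    import ray
    import time
    # Drivers in the '_ray_internal_' namespace should NOT be moved into
    # the workers cgroup by the raylet when they register.
    ray.init(namespace='_ray_internal_')
    time.sleep(3600)
    """
).strip()

proc = subprocess.Popen([sys.executable, "-c", driver_code])
time.sleep(10)  # crude wait for the driver to connect and register

# /proc/<pid>/cgroup reports the cgroup the driver currently belongs to.
with open(f"/proc/{proc.pid}/cgroup") as f:
    membership = f.read()
print(membership)

# With the gating in place, the driver should not sit under the ray-managed
# workers cgroup (a path containing "/user/workers" in this test layout).
assert "/user/workers" not in membership

proc.kill()
proc.wait()

The integration test performs the same check against the node's cgroup.procs files via assert_process_is_not_moved_into_ray_cgroups; reading /proc/<pid>/cgroup is simply a lighter-weight way to eyeball the same behaviour.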