Commit e81f80f

[core] cherrypicking (cgroups 21/n) into v2.51.0 (#58017)
Cherrypicking #57955 into v2.51.0

Signed-off-by: irabbani <[email protected]>
1 parent 7034ba5 commit e81f80f

File tree: 3 files changed (+76, -10 lines)

python/ray/tests/resource_isolation/test_resource_isolation_integration.py

Lines changed: 65 additions & 2 deletions
@@ -1,6 +1,8 @@
 import os
 import platform
+import subprocess
 import sys
+import textwrap
 from pathlib import Path
 from typing import Set
 
@@ -11,6 +13,7 @@
 import ray._common.utils as utils
 import ray._private.ray_constants as ray_constants
 import ray.scripts.scripts as scripts
+from ray._common.test_utils import wait_for_condition
 from ray._private.resource_isolation_config import ResourceIsolationConfig
 
 # These tests are intended to run in CI inside a container.
@@ -21,7 +24,6 @@
 # Run these commands locally before running the test suite:
 #
 # sudo mkdir -p /sys/fs/cgroup/resource_isolation_test
-# echo "+cpu +memory" | sudo tee -a /sys/fs/cgroup/resource_isolation_test/cgroup.subtree_control
 # sudo chown -R $(whoami):$(whoami) /sys/fs/cgroup/resource_isolation_test/
 # sudo chmod -R u+rwx /sys/fs/cgroup/resource_isolation_test/
 # echo $$ | sudo tee /sys/fs/cgroup/resource_isolation_test/cgroup.procs
@@ -337,6 +339,35 @@ def assert_cgroup_hierarchy_exists_for_node(
     )
 
 
+def assert_process_in_not_moved_into_ray_cgroups(
+    node_id: str,
+    resource_isolation_config: ResourceIsolationConfig,
+    pid: str,
+):
+    """Asserts that the system processes were created in the correct cgroup.
+
+    Args:
+        node_id: used to construct the path of the cgroup subtree
+        resource_isolation_config: used to construct the path of the cgroup
+            subtree
+        pid:
+    """
+    base_cgroup_for_node = resource_isolation_config.cgroup_path
+    node_cgroup = Path(base_cgroup_for_node) / f"ray-node_{node_id}"
+    cgroup_procs_file_paths = [
+        node_cgroup / "system" / "leaf" / "cgroup.procs",
+        node_cgroup / "user" / "non-ray" / "cgroup.procs",
+        node_cgroup / "user" / "workers" / "cgroup.procs",
+    ]
+    found_pid = False
+    for file_path in cgroup_procs_file_paths:
+        with open(file_path, "r") as cgroup_procs_file:
+            lines = cgroup_procs_file.readlines()
+            for line in lines:
+                found_pid = found_pid or (line.strip() == pid)
+    assert not found_pid
+
+
 def assert_system_processes_are_in_system_cgroup(
     node_id: str,
     resource_isolation_config: ResourceIsolationConfig,
@@ -407,6 +438,30 @@ def assert_cgroup_hierarchy_cleaned_up_for_node(
     ), f"Root cgroup node at {node_cgroup} was not deleted. Cgroup cleanup failed. You may have to manually delete the cgroup subtree."
 
 
+def create_driver_in_internal_namespace():
+    """
+    Returns a driver process that is a part of the '_ray_internal_' namespace.
+    If the driver is part of the '_ray_internal_' namespace, it will NOT
+    be moved into the workers cgroup by the raylet when it registers.
+    The Dashboard ServeHead and JobHead modules are drivers that are
+    technically system processes and use the '_ray_internal_' namespace and therefore
+    must not be moved into the workers cgroup on registration.
+    """
+
+    driver_code = textwrap.dedent(
+        """
+        import ray
+        import time
+        ray.init(namespace='_ray_internal_')
+        time.sleep(3600)
+        """
+    ).strip()
+
+    second_driver_proc = subprocess.Popen(["python", "-c", driver_code])
+
+    return second_driver_proc
+
+
 # The following tests check for cgroup setup and cleanup with the
 # ray cli.
 def test_ray_cli_start_invalid_resource_isolation_config(cleanup_ray):
@@ -465,13 +520,16 @@ def __init__(self):
         def get_pid(self):
             return os.getpid()
 
+    second_driver_proc = create_driver_in_internal_namespace()
+
     actor_refs = []
     for _ in range(num_cpus):
         actor_refs.append(Actor.remote())
     worker_pids = set()
     worker_pids.add(str(os.getpid()))
     for actor in actor_refs:
         worker_pids.add(str(ray.get(actor.get_pid.remote())))
+
     assert_system_processes_are_in_system_cgroup(
         node_id,
         resource_isolation_config,
@@ -480,8 +538,13 @@ def get_pid(self):
     assert_worker_processes_are_in_workers_cgroup(
         node_id, resource_isolation_config, worker_pids
     )
-    runner.invoke(scripts.stop)
+    assert_process_in_not_moved_into_ray_cgroups(
+        node_id, resource_isolation_config, second_driver_proc.pid
+    )
 
+    second_driver_proc.kill()
+    wait_for_condition(lambda: second_driver_proc.wait(), timeout=5)
+    runner.invoke(scripts.stop)
     assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config)
 
 
src/ray/raylet/node_manager.cc

Lines changed: 0 additions & 4 deletions
@@ -1278,10 +1278,6 @@ Status NodeManager::RegisterForNewDriver(
   worker->SetProcess(Process::FromPid(pid));
   rpc::JobConfig job_config;
   job_config.ParseFromString(message->serialized_job_config()->str());
-  Status s = cgroup_manager_->AddProcessToWorkersCgroup(std::to_string(pid));
-  RAY_CHECK(s.ok()) << absl::StrFormat(
-      "Failed to move the driver process into the workers cgroup with error %s",
-      s.ToString());
   return worker_pool_.RegisterDriver(worker, job_config, send_reply_callback);
 }
 
src/ray/raylet/worker_pool.cc

Lines changed: 11 additions & 4 deletions
@@ -846,6 +846,10 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
   return Status::OK();
 }
 
+bool IsInternalNamespace(const std::string &ray_namespace) {
+  return absl::StartsWith(ray_namespace, kRayInternalNamespacePrefix);
+}
+
 void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker) {
   auto &state = GetStateForLanguage(worker->GetLanguage());
   const StartupToken worker_startup_token = worker->GetStartupToken();
@@ -907,6 +911,13 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
   auto &state = GetStateForLanguage(driver->GetLanguage());
   state.registered_drivers.insert(std::move(driver));
   const auto job_id = driver->GetAssignedJobId();
+  // A subset of the Ray Dashboard Modules are registered as drivers under an
+  // internal namespace. These are system processes and therefore, do not need to be moved
+  // into the workers cgroup.
+  if (!IsInternalNamespace(job_config.ray_namespace())) {
+    add_to_cgroup_hook_(std::to_string(driver->GetProcess().GetId()));
+  }
+
   HandleJobStarted(job_id, job_config);
 
   if (driver->GetLanguage() == Language::JAVA) {
@@ -1657,10 +1668,6 @@ bool WorkerPool::IsWorkerAvailableForScheduling() const {
   return false;
 }
 
-bool IsInternalNamespace(const std::string &ray_namespace) {
-  return absl::StartsWith(ray_namespace, kRayInternalNamespacePrefix);
-}
-
 std::vector<std::shared_ptr<WorkerInterface>> WorkerPool::GetAllRegisteredDrivers(
     bool filter_dead_drivers, bool filter_system_drivers) const {
   std::vector<std::shared_ptr<WorkerInterface>> drivers;
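Taken together, the raylet-side change means a registering driver is only moved into the workers cgroup when its job namespace does not carry the internal prefix. The following is a rough Python sketch of that decision for illustration only; the prefix constant, hook parameter, and function names below are assumptions, not the raylet's actual API.

# Assumed to mirror kRayInternalNamespacePrefix; the test drivers use '_ray_internal_'.
RAY_INTERNAL_NAMESPACE_PREFIX = "_ray_internal_"


def is_internal_namespace(ray_namespace: str) -> bool:
    # Same prefix check as IsInternalNamespace in worker_pool.cc.
    return ray_namespace.startswith(RAY_INTERNAL_NAMESPACE_PREFIX)


def on_driver_registered(pid: int, ray_namespace: str, add_to_cgroup_hook) -> None:
    # Dashboard modules (e.g. ServeHead, JobHead) register as drivers under an
    # internal namespace; they are system processes and stay out of the workers cgroup.
    if not is_internal_namespace(ray_namespace):
        add_to_cgroup_hook(str(pid))


# Example: a user driver is handed to the cgroup hook, an internal driver is not.
moved = []
on_driver_registered(1234, "my_app", moved.append)
on_driver_registered(5678, "_ray_internal_dashboard", moved.append)
assert moved == ["1234"]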
