Skip to content

Commit 41e40fd

Browse files
israbbani and edoakes
authored and committed
[core] (cgroups 18/n) Moving the driver into the workers cgroup. (#57776)
For more details about the resource isolation project see #54703. This PR moves the driver into the workers cgroup when it registers with the NodeManager. Also updates the tests to reflect this. This now includes changes from #57800. --------- Signed-off-by: irabbani <[email protected]> Co-authored-by: Edward Oakes <[email protected]> Signed-off-by: elliot-barn <[email protected]>
1 parent 3982d4b commit 41e40fd

File tree

9 files changed

+84
-29
lines changed

9 files changed

+84
-29
lines changed

python/ray/_private/resource_isolation_config.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def _validate_and_get_system_reserved_cpu(
179179
does not have enough available cpus.
180180
181181
"""
182-
available_system_cpus = utils.get_num_cpus()
182+
available_system_cpus = utils.get_num_cpus(truncate=False)
183183

184184
if available_system_cpus < ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES:
185185
raise ValueError(
@@ -220,9 +220,9 @@ def _validate_and_get_system_reserved_cpu(
220220
f"greater than or equal to {ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES}"
221221
)
222222

223-
if system_reserved_cpu > available_system_cpus:
223+
if system_reserved_cpu >= available_system_cpus:
224224
raise ValueError(
225-
f"The requested system_reserved_cpu={system_reserved_cpu} is greater than "
225+
f"The requested system_reserved_cpu={system_reserved_cpu} is greater than or equal to "
226226
f"the number of cpus available={available_system_cpus}. "
227227
"Pick a smaller number of cpu cores to reserve for ray system processes."
228228
)

python/ray/_private/utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,8 @@ def _get_docker_cpus(
421421

422422
def get_num_cpus(
423423
override_docker_cpu_warning: bool = ENV_DISABLE_DOCKER_CPU_WARNING,
424-
) -> int:
424+
truncate: bool = True,
425+
) -> float:
425426
"""
426427
Get the number of CPUs available on this node.
427428
Depending on the situation, use multiprocessing.cpu_count() or cgroups.
@@ -432,6 +433,7 @@ def get_num_cpus(
432433
RAY_DISABLE_DOCKER_CPU_WARNING. By default, whether or not to log
433434
the warning is determined by the env variable
434435
RAY_DISABLE_DOCKER_CPU_WARNING.
436+
truncate: If True, a fractional CPU count is truncated to an integer (the decimal part is dropped). If False, the fractional value is returned as-is.
435437
"""
436438
cpu_count = multiprocessing.cpu_count()
437439
if os.environ.get("RAY_USE_MULTIPROCESSING_CPU_COUNT"):
@@ -473,7 +475,8 @@ def get_num_cpus(
473475
f"truncated from {docker_count} to "
474476
f"{int(docker_count)}."
475477
)
476-
docker_count = int(docker_count)
478+
if truncate:
479+
docker_count = int(docker_count)
477480
cpu_count = docker_count
478481

479482
except Exception:

python/ray/tests/resource_isolation/test_resource_isolation_config.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,12 @@ def test_enabled_resource_isolation_with_default_config_picks_min_values(monkeyp
8989
# NOTE: if you change the DEFAULT_MIN_SYSTEM_* constants, you may need to modify this test.
9090
# if the total number of cpus is between [1,19] the system cgroup will get a weight that is equal to 1 cpu core.
9191
# if the total amount of memory is between [0.5GB, 4.8GB] the system cgroup will get 0.5GB + object store memory.
92-
monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 1)
92+
monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 2)
9393
monkeypatch.setattr(
9494
common_utils, "get_system_memory", lambda *args, **kwargs: 0.5 * (1024**3)
9595
)
9696
config = ResourceIsolationConfig(enable_resource_isolation=True)
97-
assert config.system_reserved_cpu_weight == 10000
97+
assert config.system_reserved_cpu_weight == 5000
9898
assert config.system_reserved_memory == 500 * (1024**2)
9999

100100
monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 19)
@@ -176,7 +176,7 @@ def test_enabled_with_resource_overrides_less_than_minimum_defaults_raise_value_
176176
)
177177

178178

179-
def test_enabled_with_resource_overrides_greater_than_available_resources_raise_value_error(
179+
def test_enabled_with_resource_overrides_gte_than_available_resources_raise_value_error(
180180
monkeypatch,
181181
):
182182
# The following values in ray_constants define the maximum reserved values to run ray with resource isolation.
@@ -186,11 +186,9 @@ def test_enabled_with_resource_overrides_greater_than_available_resources_raise_
186186
monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 32)
187187
with pytest.raises(
188188
ValueError,
189-
match="The requested system_reserved_cpu=32.1 is greater than the number of cpus available=32",
189+
match="The requested system_reserved_cpu=32.0 is greater than or equal to the number of cpus available=32",
190190
):
191-
ResourceIsolationConfig(
192-
enable_resource_isolation=True, system_reserved_cpu=32.1
193-
)
191+
ResourceIsolationConfig(enable_resource_isolation=True, system_reserved_cpu=32)
194192

195193
monkeypatch.setattr(
196194
common_utils, "get_system_memory", lambda *args, **kwargs: 10 * (1024**3)

python/ray/tests/resource_isolation/test_resource_isolation_integration.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -224,14 +224,23 @@ def cleanup_test_suite():
224224
) as base_subtree_control_file:
225225
base_subtree_control_file.write("-cpu -memory")
226226
base_subtree_control_file.flush()
227-
# 2) Move processes back into the leaf cgroup.
227+
# 2) Move processes back into the root cgroup.
228228
with open(_ROOT_CGROUP / "cgroup.procs", "w") as root_procs_file, open(
229229
_LEAF_GROUP / "cgroup.procs", "r"
230230
) as leaf_procs_file:
231231
leaf_cgroup_lines = leaf_procs_file.readlines()
232232
for line in leaf_cgroup_lines:
233233
root_procs_file.write(line.strip())
234234
root_procs_file.flush()
235+
# 3) Move the current process back into the _ROOT_CGROUP
236+
with open(_ROOT_CGROUP / "cgroup.procs", "w") as root_procs_file, open(
237+
_TEST_CGROUP / "cgroup.procs", "r"
238+
) as test_procs_file:
239+
test_cgroup_lines = test_procs_file.readlines()
240+
for line in test_cgroup_lines:
241+
root_procs_file.write(line.strip())
242+
root_procs_file.flush()
243+
235244
# 4) Delete the cgroups.
236245
os.rmdir(_LEAF_GROUP)
237246
os.rmdir(_TEST_CGROUP)
@@ -431,9 +440,6 @@ def test_ray_cli_start_resource_isolation_creates_cgroup_hierarchy_and_cleans_up
431440
assert result.exit_code == 0
432441
resource_isolation_config.add_object_store_memory(object_store_memory)
433442
assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config)
434-
assert_system_processes_are_in_system_cgroup(
435-
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_START)
436-
)
437443

438444
@ray.remote(num_cpus=1)
439445
class Actor:
@@ -447,12 +453,17 @@ def get_pid(self):
447453
for _ in range(num_cpus):
448454
actor_refs.append(Actor.remote())
449455
worker_pids = set()
456+
worker_pids.add(str(os.getpid()))
450457
for actor in actor_refs:
451458
worker_pids.add(str(ray.get(actor.get_pid.remote())))
459+
assert_system_processes_are_in_system_cgroup(
460+
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_START)
461+
)
452462
assert_worker_processes_are_in_workers_cgroup(
453463
node_id, resource_isolation_config, worker_pids
454464
)
455465
runner.invoke(scripts.stop)
466+
456467
assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config)
457468

458469

@@ -492,9 +503,6 @@ def test_ray_init_resource_isolation_creates_cgroup_hierarchy_and_cleans_up(
492503
object_store_memory=object_store_memory,
493504
)
494505
assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config)
495-
assert_system_processes_are_in_system_cgroup(
496-
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_INIT)
497-
)
498506

499507
@ray.remote(num_cpus=1)
500508
class Actor:
@@ -508,8 +516,12 @@ def get_pid(self):
508516
for _ in range(num_cpus):
509517
actor_refs.append(Actor.remote())
510518
worker_pids = set()
519+
worker_pids.add(str(os.getpid()))
511520
for actor in actor_refs:
512521
worker_pids.add(str(ray.get(actor.get_pid.remote())))
522+
assert_system_processes_are_in_system_cgroup(
523+
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_INIT)
524+
)
513525
assert_worker_processes_are_in_workers_cgroup(
514526
node_id, resource_isolation_config, worker_pids
515527
)

src/ray/common/cgroup2/cgroup_manager.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,11 @@ Status CgroupManager::Initialize(int64_t system_reserved_cpu_weight,
268268
RAY_RETURN_NOT_OK(cgroup_driver_->MoveAllProcesses(base_cgroup_, non_ray_cgroup_));
269269
RegisterMoveAllProcesses(non_ray_cgroup_, base_cgroup_);
270270

271-
// NOTE: Since the raylet does not own the lifecycle of all system processes,
272-
// there's no guarantee that there are no pids in the system leaf cgroup.
271+
// NOTE: Since the raylet does not own the lifecycle of all system or worker processes,
272+
// there's no guarantee that there are no pids in the system leaf or the workers cgroup.
273273
// Therefore, pids need to be migrated out of both of these cgroups to delete them.
274274
RegisterMoveAllProcesses(system_leaf_cgroup_, base_cgroup_);
275+
RegisterMoveAllProcesses(workers_cgroup_, base_cgroup_);
275276

276277
std::array<const std::string *, 2> cpu_controlled_cgroups{&base_cgroup_, &node_cgroup_};
277278
std::array<const std::string *, 3> memory_controlled_cgroups{

src/ray/common/cgroup2/integration_tests/sysfs_cgroup_driver_integration_test.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,4 +679,44 @@ TEST_F(SysFsCgroupDriverIntegrationTest,
679679
ASSERT_TRUE(terminate_s.ok()) << terminate_s.ToString();
680680
}
681681

682+
TEST_F(SysFsCgroupDriverIntegrationTest,
683+
AddProcessToCgroupSucceedsIfProcessAlreadyInCgroup) {
684+
auto cgroup_or_status = TempCgroupDirectory::Create(test_cgroup_path_, S_IRWXU);
685+
ASSERT_TRUE(cgroup_or_status.ok()) << cgroup_or_status.ToString();
686+
auto cgroup = std::move(cgroup_or_status.value());
687+
auto child_cgroup_or_status = TempCgroupDirectory::Create(cgroup->GetPath(), S_IRWXU);
688+
ASSERT_TRUE(child_cgroup_or_status.ok()) << child_cgroup_or_status.ToString();
689+
auto child_cgroup = std::move(child_cgroup_or_status.value());
690+
StatusOr<std::pair<pid_t, int>> child_process_s =
691+
StartChildProcessInCgroup(cgroup->GetPath());
692+
ASSERT_TRUE(child_process_s.ok()) << child_process_s.ToString();
693+
auto [child_pid, child_pidfd] = child_process_s.value();
694+
SysFsCgroupDriver driver;
695+
Status s =
696+
driver.AddProcessToCgroup(child_cgroup->GetPath(), std::to_string(child_pid));
697+
ASSERT_TRUE(s.ok()) << s.ToString();
698+
Status s2 =
699+
driver.AddProcessToCgroup(child_cgroup->GetPath(), std::to_string(child_pid));
700+
ASSERT_TRUE(s2.ok()) << s2.ToString();
701+
// Assert that the child's pid is actually in the new file.
702+
std::string child_cgroup_procs_file_path = child_cgroup->GetPath() +
703+
std::filesystem::path::preferred_separator +
704+
"cgroup.procs";
705+
std::ifstream child_cgroup_procs_file(child_cgroup_procs_file_path);
706+
ASSERT_TRUE(child_cgroup_procs_file.is_open())
707+
<< "Could not open file " << child_cgroup_procs_file_path << ".";
708+
std::unordered_set<int> child_cgroup_pids;
709+
int pid = -1;
710+
while (child_cgroup_procs_file >> pid) {
711+
ASSERT_FALSE(child_cgroup_procs_file.fail())
712+
<< "Unable to read pid from file " << child_cgroup_procs_file_path;
713+
child_cgroup_pids.emplace(pid);
714+
}
715+
EXPECT_EQ(child_cgroup_pids.size(), 1);
716+
EXPECT_TRUE(child_cgroup_pids.find(child_pid) != child_cgroup_pids.end());
717+
Status terminate_s =
718+
TerminateChildProcessAndWaitForTimeout(child_pid, child_pidfd, 5000);
719+
ASSERT_TRUE(terminate_s.ok()) << terminate_s.ToString();
720+
}
721+
682722
} // namespace ray

src/ray/common/cgroup2/sysfs_cgroup_driver.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,6 @@ Status SysFsCgroupDriver::DisableController(const std::string &cgroup_path,
331331
return Status::OK();
332332
}
333333

334-
// What's the right thing here? If the controller is specified?
335-
// The correct API would be specify where the controller should be enabled.
336334
Status SysFsCgroupDriver::AddConstraint(const std::string &cgroup_path,
337335
const std::string &constraint,
338336
const std::string &constraint_value) {

src/ray/common/cgroup2/tests/cgroup_manager_test.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -282,11 +282,11 @@ TEST(CgroupManagerTest, CreateSucceedsWithCleanupInOrder) {
282282
}
283283

284284
// Processes must be moved third.
285-
// Processes were moved both out of the system_leaf cgroup and the non_ray
286-
// cgroup.
287-
ASSERT_EQ(processes_moved->size(), 2);
288-
std::array<std::string, 2> process_moved_cgroups{system_leaf_cgroup_path,
289-
non_ray_cgroup_path};
285+
// Processes were moved out of the system_leaf, workers, and non_ray
286+
// cgroups.
287+
ASSERT_EQ(processes_moved->size(), 3);
288+
std::array<std::string, 3> process_moved_cgroups{
289+
system_leaf_cgroup_path, non_ray_cgroup_path, workers_cgroup_path};
290290

291291
// The order in which processes were moved back from leaf nodes to the base_cgroup
292292
// does not matter.

src/ray/raylet/node_manager.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1184,7 +1184,10 @@ Status NodeManager::RegisterForNewDriver(
11841184
worker->SetProcess(Process::FromPid(pid));
11851185
rpc::JobConfig job_config;
11861186
job_config.ParseFromString(message->serialized_job_config()->str());
1187-
1187+
Status s = cgroup_manager_->AddProcessToWorkersCgroup(std::to_string(pid));
1188+
RAY_CHECK(s.ok()) << absl::StrFormat(
1189+
"Failed to move the driver process into the workers cgroup with error %s",
1190+
s.ToString());
11881191
return worker_pool_.RegisterDriver(worker, job_config, send_reply_callback);
11891192
}
11901193

0 commit comments

Comments
 (0)