Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,23 @@ def cleanup_test_suite():
) as base_subtree_control_file:
base_subtree_control_file.write("-cpu -memory")
base_subtree_control_file.flush()
# 2) Move processes back into the leaf cgroup.
# 2) Move processes back into the root cgroup.
with open(_ROOT_CGROUP / "cgroup.procs", "w") as root_procs_file, open(
_LEAF_GROUP / "cgroup.procs", "r"
) as leaf_procs_file:
leaf_cgroup_lines = leaf_procs_file.readlines()
for line in leaf_cgroup_lines:
root_procs_file.write(line.strip())
root_procs_file.flush()
# 3) Move the current process back into the _ROOT_CGROUP
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wasn't previously required because the Driver (for ray.init) was moved by the test script into the LEAF_CGROUP_ and never touched again.

However, now it will be moved by the raylet back into _TEST_CGROUP when ray stops.

with open(_ROOT_CGROUP / "cgroup.procs", "w") as root_procs_file, open(
_TEST_CGROUP / "cgroup.procs", "r"
) as test_procs_file:
test_cgroup_lines = test_procs_file.readlines()
for line in test_cgroup_lines:
root_procs_file.write(line.strip())
root_procs_file.flush()

# 3) Delete the cgroups.
os.rmdir(_LEAF_GROUP)
os.rmdir(_TEST_CGROUP)
Expand Down Expand Up @@ -431,9 +440,6 @@ def test_ray_cli_start_resource_isolation_creates_cgroup_hierarchy_and_cleans_up
assert result.exit_code == 0
resource_isolation_config.add_object_store_memory(object_store_memory)
assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config)
assert_system_processes_are_in_system_cgroup(
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_START)
)

@ray.remote(num_cpus=1)
class Actor:
Expand All @@ -449,10 +455,16 @@ def get_pid(self):
worker_pids = set()
for actor in actor_refs:
worker_pids.add(str(ray.get(actor.get_pid.remote())))
assert_system_processes_are_in_system_cgroup(
node_id,
resource_isolation_config,
len(_EXPECTED_SYSTEM_PROCESSES_RAY_START) + 1,
)
assert_worker_processes_are_in_workers_cgroup(
node_id, resource_isolation_config, worker_pids
)
runner.invoke(scripts.stop)

assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config)


Expand Down Expand Up @@ -492,9 +504,6 @@ def test_ray_init_resource_isolation_creates_cgroup_hierarchy_and_cleans_up(
object_store_memory=object_store_memory,
)
assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config)
assert_system_processes_are_in_system_cgroup(
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_INIT)
)

@ray.remote(num_cpus=1)
class Actor:
Expand All @@ -510,6 +519,9 @@ def get_pid(self):
worker_pids = set()
for actor in actor_refs:
worker_pids.add(str(ray.get(actor.get_pid.remote())))
assert_system_processes_are_in_system_cgroup(
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_INIT) + 1
)
assert_worker_processes_are_in_workers_cgroup(
node_id, resource_isolation_config, worker_pids
)
Expand Down
6 changes: 5 additions & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,11 @@ Status NodeManager::RegisterForNewDriver(
worker->SetProcess(Process::FromPid(pid));
rpc::JobConfig job_config;
job_config.ParseFromString(message->serialized_job_config()->str());

// The driver must start in the system cgroup.
Status s = cgroup_manager_->AddProcessToSystemCgroup(std::to_string(pid));
RAY_CHECK(s.ok()) << absl::StrFormat(
"Failed to move the driver process into the system cgroup with error %s",
s.ToString());
return worker_pool_.RegisterDriver(worker, job_config, send_reply_callback);
}

Expand Down