Commit b455703

israbbani authored and edoakes committed
[core] (cgroups 11/n) Raylet will move system processes into cgroup on startup (#56522)
This PR stacks on #56352. For more details about the resource isolation project, see #54703.

This PR makes the raylet move system processes into the system cgroup on startup if resource isolation is enabled. It introduces the following:

* A new raylet CLI arg, `--system-pids`, which is a comma-separated string of pids of system processes that are started before the raylet. As of today, it contains:
  * On the head node: gcs_server, dashboard_api_server, ray client server, monitor (autoscaler).
  * On every node (including the head): process subreaper, log monitor.
* End-to-end integration tests for resource isolation with the Ray SDK (`ray.init`) and the Ray CLI (`ray start`).

There are a few rough edges (I've added a comment on the PR where relevant):

1. The construction of ResourceIsolationConfig is spread across multiple call sites (create the object, add the object store memory, add the system pids). The big positive of doing it this way was failing fast on invalid user input (in scripts.py and worker.py). I think it needs at least two components: the user input (cgroup_path, system_reserved_memory, ...) and the derived input (system_pids, total_system_reserved_memory).
2. How do we determine which processes should be moved? Right now I'm using `self.all_processes` in `node.py`. It _should_ contain all processes started so far, but there's no guarantee.
3. How intrusive should the integration test be? Should we count the number of pids inside the system cgroup? (This was answered in #56549.)
4. How should a user set up multiple nodes on the same VM? I haven't written an integration test for it yet because there are multiple options for how to set this up.

---------

Signed-off-by: irabbani <[email protected]>
Co-authored-by: Edward Oakes <[email protected]>
Signed-off-by: Douglas Strodtman <[email protected]>
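For context, a minimal sketch of the mechanism described above: the pids of system processes started before the raylet are joined into a comma-separated string and handed to the raylet via the new `--system-pids` flag. The process names and pid values below are hypothetical; only the flag name comes from this PR.

```python
# Illustrative only: how a --system-pids value might be assembled and forwarded.
# Process names and pids are hypothetical; the real list is built from the
# processes Ray has already started on the node (see node.py below).
pre_raylet_system_processes = {
    "gcs_server": 1001,
    "log_monitor": 1002,
    "monitor": 1003,
}

system_pids = ",".join(str(pid) for pid in pre_raylet_system_processes.values())

raylet_args = [
    f"--system-pids={system_pids}",  # e.g. "--system-pids=1001,1002,1003"
    # ... plus the existing cgroup path and reservation flags
]
print(raylet_args)
```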
1 parent c855106 commit b455703

File tree

7 files changed: +117 / -76 lines

- python/ray/_private/node.py
- python/ray/_private/resource_isolation_config.py
- python/ray/_private/services.py
- python/ray/tests/resource_isolation/test_resource_isolation_integration.py
- src/ray/common/cgroup2/cgroup_manager.cc
- src/ray/raylet/BUILD.bazel
- src/ray/raylet/main.cc

python/ray/_private/node.py

Lines changed: 13 additions & 0 deletions
```diff
@@ -1185,6 +1185,10 @@ def start_raylet(
             create_err=True,
         )
 
+        self.resource_isolation_config.add_system_pids(
+            self._get_system_processes_for_resource_isolation()
+        )
+
         process_info = ray._private.services.start_raylet(
             self.redis_address,
             self.gcs_address,
@@ -1427,6 +1431,15 @@ def start_ray_processes(self):
 
         self.start_raylet(plasma_directory, fallback_directory, object_store_memory)
 
+    def _get_system_processes_for_resource_isolation(self) -> str:
+        """Returns a list of system processes that will be isolated by raylet.
+
+        NOTE: If a new system process is started before the raylet starts up, it needs to be
+        added to self.all_processes so it can be moved into the raylet's managed cgroup
+        hierarchy.
+        """
+        return ",".join(str(p[0].process.pid) for p in self.all_processes.values())
+
     def _kill_process_type(
         self,
         process_type,
```
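For reference, a small self-contained sketch of what the new helper computes, using simplified stand-ins for `Node.all_processes` and its `ProcessInfo` entries; the pids are hypothetical.

```python
from collections import namedtuple

# Simplified stand-ins for Ray's internal process bookkeeping.
Process = namedtuple("Process", ["pid"])
ProcessInfo = namedtuple("ProcessInfo", ["process"])

# Hypothetical contents of Node.all_processes before the raylet starts:
# process type -> list of ProcessInfo objects.
all_processes = {
    "gcs_server": [ProcessInfo(Process(pid=1001))],
    "log_monitor": [ProcessInfo(Process(pid=1002))],
}

# Same expression as _get_system_processes_for_resource_isolation.
system_pids = ",".join(str(p[0].process.pid) for p in all_processes.values())
print(system_pids)  # -> "1001,1002"
```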

python/ray/_private/resource_isolation_config.py

Lines changed: 5 additions & 1 deletion
```diff
@@ -43,10 +43,10 @@ def __init__(
         system_reserved_cpu: Optional[float] = None,
         system_reserved_memory: Optional[int] = None,
     ):
-
         self._resource_isolation_enabled = enable_resource_isolation
         self.cgroup_path = cgroup_path
         self.system_reserved_memory = system_reserved_memory
+        self.system_pids = ""
         # cgroupv2 cpu.weight calculated from system_reserved_cpu
         # assumes ray uses all available cores.
         self.system_reserved_cpu_weight: int = None
@@ -115,6 +115,10 @@ def add_object_store_memory(self, object_store_memory: int):
         )
         self._constructed = True
 
+    def add_system_pids(self, system_pids: str):
+        """A comma-separated list of pids to move into the system cgroup."""
+        self.system_pids = system_pids
+
     @staticmethod
     def _validate_and_get_cgroup_path(cgroup_path: Optional[str]) -> str:
         """Returns the ray_constants.DEFAULT_CGROUP_PATH if cgroup_path is not
```

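As a usage sketch of the multi-step construction the PR description calls out (user input validated at construction time, derived input added afterwards). The values are illustrative and the import path assumes the module shown above.

```python
from ray._private.resource_isolation_config import ResourceIsolationConfig

# User-provided input, validated up front (fails fast in scripts.py / worker.py).
config = ResourceIsolationConfig(
    enable_resource_isolation=True,
    cgroup_path="/sys/fs/cgroup/resource_isolation_test",
    system_reserved_cpu=1,
    system_reserved_memory=1024**3,
)

# Derived input, added later by the node startup path.
config.add_object_store_memory(1024**3)
config.add_system_pids("1001,1002")  # hypothetical pids of pre-started system processes
```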
python/ray/_private/services.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -1904,6 +1904,7 @@ def start_raylet(
         command.append(
             f"--system-reserved-memory-bytes={resource_isolation_config.system_reserved_memory}"
         )
+        command.append(f"--system-pids={resource_isolation_config.system_pids}")
 
     if raylet_stdout_filepath:
         command.append(f"--stdout_filepath={raylet_stdout_filepath}")
```

python/ray/tests/resource_isolation/test_resource_isolation_integration.py

Lines changed: 68 additions & 73 deletions
```diff
@@ -1,12 +1,11 @@
+import os
 import sys
 from pathlib import Path
 
 import pytest
 from click.testing import CliRunner
 
 import ray
-import ray._private.ray_constants as ray_constants
-import ray._private.utils as utils
 import ray.scripts.scripts as scripts
 from ray._private.resource_isolation_config import ResourceIsolationConfig
 
@@ -21,6 +20,7 @@
 #
 # Run these commands locally before running the test suite:
 # sudo mkdir -p /sys/fs/cgroup/resource_isolation_test
+# echo "+cpu +memory" | sudo tee -a /sys/fs/cgroup/resource_isolation_test/cgroup.subtree_control
 # sudo chown -R $(whoami):$(whoami) /sys/fs/cgroup/resource_isolation_test/
 # sudo chmod -R u+rwx /sys/fs/cgroup/resource_isolation_test/
 # echo $$ | sudo tee /sys/fs/cgroup/resource_isolation_test/cgroup.procs
@@ -32,44 +32,26 @@
 # _BASE_CGROUP_PATH = "/sys/fs/cgroup/resource_isolation_test"
 
 
-def test_resource_isolation_enabled_creates_cgroup_hierarchy(ray_start_cluster):
-    cluster = ray_start_cluster
-    base_cgroup = _BASE_CGROUP_PATH
-    resource_isolation_config = ResourceIsolationConfig(
-        enable_resource_isolation=True,
-        cgroup_path=base_cgroup,
-        system_reserved_memory=1024**3,
-        system_reserved_cpu=1,
-    )
-    # Need to use a worker node because the driver cannot delete the head node.
-    cluster.add_node(num_cpus=0)
-    ray.init(address=cluster.address)
-
-    worker_node = cluster.add_node(
-        num_cpus=1, resource_isolation_config=resource_isolation_config
-    )
-    worker_node_id = worker_node.node_id
-    cluster.wait_for_nodes()
-
-    # Make sure the worker node is up and running.
-    @ray.remote
-    def task():
-        return "hellodarknessmyoldfriend"
-
-    ray.get(task.remote(), timeout=5)
-
-    # TODO(#54703): This test is deliberately overspecified right now. The test shouldn't
-    # care about the cgroup hierarchy. It should just verify that application and system processes
-    # are started in a cgroup with the correct constraints. This will be updated once cgroup
-    # process management is completed.
-    node_cgroup = Path(base_cgroup) / f"ray_node_{worker_node_id}"
+# TODO(#54703): This test is deliberately overspecified right now. The test shouldn't
+# care about the cgroup hierarchy. It should just verify that application and system processes
+# are started in a cgroup with the correct constraints. This will be updated once cgroup
+# process management is completed.
+def assert_cgroup_hierarchy_exists_for_node(
+    node_id: str, resource_isolation_config: ResourceIsolationConfig
+):
+    base_cgroup_for_node = resource_isolation_config.cgroup_path
+    node_cgroup = Path(base_cgroup_for_node) / f"ray_node_{node_id}"
     system_cgroup = node_cgroup / "system"
+    system_leaf_cgroup = system_cgroup / "leaf"
     application_cgroup = node_cgroup / "application"
+    application_leaf_cgroup = application_cgroup / "leaf"
 
     # 1) Check that the cgroup hierarchy is created correctly for the node.
     assert node_cgroup.is_dir()
     assert system_cgroup.is_dir()
+    assert system_leaf_cgroup.is_dir()
     assert application_cgroup.is_dir()
+    assert application_leaf_cgroup.is_dir()
 
     # 2) Verify the constraints are applied correctly.
     system_cgroup_memory_min = system_cgroup / "memory.min"
@@ -87,14 +69,24 @@ def task():
         10000 - resource_isolation_config.system_reserved_cpu_weight
     )
 
-    # 3) Gracefully shutting down the node cleans up everything. Don't need to check
-    # everything. If the base_cgroup is deleted, then all clean up succeeded.
-    cluster.remove_node(worker_node)
+    # 3) Check to see that all system pids are inside the system cgroup
+    system_leaf_cgroup_procs = system_leaf_cgroup / "cgroup.procs"
+    # At least the raylet process is always moved.
+    with open(system_leaf_cgroup_procs, "r") as cgroup_procs_file:
+        lines = cgroup_procs_file.readlines()
+        assert (
+            len(lines) > 0
+        ), f"Expected only system process passed into the raylet. Found {lines}"
+
+
+def assert_cgroup_hierarchy_cleaned_up_for_node(
+    node_id: str, resource_isolation_config: ResourceIsolationConfig
+):
+    base_cgroup_for_node = resource_isolation_config.cgroup_path
+    node_cgroup = Path(base_cgroup_for_node) / f"ray_node_{node_id}"
     assert not node_cgroup.is_dir()
 
 
-# The following tests will test integration of resource isolation
-# with the 'ray start' command.
 @pytest.fixture
 def cleanup_ray():
     """Shutdown all ray instances"""
@@ -114,19 +106,41 @@ def test_ray_start_invalid_resource_isolation_config(cleanup_ray):
     assert isinstance(result.exception, ValueError)
 
 
-def test_ray_start_resource_isolation_config_default_values(monkeypatch, cleanup_ray):
-    monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 16)
-    # The DEFAULT_CGROUP_PATH override is only relevant when running locally.
-    monkeypatch.setattr(ray_constants, "DEFAULT_CGROUP_PATH", _BASE_CGROUP_PATH)
-
+def test_ray_start_resource_isolation_creates_cgroup_hierarchy_and_cleans_up(
+    monkeypatch, cleanup_ray
+):
+    object_store_memory = 1024**3
+    system_reserved_memory = 1024**3
+    system_reserved_cpu = 1
+    resource_isolation_config = ResourceIsolationConfig(
+        cgroup_path=_BASE_CGROUP_PATH,
+        enable_resource_isolation=True,
+        system_reserved_cpu=system_reserved_cpu,
+        system_reserved_memory=system_reserved_memory,
+    )
+    node_id = ray.NodeID.from_random().hex()
+    os.environ["RAY_OVERRIDE_NODE_ID_FOR_TESTING"] = node_id
     runner = CliRunner()
     result = runner.invoke(
         scripts.start,
-        ["--head", "--enable-resource-isolation"],
+        [
+            "--head",
+            "--enable-resource-isolation",
+            "--cgroup-path",
+            _BASE_CGROUP_PATH,
+            "--system-reserved-cpu",
+            system_reserved_cpu,
+            "--system-reserved-memory",
+            system_reserved_memory,
+            "--object-store-memory",
+            object_store_memory,
+        ],
     )
-    # TODO(#54703): Need to rewrite this test to check for side-effects on the cgroup
-    # hierarchy once the rest of the implemetation is complete.
     assert result.exit_code == 0
+    resource_isolation_config.add_object_store_memory(object_store_memory)
+    assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config)
+    runner.invoke(scripts.stop)
+    assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config)
 
 
 # The following tests will test integration of resource isolation
@@ -144,50 +158,31 @@ def test_ray_init_resource_isolation_disabled_by_default(ray_shutdown):
     assert not node.resource_isolation_config.is_enabled()
 
 
-def test_ray_init_with_resource_isolation_default_values(monkeypatch, ray_shutdown):
-    total_system_cpu = 10
-    monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: total_system_cpu)
-    # The DEFAULT_CGROUP_PATH override is only relevant when running locally.
-    monkeypatch.setattr(ray_constants, "DEFAULT_CGROUP_PATH", _BASE_CGROUP_PATH)
-    ray.init(address="local", enable_resource_isolation=True)
-    node = ray._private.worker._global_node
-    assert node is not None
-    assert node.resource_isolation_config.is_enabled()
-
-
 def test_ray_init_with_resource_isolation_override_defaults(ray_shutdown):
-    cgroup_path = _BASE_CGROUP_PATH
     system_reserved_cpu = 1
-    system_reserved_memory = 1 * 10**9
-    object_store_memory = 1 * 10**9
+    system_reserved_memory = 1024**3
+    object_store_memory = 1024**3
     resource_isolation_config = ResourceIsolationConfig(
         enable_resource_isolation=True,
-        cgroup_path=cgroup_path,
+        cgroup_path=_BASE_CGROUP_PATH,
         system_reserved_cpu=system_reserved_cpu,
        system_reserved_memory=system_reserved_memory,
     )
     resource_isolation_config.add_object_store_memory(object_store_memory)
     ray.init(
         address="local",
         enable_resource_isolation=True,
-        _cgroup_path=cgroup_path,
+        _cgroup_path=_BASE_CGROUP_PATH,
         system_reserved_cpu=system_reserved_cpu,
         system_reserved_memory=system_reserved_memory,
         object_store_memory=object_store_memory,
     )
     node = ray._private.worker._global_node
-    # TODO(#54703): Need to rewrite this test to check for side-effects on the cgroup
-    # hierarchy once the rest of the implemetation is complete.
     assert node is not None
-    assert node.resource_isolation_config.is_enabled()
-    assert (
-        node.resource_isolation_config.system_reserved_cpu_weight
-        == resource_isolation_config.system_reserved_cpu_weight
-    )
-    assert (
-        node.resource_isolation_config.system_reserved_memory
-        == resource_isolation_config.system_reserved_memory
-    )
+    node_id = node.node_id
+    assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config)
+    ray.shutdown()
+    assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config)
 
 
 if __name__ == "__main__":
```
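For orientation, a sketch of the cgroup layout the new test helpers assert on, derived from the paths used in the test; the base path matches `_BASE_CGROUP_PATH` and the node id is hypothetical.

```python
from pathlib import Path

base_cgroup = "/sys/fs/cgroup/resource_isolation_test"  # _BASE_CGROUP_PATH in the test
node_id = "0123abcd"  # hypothetical node id

node_cgroup = Path(base_cgroup) / f"ray_node_{node_id}"
expected_dirs = [
    node_cgroup / "system",
    node_cgroup / "system" / "leaf",        # cgroup.procs here lists the system pids
    node_cgroup / "application",
    node_cgroup / "application" / "leaf",
]
for d in expected_dirs:
    print(d)
```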

src/ray/common/cgroup2/cgroup_manager.cc

Lines changed: 1 addition & 1 deletion
```diff
@@ -298,7 +298,7 @@ Status CgroupManager::AddProcessToSystemCgroup(const std::string &pid) {
   // TODO(#54703): Add link to OSS documentation once available.
   RAY_CHECK(!s.IsNotFound()) << "Failed to move process " << pid << " into system cgroup "
                              << system_leaf_cgroup_
-                             << "because the cgroup was not found. "
+                             << " because the cgroup was not found. "
                                 "If resource isolation is enabled, Ray's cgroup "
                                 "hierarchy must not be modified "
                                 "while Ray is running.";
```

src/ray/raylet/BUILD.bazel

Lines changed: 1 addition & 0 deletions
```diff
@@ -304,6 +304,7 @@ ray_cc_binary(
         "//src/ray/util:stream_redirection_options",
         "//src/ray/util:time",
         "@com_github_gflags_gflags//:gflags",
+        "@com_google_absl//absl/strings",
         "@nlohmann_json",
     ],
 )
```

src/ray/raylet/main.cc

Lines changed: 28 additions & 1 deletion
```diff
@@ -21,6 +21,7 @@
 #include <utility>
 #include <vector>
 
+#include "absl/strings/str_split.h"
 #include "gflags/gflags.h"
 #include "nlohmann/json.hpp"
 #include "ray/common/asio/instrumented_io_context.h"
@@ -143,6 +144,10 @@ DEFINE_int64(system_reserved_memory_bytes,
              "be applied as a memory.min constraint to the system cgroup. If "
              "enable-resource-isolation is true, then this cannot be -1");
 
+DEFINE_string(system_pids,
+              "",
+              "A comma-separated list of pids to move into the system cgroup.");
+
 absl::flat_hash_map<std::string, std::string> parse_node_labels(
     const std::string &labels_json_str) {
   absl::flat_hash_map<std::string, std::string> labels;
@@ -253,6 +258,7 @@ int main(int argc, char *argv[]) {
   const std::string cgroup_path = FLAGS_cgroup_path;
   const int64_t system_reserved_cpu_weight = FLAGS_system_reserved_cpu_weight;
   const int64_t system_reserved_memory_bytes = FLAGS_system_reserved_memory_bytes;
+  const std::string system_pids = FLAGS_system_pids;
 
   RAY_CHECK_NE(FLAGS_cluster_id, "") << "Expected cluster ID.";
   ray::ClusterID cluster_id = ray::ClusterID::FromHex(FLAGS_cluster_id);
@@ -271,10 +277,11 @@ int main(int argc, char *argv[]) {
         "system_reserved_cpu_weight must be set to a value between [1,10000]";
     RAY_CHECK_NE(system_reserved_memory_bytes, -1)
         << "Failed to start up raylet. If enable_resource_isolation is set to true, "
-           "system_reserved_memory_byres must be set to a value > 0";
+           "system_reserved_memory_bytes must be set to a value > 0";
 
     std::unique_ptr<ray::SysFsCgroupDriver> cgroup_driver =
         std::make_unique<ray::SysFsCgroupDriver>();
+
     ray::StatusOr<std::unique_ptr<ray::CgroupManager>> cgroup_manager_s =
         ray::CgroupManager::Create(std::move(cgroup_path),
                                    node_id,
@@ -294,6 +301,26 @@ int main(int argc, char *argv[]) {
         << "Resource isolation with cgroups is only supported in linux. Please set "
            "enable_resource_isolation to false. This is likely a misconfiguration.";
 #endif
+
+    // Move system processes into the system cgroup.
+    // TODO(#54703): This logic needs to be hardened and moved out of main.cc. E.g.
+    // if system_pids is ",,,,,,", this will log an error for each empty
+    // string.
+    std::vector<std::string> system_pids_to_move;
+    if (!system_pids.empty()) {
+      system_pids_to_move = std::move(absl::StrSplit(system_pids, ","));
+    }
+    system_pids_to_move.emplace_back(std::to_string(ray::GetPID()));
+    for (const auto &pid : system_pids_to_move) {
+      ray::Status s = cgroup_manager->AddProcessToSystemCgroup(pid);
+      // TODO(#54703): This could be upgraded to a RAY_CHECK.
+      if (!s.ok()) {
+        RAY_LOG(WARNING) << absl::StrFormat(
+            "Failed to move process %s into system cgroup with error %s",
+            pid,
+            s.ToString());
+      }
+    }
   }
 
   // Configuration for the node manager.
```
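To illustrate the edge case the TODO above calls out (shown in Python for brevity rather than with `absl::StrSplit`, which behaves the same way for this input): a pid string made only of commas splits into empty entries, each of which would currently produce a warning.

```python
print("1001,1002".split(","))  # ['1001', '1002'] -- the normal case
print(",,,,,,".split(","))     # seven empty strings; each would log a warning
```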
