diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py
index 49a50e2ff4bb..55975b41b75f 100644
--- a/python/ray/_private/node.py
+++ b/python/ray/_private/node.py
@@ -1185,6 +1185,10 @@ def start_raylet(
             create_err=True,
         )
 
+        self.resource_isolation_config.add_system_pids(
+            self._get_system_processes_for_resource_isolation()
+        )
+
         process_info = ray._private.services.start_raylet(
             self.redis_address,
             self.gcs_address,
@@ -1427,6 +1431,15 @@ def start_ray_processes(self):
 
         self.start_raylet(plasma_directory, fallback_directory, object_store_memory)
 
+    def _get_system_processes_for_resource_isolation(self) -> str:
+        """Returns a comma-separated list of pids for the node's system processes.
+
+        NOTE: If a new system process is started before the raylet starts up, it
+        needs to be added to self.all_processes so it can be moved into the
+        raylet's managed cgroup hierarchy.
+        """
+        return ",".join(str(p[0].process.pid) for p in self.all_processes.values())
+
     def _kill_process_type(
         self,
         process_type,
diff --git a/python/ray/_private/resource_isolation_config.py b/python/ray/_private/resource_isolation_config.py
index d8523f2e4927..9d12fa817363 100644
--- a/python/ray/_private/resource_isolation_config.py
+++ b/python/ray/_private/resource_isolation_config.py
@@ -43,10 +43,10 @@ def __init__(
         system_reserved_cpu: Optional[float] = None,
         system_reserved_memory: Optional[int] = None,
     ):
-        self._resource_isolation_enabled = enable_resource_isolation
         self.cgroup_path = cgroup_path
         self.system_reserved_memory = system_reserved_memory
+        self.system_pids = ""
 
         # cgroupv2 cpu.weight calculated from system_reserved_cpu
         # assumes ray uses all available cores.
         self.system_reserved_cpu_weight: int = None
@@ -115,6 +115,10 @@ def add_object_store_memory(self, object_store_memory: int):
         )
         self._constructed = True
 
+    def add_system_pids(self, system_pids: str):
+        """Sets the comma-separated list of pids to move into the system cgroup."""
+        self.system_pids = system_pids
+
     @staticmethod
     def _validate_and_get_cgroup_path(cgroup_path: Optional[str]) -> str:
         """Returns the ray_constants.DEFAULT_CGROUP_PATH if cgroup_path is not
diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py
index c69dec3151fd..5bc0efcecede 100644
--- a/python/ray/_private/services.py
+++ b/python/ray/_private/services.py
@@ -1904,6 +1904,7 @@ def start_raylet(
         command.append(
             f"--system-reserved-memory-bytes={resource_isolation_config.system_reserved_memory}"
         )
+        command.append(f"--system-pids={resource_isolation_config.system_pids}")
 
     if raylet_stdout_filepath:
         command.append(f"--stdout_filepath={raylet_stdout_filepath}")
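The hand-off above is deliberately low-tech: the Node object flattens the pids of every
process it has already started into one comma-separated string, stores it on the
ResourceIsolationConfig, and services.py forwards it to the raylet as --system-pids.
A minimal sketch of the encoding, using fake stand-ins for Ray's ProcessInfo entries
(the names FakeProcess and the pid values are illustrative, not part of the patch):

    from collections import namedtuple

    ProcessInfo = namedtuple("ProcessInfo", ["process"])
    FakeProcess = namedtuple("FakeProcess", ["pid"])

    # Stand-in for Node.all_processes: process type -> [ProcessInfo, ...].
    all_processes = {
        "gcs_server": [ProcessInfo(process=FakeProcess(pid=101))],
        "dashboard": [ProcessInfo(process=FakeProcess(pid=102))],
    }

    # Encoding, as in _get_system_processes_for_resource_isolation. Note that
    # only the first process of each type (p[0]) is included.
    system_pids = ",".join(str(p[0].process.pid) for p in all_processes.values())
    assert system_pids == "101,102"

    # The raylet decodes this string with absl::StrSplit(system_pids, ",").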
diff --git a/python/ray/tests/resource_isolation/test_resource_isolation_integration.py b/python/ray/tests/resource_isolation/test_resource_isolation_integration.py
index f8d2b01bf0be..343d2ce81ba6 100644
--- a/python/ray/tests/resource_isolation/test_resource_isolation_integration.py
+++ b/python/ray/tests/resource_isolation/test_resource_isolation_integration.py
@@ -1,3 +1,4 @@
+import os
 import sys
 from pathlib import Path
 
@@ -5,8 +6,6 @@ from click.testing import CliRunner
 
 import ray
-import ray._private.ray_constants as ray_constants
-import ray._private.utils as utils
 import ray.scripts.scripts as scripts
 from ray._private.resource_isolation_config import ResourceIsolationConfig
 
@@ -21,6 +20,7 @@
 #
 # Run these commands locally before running the test suite:
 # sudo mkdir -p /sys/fs/cgroup/resource_isolation_test
+# echo "+cpu +memory" | sudo tee -a /sys/fs/cgroup/resource_isolation_test/cgroup.subtree_control
 # sudo chown -R $(whoami):$(whoami) /sys/fs/cgroup/resource_isolation_test/
 # sudo chmod -R u+rwx /sys/fs/cgroup/resource_isolation_test/
 # echo $$ | sudo tee /sys/fs/cgroup/resource_isolation_test/cgroup.procs
@@ -32,44 +32,26 @@
 #
 _BASE_CGROUP_PATH = "/sys/fs/cgroup/resource_isolation_test"
 
-def test_resource_isolation_enabled_creates_cgroup_hierarchy(ray_start_cluster):
-    cluster = ray_start_cluster
-    base_cgroup = _BASE_CGROUP_PATH
-    resource_isolation_config = ResourceIsolationConfig(
-        enable_resource_isolation=True,
-        cgroup_path=base_cgroup,
-        system_reserved_memory=1024**3,
-        system_reserved_cpu=1,
-    )
-    # Need to use a worker node because the driver cannot delete the head node.
-    cluster.add_node(num_cpus=0)
-    ray.init(address=cluster.address)
-
-    worker_node = cluster.add_node(
-        num_cpus=1, resource_isolation_config=resource_isolation_config
-    )
-    worker_node_id = worker_node.node_id
-    cluster.wait_for_nodes()
-
-    # Make sure the worker node is up and running.
-    @ray.remote
-    def task():
-        return "hellodarknessmyoldfriend"
-
-    ray.get(task.remote(), timeout=5)
-
-    # TODO(#54703): This test is deliberately overspecified right now. The test shouldn't
-    # care about the cgroup hierarchy. It should just verify that application and system processes
-    # are started in a cgroup with the correct constraints. This will be updated once cgroup
-    # process management is completed.
-    node_cgroup = Path(base_cgroup) / f"ray_node_{worker_node_id}"
+# TODO(#54703): These assertions are deliberately overspecified right now. The tests
+# shouldn't care about the exact cgroup hierarchy; they should just verify that
+# application and system processes are started in a cgroup with the correct
+# constraints. This will be updated once cgroup process management is completed.
+def assert_cgroup_hierarchy_exists_for_node(
+    node_id: str, resource_isolation_config: ResourceIsolationConfig
+):
+    base_cgroup_for_node = resource_isolation_config.cgroup_path
+    node_cgroup = Path(base_cgroup_for_node) / f"ray_node_{node_id}"
     system_cgroup = node_cgroup / "system"
+    system_leaf_cgroup = system_cgroup / "leaf"
     application_cgroup = node_cgroup / "application"
+    application_leaf_cgroup = application_cgroup / "leaf"
 
     # 1) Check that the cgroup hierarchy is created correctly for the node.
     assert node_cgroup.is_dir()
     assert system_cgroup.is_dir()
+    assert system_leaf_cgroup.is_dir()
     assert application_cgroup.is_dir()
+    assert application_leaf_cgroup.is_dir()
 
     # 2) Verify the constraints are applied correctly.
     system_cgroup_memory_min = system_cgroup / "memory.min"
@@ -87,14 +69,24 @@ def task():
         10000 - resource_isolation_config.system_reserved_cpu_weight
     )
 
-    # 3) Gracefully shutting down the node cleans up everything. Don't need to check
-    # everything. If the base_cgroup is deleted, then all clean up succeeded.
-    cluster.remove_node(worker_node)
+    # 3) Check that all system pids are inside the system cgroup.
+    system_leaf_cgroup_procs = system_leaf_cgroup / "cgroup.procs"
+    # At least the raylet process is always moved.
+    with open(system_leaf_cgroup_procs, "r") as cgroup_procs_file:
+        lines = cgroup_procs_file.readlines()
+        assert (
+            len(lines) > 0
+        ), f"Expected at least the raylet process in the system cgroup. Found {lines}"
+
+
+def assert_cgroup_hierarchy_cleaned_up_for_node(
+    node_id: str, resource_isolation_config: ResourceIsolationConfig
+):
+    base_cgroup_for_node = resource_isolation_config.cgroup_path
+    node_cgroup = Path(base_cgroup_for_node) / f"ray_node_{node_id}"
     assert not node_cgroup.is_dir()
 
 
-# The following tests will test integration of resource isolation
-# with the 'ray start' command.
 @pytest.fixture
 def cleanup_ray():
     """Shutdown all ray instances"""
@@ -114,19 +106,41 @@ def test_ray_start_invalid_resource_isolation_config(cleanup_ray):
     assert isinstance(result.exception, ValueError)
 
 
-def test_ray_start_resource_isolation_config_default_values(monkeypatch, cleanup_ray):
-    monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 16)
-    # The DEFAULT_CGROUP_PATH override is only relevant when running locally.
-    monkeypatch.setattr(ray_constants, "DEFAULT_CGROUP_PATH", _BASE_CGROUP_PATH)
-
+def test_ray_start_resource_isolation_creates_cgroup_hierarchy_and_cleans_up(
+    monkeypatch, cleanup_ray
+):
+    object_store_memory = 1024**3
+    system_reserved_memory = 1024**3
+    system_reserved_cpu = 1
+    resource_isolation_config = ResourceIsolationConfig(
+        cgroup_path=_BASE_CGROUP_PATH,
+        enable_resource_isolation=True,
+        system_reserved_cpu=system_reserved_cpu,
+        system_reserved_memory=system_reserved_memory,
+    )
+    node_id = ray.NodeID.from_random().hex()
+    os.environ["RAY_OVERRIDE_NODE_ID_FOR_TESTING"] = node_id
     runner = CliRunner()
     result = runner.invoke(
         scripts.start,
-        ["--head", "--enable-resource-isolation"],
+        [
+            "--head",
+            "--enable-resource-isolation",
+            "--cgroup-path",
+            _BASE_CGROUP_PATH,
+            "--system-reserved-cpu",
+            system_reserved_cpu,
+            "--system-reserved-memory",
+            system_reserved_memory,
+            "--object-store-memory",
+            object_store_memory,
+        ],
     )
-    # TODO(#54703): Need to rewrite this test to check for side-effects on the cgroup
-    # hierarchy once the rest of the implemetation is complete.
     assert result.exit_code == 0
+    resource_isolation_config.add_object_store_memory(object_store_memory)
+    assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config)
+    runner.invoke(scripts.stop)
+    assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config)
 
 
 # The following tests will test integration of resource isolation
@@ -144,25 +158,13 @@ def test_ray_init_resource_isolation_disabled_by_default(ray_shutdown):
     assert not node.resource_isolation_config.is_enabled()
 
 
-def test_ray_init_with_resource_isolation_default_values(monkeypatch, ray_shutdown):
-    total_system_cpu = 10
-    monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: total_system_cpu)
-    # The DEFAULT_CGROUP_PATH override is only relevant when running locally.
-    monkeypatch.setattr(ray_constants, "DEFAULT_CGROUP_PATH", _BASE_CGROUP_PATH)
-    ray.init(address="local", enable_resource_isolation=True)
-    node = ray._private.worker._global_node
-    assert node is not None
-    assert node.resource_isolation_config.is_enabled()
-
-
 def test_ray_init_with_resource_isolation_override_defaults(ray_shutdown):
-    cgroup_path = _BASE_CGROUP_PATH
     system_reserved_cpu = 1
-    system_reserved_memory = 1 * 10**9
-    object_store_memory = 1 * 10**9
+    system_reserved_memory = 1024**3
+    object_store_memory = 1024**3
     resource_isolation_config = ResourceIsolationConfig(
         enable_resource_isolation=True,
-        cgroup_path=cgroup_path,
+        cgroup_path=_BASE_CGROUP_PATH,
         system_reserved_cpu=system_reserved_cpu,
         system_reserved_memory=system_reserved_memory,
     )
@@ -170,24 +172,17 @@ def test_ray_init_with_resource_isolation_override_defaults(ray_shutdown):
     ray.init(
         address="local",
         enable_resource_isolation=True,
-        _cgroup_path=cgroup_path,
+        _cgroup_path=_BASE_CGROUP_PATH,
         system_reserved_cpu=system_reserved_cpu,
         system_reserved_memory=system_reserved_memory,
         object_store_memory=object_store_memory,
     )
     node = ray._private.worker._global_node
-    # TODO(#54703): Need to rewrite this test to check for side-effects on the cgroup
-    # hierarchy once the rest of the implemetation is complete.
     assert node is not None
-    assert node.resource_isolation_config.is_enabled()
-    assert (
-        node.resource_isolation_config.system_reserved_cpu_weight
-        == resource_isolation_config.system_reserved_cpu_weight
-    )
-    assert (
-        node.resource_isolation_config.system_reserved_memory
-        == resource_isolation_config.system_reserved_memory
-    )
+    node_id = node.node_id
+    assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config)
+    ray.shutdown()
+    assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config)
 
 
 if __name__ == "__main__":
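The helpers above assert membership by reading cgroup.procs from the system leaf
cgroup. When debugging a failure, the reverse lookup is often handier: on a pure
cgroup v2 host, each process's cgroup appears as a single "0::<path>" line in
/proc/<pid>/cgroup. A small sketch of that lookup (standard cgroup v2 behavior,
not a helper from this PR; cgroup_of and raylet_pid are illustrative names):

    from pathlib import Path

    def cgroup_of(pid: int) -> str:
        # cgroup v2 has exactly one line, "0::<path relative to the cgroup root>".
        line = Path(f"/proc/{pid}/cgroup").read_text().strip()
        return line.split("::", 1)[1]

    # Example: check whether a raylet pid landed under the node's system leaf.
    # print(cgroup_of(raylet_pid).endswith("/system/leaf"))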
diff --git a/src/ray/common/cgroup2/cgroup_manager.cc b/src/ray/common/cgroup2/cgroup_manager.cc
index 4e696490b010..9a808305c365 100644
--- a/src/ray/common/cgroup2/cgroup_manager.cc
+++ b/src/ray/common/cgroup2/cgroup_manager.cc
@@ -298,7 +298,7 @@ Status CgroupManager::AddProcessToSystemCgroup(const std::string &pid) {
   // TODO(#54703): Add link to OSS documentation once available.
   RAY_CHECK(!s.IsNotFound()) << "Failed to move process " << pid
                              << " into system cgroup " << system_leaf_cgroup_
-                             << "because the cgroup was not found. "
+                             << " because the cgroup was not found. "
                                 "If resource isolation is enabled, Ray's cgroup "
                                 "hierarchy must not be modified "
                                 "while Ray is running.";
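For context on the message being fixed: in cgroup v2, "moving" a process is a single
write of its pid into the destination cgroup's cgroup.procs file, and the write fails
if the cgroup directory has been removed. A minimal sketch of that kernel interface
(illustrative only; the real logic lives in CgroupManager and the cgroup driver, and
move_pid_to_cgroup is a hypothetical name):

    from pathlib import Path

    def move_pid_to_cgroup(pid: int, cgroup_dir: str) -> None:
        procs = Path(cgroup_dir) / "cgroup.procs"
        if not procs.exists():
            # Mirrors the RAY_CHECK above: the hierarchy must not be
            # modified while Ray is running.
            raise FileNotFoundError(f"cgroup {cgroup_dir} was deleted")
        # Writing the pid migrates the whole process into this cgroup.
        procs.write_text(f"{pid}\n")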
diff --git a/src/ray/raylet/BUILD.bazel b/src/ray/raylet/BUILD.bazel
index c3d618c73a3c..342c27452708 100644
--- a/src/ray/raylet/BUILD.bazel
+++ b/src/ray/raylet/BUILD.bazel
@@ -304,6 +304,7 @@ ray_cc_binary(
         "//src/ray/util:stream_redirection_options",
        "//src/ray/util:time",
         "@com_github_gflags_gflags//:gflags",
+        "@com_google_absl//absl/strings",
         "@nlohmann_json",
     ],
 )
If " "enable-resource-isolation is true, then this cannot be -1"); +DEFINE_string(system_pids, + "", + "A comma-separated list of pids to move into the system cgroup."); + absl::flat_hash_map parse_node_labels( const std::string &labels_json_str) { absl::flat_hash_map labels; @@ -253,6 +258,7 @@ int main(int argc, char *argv[]) { const std::string cgroup_path = FLAGS_cgroup_path; const int64_t system_reserved_cpu_weight = FLAGS_system_reserved_cpu_weight; const int64_t system_reserved_memory_bytes = FLAGS_system_reserved_memory_bytes; + const std::string system_pids = FLAGS_system_pids; RAY_CHECK_NE(FLAGS_cluster_id, "") << "Expected cluster ID."; ray::ClusterID cluster_id = ray::ClusterID::FromHex(FLAGS_cluster_id); @@ -271,10 +277,11 @@ int main(int argc, char *argv[]) { "system_reserved_cpu_weight must be set to a value between [1,10000]"; RAY_CHECK_NE(system_reserved_memory_bytes, -1) << "Failed to start up raylet. If enable_resource_isolation is set to true, " - "system_reserved_memory_byres must be set to a value > 0"; + "system_reserved_memory_bytes must be set to a value > 0"; std::unique_ptr cgroup_driver = std::make_unique(); + ray::StatusOr> cgroup_manager_s = ray::CgroupManager::Create(std::move(cgroup_path), node_id, @@ -294,6 +301,26 @@ int main(int argc, char *argv[]) { << "Resource isolation with cgroups is only supported in linux. Please set " "enable_resource_isolation to false. This is likely a misconfiguration."; #endif + + // Move system processes into the system cgroup. + // TODO(#54703): This logic needs to be hardened and moved out of main.cc. E.g. + // if system_pids is ",,,,,,", this will log an error for each empty + // string. + std::vector system_pids_to_move; + if (!system_pids.empty()) { + system_pids_to_move = std::move(absl::StrSplit(system_pids, ",")); + } + system_pids_to_move.emplace_back(std::to_string(ray::GetPID())); + for (const auto &pid : system_pids_to_move) { + ray::Status s = cgroup_manager->AddProcessToSystemCgroup(pid); + // TODO(#54703): This could be upgraded to a RAY_CHECK. + if (!s.ok()) { + RAY_LOG(WARNING) << absl::StrFormat( + "Failed to move process %s into system cgroup with error %s", + pid, + s.ToString()); + } + } } // Configuration for the node manager.