Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
6fc9652
[core] (cgroups 7/n) cleaning up old cgroup integration code for raylet
israbbani Sep 5, 2025
70ed06d
Merge branch 'irabbani/cgroups-6' into irabbani/cgroups-7
israbbani Sep 5, 2025
fd39d7e
removing a few more references
israbbani Sep 5, 2025
4e8e20c
Merge branch 'irabbani/cgroups-7' of github.com:ray-project/ray into …
israbbani Sep 5, 2025
9304014
[core] (cgroups 8/n) Wiring CgroupManager into the raylet. Creating
israbbani Sep 5, 2025
20bf0dd
Merge branch 'irabbani/cgroups-6' into irabbani/cgroups-7
israbbani Sep 6, 2025
699f3ba
Adding better error messages for creating the CgroupManager.
israbbani Sep 6, 2025
58e0c1a
Removing node_manager configs
israbbani Sep 6, 2025
e2e957d
unnecessary comment
israbbani Sep 6, 2025
9e7a2ef
Merge branch 'irabbani/cgroups-7' into irabbani/cgroups-8
israbbani Sep 6, 2025
f86a010
Merge branch 'irabbani/cgroups-6' into irabbani/cgroups-7
israbbani Sep 8, 2025
29a7ae4
Merge branch 'irabbani/cgroups-7' into irabbani/cgroups-8
israbbani Sep 8, 2025
a843dd4
Merge branch 'irabbani/cgroups-6' into irabbani/cgroups-7
israbbani Sep 8, 2025
43a180e
Merge branch 'irabbani/cgroups-7' into irabbani/cgroups-8
israbbani Sep 8, 2025
a3164a4
bad merge
israbbani Sep 8, 2025
9342ed5
Merge branch 'irabbani/cgroups-6' into irabbani/cgroups-8
israbbani Sep 9, 2025
5b89821
fixing ci
israbbani Sep 9, 2025
4af9d3f
Merge branch 'irabbani/cgroups-8' of github.com:ray-project/ray into …
israbbani Sep 9, 2025
74612b0
Merge branch 'irabbani/cgroups-6' into irabbani/cgroups-8
israbbani Sep 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2380,7 +2380,6 @@ def is_initialized() -> bool:
return ray._private.worker.global_worker.connected


# TODO(hjiang): Add cgroup path along with [enable_resource_isolation].
@with_connect_or_shutdown_lock
def connect(
node,
Expand All @@ -2399,7 +2398,6 @@ def connect(
worker_launch_time_ms: int = -1,
worker_launched_time_ms: int = -1,
debug_source: str = "",
enable_resource_isolation: bool = False,
):
"""Connect this worker to the raylet, to Plasma, and to GCS.

Expand Down Expand Up @@ -2428,7 +2426,6 @@ def connect(
finishes launching. If the worker is not launched by raylet (e.g.,
driver), this must be -1 (default value).
debug_source: Source information for `CoreWorker`, used for debugging and informational purpose, rather than functional purpose.
enable_resource_isolation: If true, core worker enables resource isolation by adding itself into appropriate cgroup.
"""
# Do some basic checking to make sure we didn't call ray.init twice.
error_message = "Perhaps you called ray.init twice by accident?"
Expand Down Expand Up @@ -2607,7 +2604,6 @@ def connect(
worker_launch_time_ms,
worker_launched_time_ms,
debug_source,
enable_resource_isolation,
)

if mode == SCRIPT_MODE:
Expand Down
1 change: 0 additions & 1 deletion python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@
ray_debugger_external=args.ray_debugger_external,
worker_launch_time_ms=args.worker_launch_time_ms,
worker_launched_time_ms=worker_launched_time_ms,
enable_resource_isolation=args.enable_resource_isolation,
)

worker = ray._private.worker.global_worker
Expand Down
3 changes: 1 addition & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2987,7 +2987,7 @@ cdef class CoreWorker:
local_mode, driver_name,
serialized_job_config, metrics_agent_port, runtime_env_hash,
startup_token, session_name, cluster_id, entrypoint,
worker_launch_time_ms, worker_launched_time_ms, debug_source, enable_resource_isolation):
worker_launch_time_ms, worker_launched_time_ms, debug_source):
self.is_local_mode = local_mode

cdef CCoreWorkerOptions options = CCoreWorkerOptions()
Expand Down Expand Up @@ -3043,7 +3043,6 @@ cdef class CoreWorker:
options.worker_launch_time_ms = worker_launch_time_ms
options.worker_launched_time_ms = worker_launched_time_ms
options.debug_source = debug_source
options.enable_resource_isolation = enable_resource_isolation
CCoreWorkerProcess.Initialize(options)

self.cgname_to_eventloop_dict = None
Expand Down
1 change: 0 additions & 1 deletion python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
int64_t worker_launch_time_ms
int64_t worker_launched_time_ms
c_string debug_source
c_bool enable_resource_isolation

cdef cppclass CCoreWorkerProcess "ray::core::CoreWorkerProcess":
@staticmethod
Expand Down
62 changes: 41 additions & 21 deletions src/ray/common/cgroup2/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,13 +1,40 @@
load("//bazel:ray.bzl", "ray_cc_library")

# Matches builds whose target platform is Linux. Used in select() expressions
# in this package to choose between Linux-only and default sources/deps.
config_setting(
name = "is_linux",
constraint_values = ["@platforms//os:linux"],
)

# Public targets.
# CgroupManager library. The header is shared, but the implementation is
# chosen per platform: cgroup_manager.cc when :is_linux matches, and
# noop_cgroup_manager.cc for every other configuration.
ray_cc_library(
name = "cgroup_manager",
srcs = select({
":is_linux": ["cgroup_manager.cc"],
"//conditions:default": ["noop_cgroup_manager.cc"],
}),
hdrs = ["cgroup_manager.h"],
visibility = ["//visibility:public"],
deps = [
":cgroup_driver_interface",
":cgroup_manager_interface",
"//src/ray/common:status",
"//src/ray/common:status_or",
] + select({
# Extra dependencies are only added for the Linux implementation.
":is_linux": [
":scoped_cgroup_operation",
"//src/ray/util:logging",
"@com_google_absl//absl/strings",
],
"//conditions:default": [],
}),
)

ray_cc_library(
name = "cgroup_driver_interface",
hdrs = [
"cgroup_driver_interface.h",
],
target_compatible_with = [
"@platforms//os:linux",
],
visibility = ["//visibility:public"],
deps = [
"//src/ray/common:status",
"//src/ray/common:status_or",
Expand All @@ -19,51 +46,42 @@ ray_cc_library(
hdrs = [
"cgroup_manager_interface.h",
],
target_compatible_with = [
"@platforms//os:linux",
],
visibility = ["//visibility:public"],
deps = [
"//src/ray/common:status",
"//src/ray/common:status_or",
],
)

ray_cc_library(
name = "cgroup_manager",
srcs = ["cgroup_manager.cc"],
name = "sysfs_cgroup_driver",
srcs = ["sysfs_cgroup_driver.cc"],
hdrs = [
"cgroup_manager.h",
"scoped_cgroup_operation.h",
"sysfs_cgroup_driver.h",
],
target_compatible_with = [
"@platforms//os:linux",
],
visibility = ["//visibility:public"],
deps = [
":cgroup_driver_interface",
":cgroup_manager_interface",
"//src/ray/common:status",
"//src/ray/common:status_or",
"//src/ray/util:logging",
"@com_google_absl//absl/strings",
],
)

# Private Targets.
ray_cc_library(
name = "sysfs_cgroup_driver",
srcs = ["sysfs_cgroup_driver.cc"],
name = "scoped_cgroup_operation",
hdrs = [
"sysfs_cgroup_driver.h",
"scoped_cgroup_operation.h",
],
target_compatible_with = [
"@platforms//os:linux",
],
deps = [
":cgroup_driver_interface",
"//src/ray/common:status",
"//src/ray/common:status_or",
"//src/ray/util:logging",
"@com_google_absl//absl/strings",
],
visibility = [":__subpackages__"],
)

ray_cc_library(
Expand All @@ -74,6 +92,7 @@ ray_cc_library(
target_compatible_with = [
"@platforms//os:linux",
],
visibility = [":__subpackages__"],
deps = [
":cgroup_driver_interface",
"//src/ray/common:status",
Expand All @@ -87,6 +106,7 @@ ray_cc_library(
target_compatible_with = [
"@platforms//os:linux",
],
visibility = [":__subpackages__"],
deps = [
"//src/ray/common:id",
"//src/ray/common:status",
Expand Down
39 changes: 39 additions & 0 deletions src/ray/common/cgroup2/noop_cgroup_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include <string>
#include <utility>

#include "ray/common/cgroup2/cgroup_driver_interface.h"
#include "ray/common/cgroup2/cgroup_manager.h"
#include "ray/common/status_or.h"

namespace ray {

// No-op constructor: every argument is accepted and intentionally ignored.
// Parameters are left unnamed because none of them is used in this build.
CgroupManager::CgroupManager(
    std::string /*base_cgroup_path*/,
    const std::string & /*node_id*/,
    std::unique_ptr<CgroupDriverInterface> /*cgroup_driver*/) {}

// Nothing is set up by the no-op constructor, so there is nothing to undo here.
CgroupManager::~CgroupManager() = default;

// No-op factory for CgroupManager.
//
// Performs no validation and never returns an error status: the reservation
// arguments (system_reserved_cpu_weight, system_reserved_memory_bytes) are
// ignored, and the remaining arguments are forwarded to the no-op constructor.
//
// base_cgroup_path is taken by value, so it is moved (not copied) into the
// constructor; cgroup_driver ownership is transferred the same way.
StatusOr<std::unique_ptr<CgroupManager>> CgroupManager::Create(
    std::string base_cgroup_path,
    const std::string &node_id,
    const int64_t system_reserved_cpu_weight,
    const int64_t system_reserved_memory_bytes,
    std::unique_ptr<CgroupDriverInterface> cgroup_driver) {
  // Plain `new` rather than std::make_unique is kept from the original, which
  // would be required if the constructor is not publicly accessible.
  return std::unique_ptr<CgroupManager>(new CgroupManager(
      std::move(base_cgroup_path), node_id, std::move(cgroup_driver)));
}
} // namespace ray
7 changes: 1 addition & 6 deletions src/ray/core_worker/core_worker_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ struct CoreWorkerOptions {
entrypoint(""),
worker_launch_time_ms(-1),
worker_launched_time_ms(-1),
debug_source(""),
enable_resource_isolation(false) {}
debug_source("") {}

/// Type of this worker (i.e., DRIVER or WORKER).
WorkerType worker_type;
Expand Down Expand Up @@ -212,10 +211,6 @@ struct CoreWorkerOptions {
// Source information for `CoreWorker`, used for debugging and informational purpose,
// rather than functional purpose.
std::string debug_source;

// If true, core worker enables resource isolation through cgroupv2 by reserving
// resources for ray system processes.
bool enable_resource_isolation = false;
};
} // namespace core
} // namespace ray
7 changes: 0 additions & 7 deletions src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,6 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
options.worker_type, worker_id, GetProcessJobID(options));
auto pid = getpid();

// Move worker process into cgroup on startup.
AppProcCgroupMetadata app_cgroup_metadata;
app_cgroup_metadata.pid = pid;
app_cgroup_metadata.max_memory = kUnlimitedCgroupMemory;
GetCgroupSetup(options.enable_resource_isolation)
.ApplyCgroupContext(app_cgroup_metadata);

RAY_LOG(DEBUG) << "Creating core worker with debug source: " << options.debug_source;

RAY_LOG(DEBUG).WithField(worker_id) << "Constructing CoreWorker";
Expand Down
3 changes: 2 additions & 1 deletion src/ray/raylet/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ ray_cc_binary(
"//src/ray/common:lease",
"//src/ray/common:ray_config",
"//src/ray/common:status",
"//src/ray/common/cgroup:cgroup_manager",
"//src/ray/common/cgroup2:cgroup_manager",
"//src/ray/common/cgroup2:sysfs_cgroup_driver",
"//src/ray/gcs/gcs_client:gcs_client_lib",
"//src/ray/object_manager:ownership_object_directory",
"//src/ray/raylet/scheduling:cluster_lease_manager",
Expand Down
75 changes: 63 additions & 12 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
#include "gflags/gflags.h"
#include "nlohmann/json.hpp"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/cgroup/cgroup_manager.h"
#include "ray/common/cgroup2/cgroup_manager.h"
#include "ray/common/cgroup2/sysfs_cgroup_driver.h"
#include "ray/common/constants.h"
#include "ray/common/id.h"
#include "ray/common/lease/lease.h"
#include "ray/common/ray_config.h"
#include "ray/common/status.h"
#include "ray/common/status_or.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/object_manager/ownership_object_directory.h"
#include "ray/raylet/local_object_manager.h"
Expand Down Expand Up @@ -96,12 +98,6 @@ DEFINE_int64(object_store_memory, -1, "The initial memory of the object store.")
DEFINE_string(node_name, "", "The user-provided identifier or name for this node.");
DEFINE_string(session_name, "", "The current Ray session name.");
DEFINE_string(cluster_id, "", "ID of the cluster, separate from observability.");
// TODO(hjiang): At the moment only enablement flag is added, I will add other flags for
// CPU and memory resource reservation in the followup PR.
DEFINE_bool(enable_resource_isolation,
false,
"Enable resource isolation through cgroupv2 by reserving resources for ray "
"system processes.");

#ifdef __linux__
DEFINE_string(plasma_directory,
Expand All @@ -117,6 +113,30 @@ DEFINE_bool(huge_pages, false, "Enable huge pages.");
DEFINE_string(labels,
"",
"Define the key-value format of node labels, which is a serialized JSON.");
DEFINE_bool(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking out loud, do we need this boolean flag? Could we infer it to be true if cgroup_path and other configs are present?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. I see the argument for both. More flags means more configuration possibilities. In this case, I picked an enable_ flag because the config is complex and it simplifies invariant checking and understanding the user's intent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I'm not particularly opinionated one way or the other, mostly I guess probing to see if there was a reason to this rhyme and it sounds like you had one in mind. So we can keep this for now on the notion that it'll make it easier to extend later should that come along.

enable_resource_isolation,
false,
"Enables resource isolation through cgroupv2. The raylet will create and "
"manage a cgroup hierarchy that separates system processes and worker processes "
"into separate cgroups.");
// Base cgroup the raylet manages; defaults to empty (unset).
DEFINE_string(
    cgroup_path,
    "",
    "Path of the cgroup that the raylet will take ownership of to create its cgroup "
    "hierarchy. The raylet process must have read, write, and execute permission for "
    "this path. If enable_resource_isolation is true, then this cannot be empty.");
// cpu.weight reserved for ray system processes; defaults to -1 (unset).
DEFINE_int64(
    system_reserved_cpu_weight,
    -1,
    "The amount of cores reserved for ray system processes. It will be applied "
    "as a cpu.weight constraint to the system cgroup. 10000 - "
    "system_reserved_cpu_weight will be applied as a constraint to the "
    "application cgroup. If enable_resource_isolation is true, then this cannot be -1.");
// memory.min reserved for ray system processes, in bytes; defaults to -1 (unset).
DEFINE_int64(system_reserved_memory_bytes,
             -1,
             "The amount of memory in bytes reserved for ray system processes. It will "
             "be applied as a memory.min constraint to the system cgroup. If "
             "enable_resource_isolation is true, then this cannot be -1.");

absl::flat_hash_map<std::string, std::string> parse_node_labels(
const std::string &labels_json_str) {
Expand Down Expand Up @@ -224,18 +244,50 @@ int main(int argc, char *argv[]) {
const std::string session_name = FLAGS_session_name;
const bool is_head_node = FLAGS_head;
const std::string labels_json_str = FLAGS_labels;
const bool enable_resource_isolation = FLAGS_enable_resource_isolation;
const std::string cgroup_path = FLAGS_cgroup_path;
const int64_t system_reserved_cpu_weight = FLAGS_system_reserved_cpu_weight;
const int64_t system_reserved_memory_bytes = FLAGS_system_reserved_memory_bytes;

RAY_CHECK_NE(FLAGS_cluster_id, "") << "Expected cluster ID.";
ray::ClusterID cluster_id = ray::ClusterID::FromHex(FLAGS_cluster_id);
RAY_LOG(INFO) << "Setting cluster ID to: " << cluster_id;
gflags::ShutDownCommandLineFlags();

// Get cgroup setup instance and perform necessary resource setup.
ray::GetCgroupSetup(FLAGS_enable_resource_isolation);
// TODO(#54703): Link OSS documentation once it's available in the error messages.
if (enable_resource_isolation) {
RAY_CHECK(!cgroup_path.empty())
<< "Failed to start up raylet. If enable_resource_isolation is set to true, "
"cgroup_path cannot be empty.";
RAY_CHECK_NE(system_reserved_cpu_weight, -1)
<< "Failed to start up raylet. If enable_resource_isolation is set to true, "
"system_reserved_cpu_weight must be set to a value between [1,10000]";
RAY_CHECK_NE(system_reserved_memory_bytes, -1)
<< "Failed to start up raylet. If enable_resource_isolation is set to true, "
"system_reserved_memory_byres must be set to a value > 0";

#ifndef __linux__
RAY_LOG(WARNING)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do a system check to see if cgroups are enabled on the host?

Copy link
Contributor Author

@israbbani israbbani Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This warning is for if the user is trying to enable resource isolation on a non-linux system. So Cgroups will not be enabled.

The line above this will create the appropriate target of CgroupManager based on the platform. If it's linux, it will fail if cgroups are not enabled on the host.

    RAY_CHECK(cgroup_manager.ok())
        << "Failed to start raylet. Could not create CgroupManager because of "
        << cgroup_manager.ToString();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I misspoke. The following is implemented in 9/n:

The line above this will create the appropriate target of CgroupManager based on the platform.

In this PR, we still do a system check if cgroups are enabled on the host in CgroupManager::Create.

<< "Resource isolation with cgroups is only supported in linux. Please set "
"enable_resource_isolation to false. This is likely a misconfiguration.";
#endif
Comment on lines +283 to +287
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it would be useful to add a warning when resource isolation is enabled on non-linux platforms. It's likely a misconfiguration.

}

std::unique_ptr<ray::SysFsCgroupDriver> cgroup_driver;
ray::StatusOr<std::unique_ptr<ray::CgroupManager>> cgroup_manager =
ray::CgroupManager::Create(std::move(cgroup_path),
node_id,
system_reserved_cpu_weight,
system_reserved_memory_bytes,
std::move(cgroup_driver));

// TODO(#54703) - Link to OSS documentation once available.
RAY_CHECK(cgroup_manager.ok())
<< "Failed to start raylet. Could not create CgroupManager because of "
<< cgroup_manager.ToString();

// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;
node_manager_config.enable_resource_isolation = FLAGS_enable_resource_isolation;

absl::flat_hash_map<std::string, double> static_resource_conf;

Expand Down Expand Up @@ -542,8 +594,7 @@ int main(int argc, char *argv[]) {
/*starting_worker_timeout_callback=*/
[&] { cluster_lease_manager->ScheduleAndGrantLeases(); },
node_manager_config.ray_debugger_external,
/*get_time=*/[]() { return absl::Now(); },
node_manager_config.enable_resource_isolation);
/*get_time=*/[]() { return absl::Now(); });

client_call_manager = std::make_unique<ray::rpc::ClientCallManager>(
main_service, /*record_stats=*/true);
Expand Down
Loading