Skip to content

Commit 6044577

Browse files
israbbaniedoakes
andauthored
[core] (cgroups 12/n) Raylet will start worker processes in the application cgroup (#56549)
This PR stacks on #56522 . For more details about the resource isolation project see #54703. This PR the makes the raylet move runtime_env and dashboard agents into the system cgroup. Workers are now spawned inside the application cgroup. It introduces the following: * I've added a new target `raylet_cgroup_types` which defines the type used all functions that need to add a process to a cgroup. * A new parameter is added to `NodeManager`, `WorkerPool`, `AgentManager`, and `Process` constructors. The parameter is a callback that will use the CgroupManager to add a process to the respective cgroup. * The callback is created in `main.cc`. * `main.cc` owns CgroupManager because it needs to outlive the `WorkerPool`. * `process.c` calls the callback after fork() in the child process so nothing else can happen in the forked process before it's moved into the correct cgroup. * Integration tests in python for end-to-end testing of cgroups with system and application processes moved into their respective cgroups. The tests are inside `python/ray/tests/resource_isolation/test_resource_isolation_integration.py` and have similar setup/teardown to the C++ integration tests introduced in #55063. --------- Signed-off-by: Ibrahim Rabbani <[email protected]> Co-authored-by: Edward Oakes <[email protected]>
1 parent b51e4e4 commit 6044577

File tree

16 files changed

+586
-131
lines changed

16 files changed

+586
-131
lines changed

python/ray/tests/resource_isolation/test_resource_isolation_integration.py

Lines changed: 368 additions & 43 deletions
Large diffs are not rendered by default.

src/ray/common/cgroup2/cgroup_manager.cc

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -289,25 +289,31 @@ Status CgroupManager::Initialize(int64_t system_reserved_cpu_weight,
289289
cpu_weight_constraint_.name_,
290290
std::to_string(application_cgroup_cpu_weight)));
291291
RegisterRemoveConstraint(application_cgroup_, cpu_weight_constraint_);
292-
293292
return Status::OK();
294293
}
295294

296-
Status CgroupManager::AddProcessToSystemCgroup(const std::string &pid) {
297-
Status s = cgroup_driver_->AddProcessToCgroup(system_leaf_cgroup_, pid);
295+
Status CgroupManager::AddProcessToCgroup(const std::string &cgroup,
296+
const std::string &pid) {
297+
Status s = cgroup_driver_->AddProcessToCgroup(cgroup, pid);
298298
// TODO(#54703): Add link to OSS documentation once available.
299-
RAY_CHECK(!s.IsNotFound()) << "Failed to move process " << pid << " into system cgroup "
300-
<< system_leaf_cgroup_
301-
<< " because the cgroup was not found. "
302-
"If resource isolation is enabled, Ray's cgroup "
303-
"hierarchy must not be modified "
304-
"while Ray is running.";
299+
RAY_CHECK(!s.IsNotFound())
300+
<< "Failed to move process " << pid << " into cgroup " << cgroup
301+
<< " because the cgroup was not found. If resource isolation is enabled, Ray's "
302+
"cgroup hierarchy must not be modified while Ray is running.";
305303
RAY_CHECK(!s.IsPermissionDenied())
306-
<< "Failed to move process " << pid << " into system cgroup " << system_leaf_cgroup_
304+
<< "Failed to move process " << pid << " into cgroup " << cgroup
307305
<< " because Ray does not have read, write, and execute "
308306
"permissions for the cgroup. If resource isolation is enabled, Ray's cgroup "
309307
"hierarchy must not be modified while Ray is running.";
310-
311308
return s;
312309
}
310+
311+
Status CgroupManager::AddProcessToApplicationCgroup(const std::string &pid) {
312+
return AddProcessToCgroup(application_leaf_cgroup_, pid);
313+
}
314+
315+
Status CgroupManager::AddProcessToSystemCgroup(const std::string &pid) {
316+
return AddProcessToCgroup(system_leaf_cgroup_, pid);
317+
}
318+
313319
} // namespace ray

src/ray/common/cgroup2/cgroup_manager.h

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class CgroupManager : public CgroupManagerInterface {
7070
CgroupManager &operator=(CgroupManager &&);
7171

7272
/**
73-
Moves the process into the system leaf cgroup (@see
73+
Moves the process into the application leaf cgroup (@see
7474
CgroupManagerInterface::kLeafCgroupName).
7575
7676
To move the pid, the process must have read, write, and execute permissions for the
@@ -81,6 +81,25 @@ class CgroupManager : public CgroupManagerInterface {
8181
TODO(#54703): There currently is not a good way to signal to the caller that
8282
the method can cause a FATAL error. Revisit this once we've settled on a pattern.
8383
84+
NOTE: If the process does not have adequate cgroup permissions or the application leaf
85+
cgroup does not exist, this will fail a RAY_CHECK.
86+
87+
@param pid of the process to move into the application leaf cgroup.
88+
89+
@return Status::OK if pid moved successfully.
90+
@return Status::NotFound if the application cgroup does not exist.
91+
*/
92+
Status AddProcessToApplicationCgroup(const std::string &pid) override;
93+
94+
/**
95+
Moves the process into the system leaf cgroup (@see
96+
CgroupManagerInterface::kLeafCgroupName).
97+
98+
To move the pid, the process must have read, write, and execute permissions for the
99+
1) the cgroup the pid is currently in i.e. the source cgroup.
100+
2) the system leaf cgroup i.e. the destination cgroup.
101+
3) the lowest common ancestor of the source and destination cgroups.
102+
84103
NOTE: If the process does not have adequate cgroup permissions or the system leaf
85104
cgroup does not exist, this will fail a RAY_CHECK.
86105
@@ -107,6 +126,24 @@ class CgroupManager : public CgroupManagerInterface {
107126
const std::string &node_id,
108127
std::unique_ptr<CgroupDriverInterface> cgroup_driver);
109128

129+
/**
130+
Moves the process into the specified cgroup.
131+
132+
To move the pid, the process must have read, write, and execute permissions for the
133+
1) the cgroup the pid is currently in i.e. the source cgroup.
134+
2) the system leaf cgroup i.e. the destination cgroup.
135+
3) the lowest common ancestor of the source and destination cgroups.
136+
137+
NOTE: If the process does not have adequate cgroup permissions or the system leaf
138+
cgroup does not exist, this will fail a RAY_CHECK.
139+
140+
@param pid of the process to move into the system leaf cgroup.
141+
142+
@return Status::OK if pid moved successfully.
143+
@return Status::NotFound if the system cgroup does not exist.
144+
*/
145+
Status AddProcessToCgroup(const std::string &cgroup, const std::string &pid);
146+
110147
/**
111148
Performs the following operations:
112149

src/ray/common/cgroup2/cgroup_manager_interface.h

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,26 @@ namespace ray {
4242
*/
4343
class CgroupManagerInterface {
4444
public:
45-
// TODO(#54703): These will be implemented in a later PR to move processes
46-
// into a cgroup.
47-
// virtual Status AddProcessToApplicationCgroup(int) = 0;
45+
/*
46+
Moves the process into the application leaf cgroup (@see kLeafCgroupName).
47+
48+
To move the pid, the process must have read, write, and execute permissions for the
49+
1) the cgroup the pid is currently in i.e. the source cgroup.
50+
2) the system leaf cgroup i.e. the destination cgroup.
51+
3) the lowest common ancestor of the source and destination cgroups.
52+
53+
TODO(#54703): There currently is not a good way to signal to the caller that
54+
the method can cause a FATAL error. Revisit this once we've settled on a pattern.
55+
56+
NOTE: If the process does not have adequate cgroup permissions or the application leaf
57+
cgroup does not exist, this will fail a RAY_CHECK.
58+
59+
@param pid of the process to move into the system leaf cgroup.
60+
61+
@return Status::OK if pid moved successfully.
62+
@return Status::NotFound if the application cgroup does not exist.
63+
*/
64+
virtual Status AddProcessToApplicationCgroup(const std::string &pid) = 0;
4865

4966
/**
5067
Moves the process into the system leaf cgroup (@see kLeafCgroupName).

src/ray/common/cgroup2/noop_cgroup_manager.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,8 @@ Status CgroupManager::AddProcessToSystemCgroup(const std::string &pid) {
4141
return Status::OK();
4242
}
4343

44+
Status CgroupManager::AddProcessToApplicationCgroup(const std::string &pid) {
45+
return Status::OK();
46+
}
47+
4448
} // namespace ray

src/ray/raylet/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,6 @@ ray_cc_library(
221221
"//src/ray/common:flatbuf_utils",
222222
"//src/ray/common:lease",
223223
"//src/ray/common:memory_monitor",
224-
"//src/ray/common/cgroup2:cgroup_manager_interface",
225224
"//src/ray/core_worker:experimental_mutable_object_provider",
226225
"//src/ray/core_worker_rpc_client:core_worker_client_pool",
227226
"//src/ray/flatbuffers:node_manager_generated",

src/ray/raylet/agent_manager.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <memory>
1818
#include <string>
1919
#include <thread>
20+
#include <utility>
2021
#include <vector>
2122

2223
#include "ray/common/ray_config.h"
@@ -27,7 +28,7 @@
2728
namespace ray {
2829
namespace raylet {
2930

30-
void AgentManager::StartAgent() {
31+
void AgentManager::StartAgent(AddProcessToCgroupHook add_to_cgroup) {
3132
std::vector<const char *> argv;
3233
argv.reserve(options_.agent_commands.size());
3334
for (const std::string &arg : options_.agent_commands) {
@@ -67,7 +68,8 @@ void AgentManager::StartAgent() {
6768
false,
6869
env,
6970
/*pipe_to_stdin*/
70-
RayConfig::instance().enable_pipe_based_agent_to_parent_health_check());
71+
RayConfig::instance().enable_pipe_based_agent_to_parent_health_check(),
72+
std::move(add_to_cgroup));
7173
if (!process_.IsValid() || ec) {
7274
// The worker failed to start. This is a fatal error.
7375
RAY_LOG(FATAL) << "Failed to start agent " << options_.agent_name

src/ray/raylet/agent_manager.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ namespace raylet {
3333
using DelayExecutorFn = std::function<std::shared_ptr<boost::asio::deadline_timer>(
3434
std::function<void()>, uint32_t)>;
3535

36+
// TODO(#54703): Put this type in a separate target.
37+
using AddProcessToCgroupHook = std::function<void(const std::string &)>;
38+
3639
// Manages a separate "Agent" process. In constructor (or the `StartAgent` method) it
3740
// starts a process with `agent_commands` plus some additional arguments.
3841
//
@@ -59,7 +62,8 @@ class AgentManager {
5962
Options options,
6063
DelayExecutorFn delay_executor,
6164
std::function<void(const rpc::NodeDeathInfo &)> shutdown_raylet_gracefully,
62-
bool start_agent = true /* for test */)
65+
bool start_agent = true /* for test */,
66+
AddProcessToCgroupHook add_to_cgroup = [](const std::string &) {})
6367
: options_(std::move(options)),
6468
delay_executor_(std::move(delay_executor)),
6569
shutdown_raylet_gracefully_(std::move(shutdown_raylet_gracefully)),
@@ -71,13 +75,13 @@ class AgentManager {
7175
RAY_LOG(FATAL) << "AgentManager agent_commands must not be empty.";
7276
}
7377
if (start_agent) {
74-
StartAgent();
78+
StartAgent(std::move(add_to_cgroup));
7579
}
7680
}
7781
~AgentManager();
7882

7983
private:
80-
void StartAgent();
84+
void StartAgent(AddProcessToCgroupHook add_to_cgroup);
8185

8286
private:
8387
const Options options_;

src/ray/raylet/main.cc

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,10 @@ int main(int argc, char *argv[]) {
266266
gflags::ShutDownCommandLineFlags();
267267

268268
std::unique_ptr<ray::CgroupManager> cgroup_manager;
269+
AddProcessToCgroupHook add_process_to_cgroup_hook = [](const std::string &) {};
270+
AddProcessToCgroupHook add_process_to_application_cgroup_hook =
271+
[](const std::string &) {};
272+
AddProcessToCgroupHook add_process_to_system_cgroup_hook = [](const std::string &) {};
269273

270274
// TODO(#54703): Link OSS documentation once it's available in the error messages.
271275
if (enable_resource_isolation) {
@@ -321,6 +325,27 @@ int main(int argc, char *argv[]) {
321325
s.ToString());
322326
}
323327
}
328+
add_process_to_application_cgroup_hook =
329+
[&cgroup_mgr = *cgroup_manager](const std::string &pid) {
330+
ray::Status s = cgroup_mgr.AddProcessToApplicationCgroup(pid);
331+
if (!s.ok()) {
332+
RAY_LOG(WARNING) << absl::StrFormat(
333+
"Failed to move process %s into the application cgroup with error %s.",
334+
pid,
335+
s.ToString());
336+
}
337+
};
338+
339+
add_process_to_system_cgroup_hook = [&cgroup_mgr =
340+
*cgroup_manager](const std::string &pid) {
341+
ray::Status s = cgroup_mgr.AddProcessToSystemCgroup(pid);
342+
if (!s.ok()) {
343+
RAY_LOG(WARNING) << absl::StrFormat(
344+
"Failed to move process %s into the system cgroup with error %s.",
345+
pid,
346+
s.ToString());
347+
}
348+
};
324349
}
325350

326351
// Configuration for the node manager.
@@ -647,7 +672,8 @@ int main(int argc, char *argv[]) {
647672
/*starting_worker_timeout_callback=*/
648673
[&] { cluster_lease_manager->ScheduleAndGrantLeases(); },
649674
node_manager_config.ray_debugger_external,
650-
/*get_time=*/[]() { return absl::Now(); });
675+
/*get_time=*/[]() { return absl::Now(); },
676+
std::move(add_process_to_application_cgroup_hook));
651677

652678
client_call_manager = std::make_unique<ray::rpc::ClientCallManager>(
653679
main_service, /*record_stats=*/true);
@@ -950,7 +976,7 @@ int main(int argc, char *argv[]) {
950976
std::move(raylet_client_factory),
951977
/*check_signals=*/nullptr),
952978
shutdown_raylet_gracefully,
953-
std::move(cgroup_manager));
979+
std::move(add_process_to_system_cgroup_hook));
954980

955981
// Initialize the node manager.
956982
raylet = std::make_unique<ray::raylet::Raylet>(main_service,

0 commit comments

Comments
 (0)