Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2380,7 +2380,6 @@ def is_initialized() -> bool:
return ray._private.worker.global_worker.connected


# TODO(hjiang): Add cgroup path along with [enable_resource_isolation].
@with_connect_or_shutdown_lock
def connect(
node,
Expand All @@ -2399,7 +2398,6 @@ def connect(
worker_launch_time_ms: int = -1,
worker_launched_time_ms: int = -1,
debug_source: str = "",
enable_resource_isolation: bool = False,
):
"""Connect this worker to the raylet, to Plasma, and to GCS.

Expand Down Expand Up @@ -2428,7 +2426,6 @@ def connect(
finshes launching. If the worker is not launched by raylet (e.g.,
driver), this must be -1 (default value).
debug_source: Source information for `CoreWorker`, used for debugging and informational purpose, rather than functional purpose.
enable_resource_isolation: If true, core worker enables resource isolation by adding itself into appropriate cgroup.
"""
# Do some basic checking to make sure we didn't call ray.init twice.
error_message = "Perhaps you called ray.init twice by accident?"
Expand Down Expand Up @@ -2607,7 +2604,6 @@ def connect(
worker_launch_time_ms,
worker_launched_time_ms,
debug_source,
enable_resource_isolation,
)

if mode == SCRIPT_MODE:
Expand Down
1 change: 0 additions & 1 deletion python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@
ray_debugger_external=args.ray_debugger_external,
worker_launch_time_ms=args.worker_launch_time_ms,
worker_launched_time_ms=worker_launched_time_ms,
enable_resource_isolation=args.enable_resource_isolation,
)

worker = ray._private.worker.global_worker
Expand Down
3 changes: 1 addition & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3002,7 +3002,7 @@ cdef class CoreWorker:
local_mode, driver_name,
serialized_job_config, metrics_agent_port, runtime_env_hash,
startup_token, session_name, cluster_id, entrypoint,
worker_launch_time_ms, worker_launched_time_ms, debug_source, enable_resource_isolation):
worker_launch_time_ms, worker_launched_time_ms, debug_source):
self.is_local_mode = local_mode

cdef CCoreWorkerOptions options = CCoreWorkerOptions()
Expand Down Expand Up @@ -3058,7 +3058,6 @@ cdef class CoreWorker:
options.worker_launch_time_ms = worker_launch_time_ms
options.worker_launched_time_ms = worker_launched_time_ms
options.debug_source = debug_source
options.enable_resource_isolation = enable_resource_isolation
CCoreWorkerProcess.Initialize(options)

self.cgname_to_eventloop_dict = None
Expand Down
1 change: 0 additions & 1 deletion python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
int64_t worker_launch_time_ms
int64_t worker_launched_time_ms
c_string debug_source
c_bool enable_resource_isolation

cdef cppclass CCoreWorkerProcess "ray::core::CoreWorkerProcess":
@staticmethod
Expand Down
7 changes: 1 addition & 6 deletions src/ray/core_worker/core_worker_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ struct CoreWorkerOptions {
entrypoint(""),
worker_launch_time_ms(-1),
worker_launched_time_ms(-1),
debug_source(""),
enable_resource_isolation(false) {}
debug_source("") {}

/// Type of this worker (i.e., DRIVER or WORKER).
WorkerType worker_type;
Expand Down Expand Up @@ -212,10 +211,6 @@ struct CoreWorkerOptions {
// Source information for `CoreWorker`, used for debugging and informational purpose,
// rather than functional purpose.
std::string debug_source;

// If true, core worker enables resource isolation through cgroupv2 by reserving
// resources for ray system processes.
bool enable_resource_isolation = false;
};
} // namespace core
} // namespace ray
7 changes: 0 additions & 7 deletions src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,6 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
options.worker_type, worker_id, GetProcessJobID(options));
auto pid = getpid();

// Move worker process into cgroup on startup.
AppProcCgroupMetadata app_cgroup_metadata;
app_cgroup_metadata.pid = pid;
app_cgroup_metadata.max_memory = kUnlimitedCgroupMemory;
GetCgroupSetup(options.enable_resource_isolation)
.ApplyCgroupContext(app_cgroup_metadata);

RAY_LOG(DEBUG) << "Creating core worker with debug source: " << options.debug_source;

RAY_LOG(DEBUG).WithField(worker_id) << "Constructing CoreWorker";
Expand Down
1 change: 0 additions & 1 deletion src/ray/raylet/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ ray_cc_binary(
"//src/ray/common:lease",
"//src/ray/common:ray_config",
"//src/ray/common:status",
"//src/ray/common/cgroup:cgroup_manager",
"//src/ray/core_worker:metrics",
"//src/ray/gcs/gcs_client:gcs_client_lib",
"//src/ray/object_manager:ownership_object_directory",
Expand Down
14 changes: 1 addition & 13 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "gflags/gflags.h"
#include "nlohmann/json.hpp"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/cgroup/cgroup_manager.h"
#include "ray/common/constants.h"
#include "ray/common/id.h"
#include "ray/common/lease/lease.h"
Expand Down Expand Up @@ -97,12 +96,6 @@ DEFINE_int64(object_store_memory, -1, "The initial memory of the object store.")
DEFINE_string(node_name, "", "The user-provided identifier or name for this node.");
DEFINE_string(session_name, "", "The current Ray session name.");
DEFINE_string(cluster_id, "", "ID of the cluster, separate from observability.");
// TODO(hjiang): At the moment only enablement flag is added, I will add other flags for
// CPU and memory resource reservation in the followup PR.
DEFINE_bool(enable_resource_isolation,
false,
"Enable resource isolation through cgroupv2 by reserving resources for ray "
"system processes.");

#ifdef __linux__
DEFINE_string(plasma_directory,
Expand Down Expand Up @@ -231,12 +224,8 @@ int main(int argc, char *argv[]) {
RAY_LOG(INFO) << "Setting cluster ID to: " << cluster_id;
gflags::ShutDownCommandLineFlags();

// Get cgroup setup instance and perform necessary resource setup.
ray::GetCgroupSetup(FLAGS_enable_resource_isolation);

// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;
node_manager_config.enable_resource_isolation = FLAGS_enable_resource_isolation;

absl::flat_hash_map<std::string, double> static_resource_conf;

Expand Down Expand Up @@ -544,8 +533,7 @@ int main(int argc, char *argv[]) {
/*starting_worker_timeout_callback=*/
[&] { cluster_lease_manager->ScheduleAndGrantLeases(); },
node_manager_config.ray_debugger_external,
/*get_time=*/[]() { return absl::Now(); },
node_manager_config.enable_resource_isolation);
/*get_time=*/[]() { return absl::Now(); });

client_call_manager = std::make_unique<ray::rpc::ClientCallManager>(
main_service, /*record_stats=*/true);
Expand Down
3 changes: 0 additions & 3 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ struct NodeManagerConfig {
int max_io_workers;
// The key-value labels of this node.
absl::flat_hash_map<std::string, std::string> labels;
// If true, core worker enables resource isolation by adding itself into appropriate
// cgroup.
bool enable_resource_isolation = false;
};

class NodeManager : public rpc::NodeManagerServiceHandler,
Expand Down
3 changes: 1 addition & 2 deletions src/ray/raylet/tests/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ class WorkerPoolMock : public WorkerPool {
"",
[]() {},
0,
[this]() { return absl::FromUnixMillis(current_time_ms_); },
/*enable_resource_isolation=*/false),
[this]() { return absl::FromUnixMillis(current_time_ms_); }),
last_worker_process_(),
instrumented_io_service_(io_service),
client_call_manager_(instrumented_io_service_, false),
Expand Down
12 changes: 2 additions & 10 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service,
std::string native_library_path,
std::function<void()> starting_worker_timeout_callback,
int ray_debugger_external,
std::function<absl::Time()> get_time,
bool enable_resource_isolation)
std::function<absl::Time()> get_time)
: worker_startup_token_counter_(0),
io_service_(&io_service),
node_id_(node_id),
Expand All @@ -123,8 +122,7 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service,
std::min(num_prestarted_python_workers, maximum_startup_concurrency_)),
num_prestart_python_workers(num_prestarted_python_workers),
periodical_runner_(PeriodicalRunner::Create(io_service)),
get_time_(std::move(get_time)),
enable_resource_isolation_(enable_resource_isolation) {
get_time_(std::move(get_time)) {
RAY_CHECK_GT(maximum_startup_concurrency_, 0);
// We need to record so that the metric exists. This way, we report that 0
// processes have started before a task runs on the node (as opposed to the
Expand Down Expand Up @@ -443,12 +441,6 @@ WorkerPool::BuildProcessCommandArgs(const Language &language,
serialized_preload_python_modules);
}

// Pass resource isolation flag to python worker.
if (language == Language::PYTHON && worker_type == rpc::WorkerType::WORKER) {
worker_command_args.emplace_back(absl::StrFormat(
"--enable-resource-isolation=%s", enable_resource_isolation_ ? "true" : "false"));
}

// We use setproctitle to change python worker process title,
// causing the process's /proc/PID/environ being empty.
// Add `SPT_NOENV` env to prevent setproctitle breaking /proc/PID/environ.
Expand Down
8 changes: 1 addition & 7 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ class WorkerPool : public WorkerPoolInterface {
/// \param ray_debugger_external Ray debugger in workers will be started in a way
/// that they are accessible from outside the node.
/// \param get_time A callback to get the current time in milliseconds.
/// \param enable_resource_isolation If true, core worker enables resource isolation by
/// adding itself into appropriate cgroup.
WorkerPool(instrumented_io_context &io_service,
const NodeID &node_id,
Expand All @@ -320,8 +319,7 @@ class WorkerPool : public WorkerPoolInterface {
std::string native_library_path,
std::function<void()> starting_worker_timeout_callback,
int ray_debugger_external,
std::function<absl::Time()> get_time,
bool enable_resource_isolation);
std::function<absl::Time()> get_time);

/// Destructor responsible for freeing a set of workers owned by this class.
~WorkerPool() override;
Expand Down Expand Up @@ -912,10 +910,6 @@ class WorkerPool : public WorkerPoolInterface {
int64_t process_failed_pending_registration_ = 0;
int64_t process_failed_runtime_env_setup_failed_ = 0;

// If true, core worker enables resource isolation by adding itself into appropriate
// cgroup after it is created.
bool enable_resource_isolation_ = false;

/// Ray metrics
ray::stats::Sum ray_metric_num_workers_started_{
/*name=*/"internal_num_processes_started",
Expand Down