From cdd99e44c2822133594a059b1b8528a58eaf6430 Mon Sep 17 00:00:00 2001
From: Rueian
Date: Thu, 12 Jun 2025 15:26:18 -0700
Subject: [PATCH] [core] Move dependencies of NodeManager to main.cc for better testability

Signed-off-by: Rueian
---
 src/mock/ray/raylet/worker_pool.h            |  98 +++++
 src/ray/raylet/local_task_manager_test.cc    | 120 ++++++
 src/ray/raylet/main.cc                       | 350 +++++++++++++++++-
 src/ray/raylet/node_manager.cc               | 172 +--------
 src/ray/raylet/node_manager.h                |  60 +--
 src/ray/raylet/raylet.cc                     | 132 +------
 src/ray/raylet/raylet.h                      |  11 +-
 .../scheduling/cluster_task_manager_test.cc  | 120 ++++++
 src/ray/raylet/test/node_manager_test.cc     | 139 ++++++-
 src/ray/raylet/worker_pool.h                 | 126 +++++--
 10 files changed, 965 insertions(+), 363 deletions(-)

diff --git a/src/mock/ray/raylet/worker_pool.h b/src/mock/ray/raylet/worker_pool.h
index bd34e707f28f..6141c615ff66 100644
--- a/src/mock/ray/raylet/worker_pool.h
+++ b/src/mock/ray/raylet/worker_pool.h
@@ -37,9 +37,107 @@ class MockWorkerPool : public WorkerPoolInterface {
               GetRegisteredWorker,
               (const WorkerID &worker_id),
               (const, override));
+  MOCK_METHOD(std::shared_ptr<WorkerInterface>,
+              GetRegisteredWorker,
+              (const std::shared_ptr<ClientConnection> &connection),
+              (const, override));
   MOCK_METHOD(std::shared_ptr<WorkerInterface>,
               GetRegisteredDriver,
               (const WorkerID &worker_id),
               (const, override));
+  MOCK_METHOD(std::shared_ptr<WorkerInterface>,
+              GetRegisteredDriver,
+              (const std::shared_ptr<ClientConnection> &connection),
+              (const, override));
+  MOCK_METHOD(void,
+              HandleJobStarted,
+              (const JobID &job_id, const rpc::JobConfig &job_config),
+              (override));
+  MOCK_METHOD(void, HandleJobFinished, (const JobID &job_id), (override));
+  MOCK_METHOD(void, Start, (), (override));
+  MOCK_METHOD(void, SetNodeManagerPort, (int node_manager_port), (override));
+  MOCK_METHOD(void,
+              SetRuntimeEnvAgentClient,
+              (std::unique_ptr<RuntimeEnvAgentClient> runtime_env_agent_client),
+              (override));
+  MOCK_METHOD((std::vector<std::shared_ptr<WorkerInterface>>),
+              GetAllRegisteredDrivers,
+              (bool filter_dead_drivers),
+              (const, override));
+  MOCK_METHOD(Status,
+              RegisterDriver,
+              (const std::shared_ptr<WorkerInterface> &worker,
+               const rpc::JobConfig &job_config,
+               std::function<void(Status, int)> send_reply_callback),
+              (override));
+  MOCK_METHOD(Status,
+              RegisterWorker,
+              (const std::shared_ptr<WorkerInterface> &worker,
+               pid_t pid,
+               StartupToken worker_startup_token,
+               std::function<void(Status, int)> send_reply_callback),
+              (override));
+  MOCK_METHOD(Status,
+              RegisterWorker,
+              (const std::shared_ptr<WorkerInterface> &worker,
+               pid_t pid,
+               StartupToken worker_startup_token),
+              (override));
+  MOCK_METHOD(void,
+              OnWorkerStarted,
+              (const std::shared_ptr<WorkerInterface> &worker),
+              (override));
+  MOCK_METHOD(void,
+              PushSpillWorker,
+              (const std::shared_ptr<WorkerInterface> &worker),
+              (override));
+  MOCK_METHOD(void,
+              PushRestoreWorker,
+              (const std::shared_ptr<WorkerInterface> &worker),
+              (override));
+  MOCK_METHOD(void,
+              DisconnectWorker,
+              (const std::shared_ptr<WorkerInterface> &worker,
+               rpc::WorkerExitType disconnect_type),
+              (override));
+  MOCK_METHOD(void,
+              DisconnectDriver,
+              (const std::shared_ptr<WorkerInterface> &driver),
+              (override));
+  MOCK_METHOD(void,
+              PrestartWorkers,
+              (const TaskSpecification &task_spec, int64_t backlog_size),
+              (override));
+  MOCK_METHOD(void,
+              StartNewWorker,
+              (const std::shared_ptr<PopWorkerRequest> &pop_worker_request),
+              (override));
+  MOCK_METHOD(std::string, DebugString, (), (const, override));
+
+  MOCK_METHOD(void,
+              PopSpillWorker,
+              (std::function<void(std::shared_ptr<WorkerInterface>)> callback),
+              (override));
+
+  MOCK_METHOD(void,
+              PopRestoreWorker,
+              (std::function<void(std::shared_ptr<WorkerInterface>)> callback),
+              (override));
+
+  MOCK_METHOD(void,
+              PushDeleteWorker,
+              (const std::shared_ptr<WorkerInterface> &worker),
+              (override));
+
+  MOCK_METHOD(void,
+              PopDeleteWorker,
+              (std::function<void(std::shared_ptr<WorkerInterface>)> callback),
+              (override));
+
+ boost::optional GetJobConfig( + const JobID &job_id) const override { + RAY_CHECK(false) << "Not used."; + return boost::none; + } }; } // namespace ray::raylet diff --git a/src/ray/raylet/local_task_manager_test.cc b/src/ray/raylet/local_task_manager_test.cc index 0efd0063e1e1..d01dde3411f3 100644 --- a/src/ray/raylet/local_task_manager_test.cc +++ b/src/ray/raylet/local_task_manager_test.cc @@ -129,6 +129,126 @@ class MockWorkerPool : public WorkerPoolInterface { return 0; } + std::shared_ptr GetRegisteredWorker( + const std::shared_ptr &connection) const override { + RAY_CHECK(false) << "Not used."; + return nullptr; + } + + std::shared_ptr GetRegisteredDriver( + const std::shared_ptr &connection) const override { + RAY_CHECK(false) << "Not used."; + return nullptr; + } + + void HandleJobStarted(const JobID &job_id, const rpc::JobConfig &job_config) override { + RAY_CHECK(false) << "Not used."; + } + + void HandleJobFinished(const JobID &job_id) override { + RAY_CHECK(false) << "Not used."; + } + + void Start() override { RAY_CHECK(false) << "Not used."; } + + void SetNodeManagerPort(int node_manager_port) override { + RAY_CHECK(false) << "Not used."; + } + + void SetRuntimeEnvAgentClient( + std::unique_ptr runtime_env_agent_client) override { + RAY_CHECK(false) << "Not used."; + } + + std::vector> GetAllRegisteredDrivers( + bool filter_dead_drivers) const override { + RAY_CHECK(false) << "Not used."; + return {}; + } + + Status RegisterDriver(const std::shared_ptr &worker, + const rpc::JobConfig &job_config, + std::function send_reply_callback) override { + RAY_CHECK(false) << "Not used."; + return Status::Invalid("Not used."); + } + + Status RegisterWorker(const std::shared_ptr &worker, + pid_t pid, + StartupToken worker_startup_token, + std::function send_reply_callback) override { + RAY_CHECK(false) << "Not used."; + return Status::Invalid("Not used."); + } + + Status RegisterWorker(const std::shared_ptr &worker, + pid_t pid, + StartupToken worker_startup_token) override { + RAY_CHECK(false) << "Not used."; + return Status::Invalid("Not used."); + } + + boost::optional GetJobConfig( + const JobID &job_id) const override { + RAY_CHECK(false) << "Not used."; + return boost::none; + } + + void OnWorkerStarted(const std::shared_ptr &worker) override { + RAY_CHECK(false) << "Not used."; + } + + void PushSpillWorker(const std::shared_ptr &worker) override { + RAY_CHECK(false) << "Not used."; + } + + void PushRestoreWorker(const std::shared_ptr &worker) override { + RAY_CHECK(false) << "Not used."; + } + + void DisconnectWorker(const std::shared_ptr &worker, + rpc::WorkerExitType disconnect_type) override { + RAY_CHECK(false) << "Not used."; + } + + void DisconnectDriver(const std::shared_ptr &driver) override { + RAY_CHECK(false) << "Not used."; + } + + void PrestartWorkers(const TaskSpecification &task_spec, + int64_t backlog_size) override { + RAY_CHECK(false) << "Not used."; + } + + void StartNewWorker( + const std::shared_ptr &pop_worker_request) override { + RAY_CHECK(false) << "Not used."; + } + + std::string DebugString() const override { + RAY_CHECK(false) << "Not used."; + return ""; + } + + void PopSpillWorker( + std::function)> callback) override { + RAY_CHECK(false) << "Not used."; + } + + void PopRestoreWorker( + std::function)> callback) override { + RAY_CHECK(false) << "Not used."; + } + + void PushDeleteWorker(const std::shared_ptr &worker) override { + RAY_CHECK(false) << "Not used."; + } + + void PopDeleteWorker( + std::function)> callback) override { + 
RAY_CHECK(false) << "Not used."; + } + std::list> workers; absl::flat_hash_map> callbacks; int num_pops; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index d49c5fe2eea5..88586ebf6ff8 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -30,6 +30,7 @@ #include "ray/common/status.h" #include "ray/common/task/task_common.h" #include "ray/gcs/gcs_client/gcs_client.h" +#include "ray/object_manager/ownership_object_directory.h" #include "ray/raylet/raylet.h" #include "ray/stats/stats.h" #include "ray/util/cmd_line_utils.h" @@ -38,6 +39,7 @@ #include "ray/util/stream_redirection.h" #include "ray/util/stream_redirection_options.h" #include "ray/util/subreaper.h" +#include "scheduling/cluster_task_manager.h" #include "src/ray/protobuf/gcs.pb.h" using json = nlohmann::json; @@ -256,6 +258,36 @@ int main(int argc, char *argv[]) { RAY_CHECK_OK(gcs_client->Connect(main_service)); std::unique_ptr raylet; + std::shared_ptr plasma_client; + std::shared_ptr node_manager; + std::shared_ptr client_call_manager; + std::shared_ptr worker_rpc_pool; + std::shared_ptr worker_pool; + /// Manages all local objects that are pinned (primary + /// copies), freed, and/or spilled. + std::shared_ptr local_object_manager; + /// These classes make up the new scheduler. ClusterResourceScheduler is + /// responsible for maintaining a view of the cluster state w.r.t resource + /// usage. ClusterTaskManager is responsible for queuing, spilling back, and + /// dispatching tasks. + std::shared_ptr cluster_resource_scheduler; + std::shared_ptr local_task_manager; + std::shared_ptr cluster_task_manager; + /// The raylet client to initiate the pubsub to core workers (owners). + /// It is used to subscribe objects to evict. + std::shared_ptr core_worker_subscriber; + /// The object table. This is shared between the object manager and node + /// manager. + std::shared_ptr object_directory; + /// Manages client requests for object transfers and availability. + std::shared_ptr object_manager; + /// A manager to resolve objects needed by queued tasks and workers that + /// called `ray.get` or `ray.wait`. + std::shared_ptr dependency_manager; + /// Map of workers leased out to clients. + absl::flat_hash_map> + leased_workers; + // Enable subreaper. This is called in `AsyncGetInternalConfig` below, but MSVC does // not allow a macro invocation (#ifdef) in another macro invocation (RAY_CHECK_OK), // so we have to put it here. @@ -323,6 +355,8 @@ int main(int argc, char *argv[]) { "shutdown_raylet_gracefully_internal"); }; + ray::NodeID raylet_node_id = ray::NodeID::FromHex(node_id); + RAY_CHECK_OK(gcs_client->InternalKV().AsyncGetInternalConfig( [&](::ray::Status status, const std::optional &stored_raylet_config) { RAY_CHECK_OK(status); @@ -466,10 +500,322 @@ int main(int argc, char *argv[]) { {ray::stats::SessionNameKey, session_name}}; ray::stats::Init(global_tags, metrics_agent_port, WorkerID::Nil()); - ray::NodeID raylet_node_id = ray::NodeID::FromHex(node_id); RAY_LOG(INFO).WithField(raylet_node_id) << "Setting node ID"; node_manager_config.AddDefaultLabels(raylet_node_id.Hex()); + + worker_pool = std::make_shared( + main_service, + raylet_node_id, + node_manager_config.node_manager_address, + [&]() { + // Callback to determine the maximum number of idle workers to + // keep around. 
+ if (node_manager_config.num_workers_soft_limit >= 0) { + return node_manager_config.num_workers_soft_limit; + } + // If no limit is provided, use the available number of CPUs, + // assuming that each incoming task will likely require 1 CPU. + // We floor the available CPUs to the nearest integer to avoid + // starting too many workers when there is less than 1 CPU left. + // Otherwise, we could end up repeatedly starting the worker, then + // killing it because it idles for too long. The downside is that + // we will be slower to schedule tasks that could use a fraction + // of a CPU. + return static_cast( + cluster_resource_scheduler->GetLocalResourceManager() + .GetLocalAvailableCpus()); + }, + node_manager_config.num_prestart_python_workers, + node_manager_config.maximum_startup_concurrency, + node_manager_config.min_worker_port, + node_manager_config.max_worker_port, + node_manager_config.worker_ports, + gcs_client, + node_manager_config.worker_commands, + node_manager_config.native_library_path, + /*starting_worker_timeout_callback=*/ + [&] { cluster_task_manager->ScheduleAndDispatchTasks(); }, + node_manager_config.ray_debugger_external, + /*get_time=*/[]() { return absl::Now(); }, + node_manager_config.enable_resource_isolation); + + client_call_manager = std::make_shared( + main_service, /*record_stats=*/true); + + worker_rpc_pool = std::make_shared( + [&](const ray::rpc::Address &addr) { + return std::make_shared( + addr, + *client_call_manager, + ray::rpc::CoreWorkerClientPool::GetDefaultUnavailableTimeoutCallback( + gcs_client.get(), + worker_rpc_pool.get(), + [&](const std::string &node_manager_address, int32_t port) { + return std::make_shared( + ray::rpc::NodeManagerWorkerClient::make( + node_manager_address, port, *client_call_manager)); + }, + addr)); + }); + + core_worker_subscriber = std::make_shared( + raylet_node_id, + /*channels=*/ + std::vector{ + ray::rpc::ChannelType::WORKER_OBJECT_EVICTION, + ray::rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL, + ray::rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL}, + RayConfig::instance().max_command_batch_size(), + /*get_client=*/ + [&](const ray::rpc::Address &address) { + return worker_rpc_pool->GetOrConnect(address); + }, + &main_service); + + object_directory = std::make_shared( + main_service, + gcs_client, + core_worker_subscriber.get(), + worker_rpc_pool.get(), + [&](const ObjectID &obj_id, const ray::rpc::ErrorType &error_type) { + ray::rpc::ObjectReference ref; + ref.set_object_id(obj_id.Binary()); + node_manager->MarkObjectsAsFailed(error_type, {ref}, JobID::Nil()); + }); + + object_manager = std::make_shared( + main_service, + raylet_node_id, + object_manager_config, + gcs_client, + object_directory.get(), + /*restore_spilled_object=*/ + [&](const ObjectID &object_id, + int64_t object_size, + const std::string &object_url, + std::function callback) { + local_object_manager->AsyncRestoreSpilledObject( + object_id, object_size, object_url, std::move(callback)); + }, + /*get_spilled_object_url=*/ + [&](const ObjectID &object_id) { + return local_object_manager->GetLocalSpilledObjectURL(object_id); + }, + /*spill_objects_callback=*/ + [&]() { + // This callback is called from the plasma store thread. + // NOTE: It means the local object manager should be thread-safe. 
+ main_service.post( + [&]() { local_object_manager->SpillObjectUptoMaxThroughput(); }, + "NodeManager.SpillObjects"); + return local_object_manager->IsSpillingInProgress(); + }, + /*object_store_full_callback=*/ + [&]() { + // Post on the node manager's event loop since this + // callback is called from the plasma store thread. + // This will help keep node manager lock-less. + main_service.post([&]() { node_manager->TriggerGlobalGC(); }, + "NodeManager.GlobalGC"); + }, + /*add_object_callback=*/ + [&](const ray::ObjectInfo &object_info) { + node_manager->HandleObjectLocal(object_info); + }, + /*delete_object_callback=*/ + [&](const ObjectID &object_id) { + node_manager->HandleObjectMissing(object_id); + }, + /*pin_object=*/ + [&](const ObjectID &object_id) { + std::vector object_ids = {object_id}; + std::vector> results; + std::unique_ptr result; + if (node_manager->GetObjectsFromPlasma(object_ids, &results) && + results.size() > 0) { + result = std::move(results[0]); + } + return result; + }, + /*fail_pull_request=*/ + [&](const ObjectID &object_id, ray::rpc::ErrorType error_type) { + ray::rpc::ObjectReference ref; + ref.set_object_id(object_id.Binary()); + node_manager->MarkObjectsAsFailed(error_type, {ref}, JobID::Nil()); + }); + + local_object_manager = std::make_shared( + raylet_node_id, + node_manager_config.node_manager_address, + node_manager_config.node_manager_port, + main_service, + RayConfig::instance().free_objects_batch_size(), + RayConfig::instance().free_objects_period_milliseconds(), + *worker_pool, + *worker_rpc_pool, + /*max_io_workers*/ node_manager_config.max_io_workers, + /*is_external_storage_type_fs*/ + RayConfig::instance().is_external_storage_type_fs(), + /*max_fused_object_count*/ RayConfig::instance().max_fused_object_count(), + /*on_objects_freed*/ + [&](const std::vector &object_ids) { + object_manager->FreeObjects(object_ids, + /*local_only=*/false); + }, + /*is_plasma_object_spillable*/ + [&](const ObjectID &object_id) { + return object_manager->IsPlasmaObjectSpillable(object_id); + }, + /*core_worker_subscriber_=*/core_worker_subscriber.get(), + object_directory.get()); + + dependency_manager = + std::make_shared(*object_manager); + + cluster_resource_scheduler = std::make_shared( + main_service, + ray::scheduling::NodeID(raylet_node_id.Binary()), + node_manager_config.resource_config.GetResourceMap(), + /*is_node_available_fn*/ + [&](ray::scheduling::NodeID node_id) { + return gcs_client->Nodes().Get(NodeID::FromBinary(node_id.Binary())) != + nullptr; + }, + /*get_used_object_store_memory*/ + [&]() { + if (RayConfig::instance().scheduler_report_pinned_bytes_only()) { + // Get the current bytes used by local primary object copies. This + // is used to help node scale down decisions. A node can only be + // safely drained when this function reports zero. + int64_t bytes_used = local_object_manager->GetPrimaryBytes(); + // Report nonzero if we have objects spilled to the local filesystem. 
+ if (bytes_used == 0 && local_object_manager->HasLocallySpilledObjects()) { + bytes_used = 1; + } + return bytes_used; + } + return object_manager->GetUsedMemory(); + }, + /*get_pull_manager_at_capacity*/ + [&]() { return object_manager->PullManagerHasPullsQueued(); }, + shutdown_raylet_gracefully, + /*labels*/ + node_manager_config.labels); + + auto get_node_info_func = [&](const NodeID &node_id) { + return gcs_client->Nodes().Get(node_id); + }; + auto announce_infeasible_task = [](const ray::RayTask &task) { + /// Publish the infeasible task error to GCS so that drivers can subscribe to it + /// and print. + bool suppress_warning = false; + + if (!task.GetTaskSpecification().PlacementGroupBundleId().first.IsNil()) { + // If the task is part of a placement group, do nothing. If necessary, the + // infeasible warning should come from the placement group scheduling, not the + // task scheduling. + suppress_warning = true; + } + + // Push a warning to the task's driver that this task is currently infeasible. + if (!suppress_warning) { + std::ostringstream error_message; + error_message + << "The actor or task with ID " << task.GetTaskSpecification().TaskId() + << " cannot be scheduled right now. It requires " + << task.GetTaskSpecification() + .GetRequiredPlacementResources() + .DebugString() + << " for placement, however the cluster currently cannot provide the " + "requested " + "resources. The required resources may be added as autoscaling takes " + "place " + "or placement groups are scheduled. Otherwise, consider reducing the " + "resource requirements of the task."; + std::string error_message_str = error_message.str(); + RAY_LOG(WARNING) << error_message_str; + } + }; + + RAY_CHECK(RayConfig::instance().max_task_args_memory_fraction() > 0 && + RayConfig::instance().max_task_args_memory_fraction() <= 1) + << "max_task_args_memory_fraction must be a nonzero fraction."; + auto max_task_args_memory = + static_cast(static_cast(object_manager->GetMemoryCapacity()) * + RayConfig::instance().max_task_args_memory_fraction()); + if (max_task_args_memory <= 0) { + RAY_LOG(WARNING) + << "Max task args should be a fraction of the object store capacity, but " + "object " + "store capacity is zero or negative. Allowing task args to use 100% of " + "the " + "local object store. 
This can cause ObjectStoreFullErrors if the tasks' " + "return values are greater than the remaining capacity."; + max_task_args_memory = 0; + } + + local_task_manager = std::make_shared( + raylet_node_id, + *std::dynamic_pointer_cast( + cluster_resource_scheduler), + *dependency_manager, + get_node_info_func, + *worker_pool, + leased_workers, + [&](const std::vector &object_ids, + std::vector> *results) { + return node_manager->GetObjectsFromPlasma(object_ids, results); + }, + max_task_args_memory); + + cluster_task_manager = std::make_shared( + raylet_node_id, + *std::dynamic_pointer_cast( + cluster_resource_scheduler), + get_node_info_func, + announce_infeasible_task, + *local_task_manager); + + auto raylet_client_factory = + [&](const NodeID &node_id, ray::rpc::ClientCallManager &client_call_manager) { + const ray::rpc::GcsNodeInfo *node_info = gcs_client->Nodes().Get(node_id); + RAY_CHECK(node_info) << "No GCS info for node " << node_id; + std::shared_ptr raylet_client = + ray::rpc::NodeManagerWorkerClient::make( + node_info->node_manager_address(), + node_info->node_manager_port(), + client_call_manager); + return std::make_shared( + std::move(raylet_client)); + }; + + plasma_client = std::make_shared(); + node_manager = std::make_shared( + main_service, + raylet_node_id, + node_name, + node_manager_config, + gcs_client, + *client_call_manager, + *worker_rpc_pool, + core_worker_subscriber, + cluster_resource_scheduler, + local_task_manager, + cluster_task_manager, + object_directory, + object_manager, + *local_object_manager, + *dependency_manager, + *worker_pool, + leased_workers, + *plasma_client, + std::make_unique( + *plasma_client, + std::move(raylet_client_factory), + /*check_signals=*/nullptr), + shutdown_raylet_gracefully); + // Initialize the node manager. raylet = std::make_unique(main_service, raylet_node_id, @@ -481,7 +827,7 @@ int main(int argc, char *argv[]) { gcs_client, metrics_export_port, is_head_node, - shutdown_raylet_gracefully); + node_manager); // Initialize event framework. if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 0a188771252b..2051528aaa5a 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -115,9 +115,16 @@ NodeManager::NodeManager( std::shared_ptr gcs_client, rpc::ClientCallManager &client_call_manager, rpc::CoreWorkerClientPool &worker_rpc_pool, - std::unique_ptr core_worker_subscriber, - std::unique_ptr object_directory, - std::unique_ptr object_manager, + std::shared_ptr core_worker_subscriber, + std::shared_ptr cluster_resource_scheduler, + std::shared_ptr local_task_manager, + std::shared_ptr cluster_task_manager, + std::shared_ptr object_directory, + std::shared_ptr object_manager, + LocalObjectManager &local_object_manager, + DependencyManager &dependency_manager, + WorkerPoolInterface &worker_pool, + absl::flat_hash_map> &leased_workers, plasma::PlasmaClientInterface &store_client, std::unique_ptr mutable_object_provider, @@ -127,40 +134,7 @@ NodeManager::NodeManager( io_service_(io_service), gcs_client_(std::move(gcs_client)), shutdown_raylet_gracefully_(shutdown_raylet_gracefully), - worker_pool_( - io_service, - self_node_id_, - config.node_manager_address, - [this, config]() { - // Callback to determine the maximum number of idle workers to keep - // around. 
- if (config.num_workers_soft_limit >= 0) { - return config.num_workers_soft_limit; - } - // If no limit is provided, use the available number of CPUs, - // assuming that each incoming task will likely require 1 CPU. - // We floor the available CPUs to the nearest integer to avoid starting too - // many workers when there is less than 1 CPU left. Otherwise, we could end - // up repeatedly starting the worker, then killing it because it idles for - // too long. The downside is that we will be slower to schedule tasks that - // could use a fraction of a CPU. - return static_cast( - cluster_resource_scheduler_->GetLocalResourceManager() - .GetLocalAvailableCpus()); - }, - config.num_prestart_python_workers, - config.maximum_startup_concurrency, - config.min_worker_port, - config.max_worker_port, - config.worker_ports, - gcs_client_, - config.worker_commands, - config.native_library_path, - /*starting_worker_timeout_callback=*/ - [this] { cluster_task_manager_->ScheduleAndDispatchTasks(); }, - config.ray_debugger_external, - /*get_time=*/[]() { return absl::Now(); }, - config.enable_resource_isolation), + worker_pool_(worker_pool), client_call_manager_(client_call_manager), worker_rpc_pool_(worker_rpc_pool), core_worker_subscriber_(std::move(core_worker_subscriber)), @@ -171,7 +145,7 @@ NodeManager::NodeManager( periodical_runner_(PeriodicalRunner::Create(io_service)), report_resources_period_ms_(config.report_resources_period_ms), initial_config_(config), - dependency_manager_(*object_manager_), + dependency_manager_(dependency_manager), wait_manager_(/*is_object_local*/ [this](const ObjectID &object_id) { return dependency_manager_.CheckObjectLocal(object_id); @@ -184,35 +158,16 @@ NodeManager::NodeManager( node_manager_server_("NodeManager", config.node_manager_port, config.node_manager_address == "127.0.0.1"), - local_object_manager_( - self_node_id_, - config.node_manager_address, - config.node_manager_port, - io_service_, - RayConfig::instance().free_objects_batch_size(), - RayConfig::instance().free_objects_period_milliseconds(), - worker_pool_, - worker_rpc_pool_, - /*max_io_workers*/ config.max_io_workers, - /*is_external_storage_type_fs*/ - RayConfig::instance().is_external_storage_type_fs(), - /*max_fused_object_count*/ RayConfig::instance().max_fused_object_count(), - /*on_objects_freed*/ - [this](const std::vector &object_ids) { - object_manager_->FreeObjects(object_ids, - /*local_only=*/false); - }, - /*is_plasma_object_spillable*/ - [this](const ObjectID &object_id) { - return object_manager_->IsPlasmaObjectSpillable(object_id); - }, - /*core_worker_subscriber_=*/core_worker_subscriber_.get(), - object_directory_.get()), + local_object_manager_(local_object_manager), + leased_workers_(leased_workers), high_plasma_storage_usage_(RayConfig::instance().high_plasma_storage_usage()), local_gc_run_time_ns_(absl::GetCurrentTimeNanos()), local_gc_throttler_(RayConfig::instance().local_gc_min_interval_s() * 1e9), global_gc_throttler_(RayConfig::instance().global_gc_min_interval_s() * 1e9), local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9), + cluster_resource_scheduler_(std::move(cluster_resource_scheduler)), + local_task_manager_(std::move(local_task_manager)), + cluster_task_manager_(std::move(cluster_task_manager)), record_metrics_period_ms_(config.record_metrics_period_ms), next_resource_seq_no_(0), ray_syncer_(io_service_, self_node_id_.Binary()), @@ -225,73 +180,7 @@ NodeManager::NodeManager( RayConfig::instance().memory_monitor_refresh_ms(), 
CreateMemoryUsageRefreshCallback())) { RAY_LOG(INFO).WithField(kLogKeyNodeID, self_node_id_) << "Initializing NodeManager"; - cluster_resource_scheduler_ = std::make_shared( - io_service, - scheduling::NodeID(self_node_id_.Binary()), - config.resource_config.GetResourceMap(), - /*is_node_available_fn*/ - [this](scheduling::NodeID node_id) { - return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id.Binary())) != nullptr; - }, - /*get_used_object_store_memory*/ - [this]() { - if (RayConfig::instance().scheduler_report_pinned_bytes_only()) { - // Get the current bytes used by local primary object copies. This - // is used to help node scale down decisions. A node can only be - // safely drained when this function reports zero. - int64_t bytes_used = local_object_manager_.GetPrimaryBytes(); - // Report nonzero if we have objects spilled to the local filesystem. - if (bytes_used == 0 && local_object_manager_.HasLocallySpilledObjects()) { - bytes_used = 1; - } - return bytes_used; - } - return object_manager_->GetUsedMemory(); - }, - /*get_pull_manager_at_capacity*/ - [this]() { return object_manager_->PullManagerHasPullsQueued(); }, - shutdown_raylet_gracefully, - /*labels*/ - config.labels); - - auto get_node_info_func = [this](const NodeID &node_id) { - return gcs_client_->Nodes().Get(node_id); - }; - auto announce_infeasible_task = [this](const RayTask &task) { - PublishInfeasibleTaskError(task); - }; - RAY_CHECK(RayConfig::instance().max_task_args_memory_fraction() > 0 && - RayConfig::instance().max_task_args_memory_fraction() <= 1) - << "max_task_args_memory_fraction must be a nonzero fraction."; - auto max_task_args_memory = - static_cast(static_cast(object_manager_->GetMemoryCapacity()) * - RayConfig::instance().max_task_args_memory_fraction()); - if (max_task_args_memory <= 0) { - RAY_LOG(WARNING) - << "Max task args should be a fraction of the object store capacity, but object " - "store capacity is zero or negative. Allowing task args to use 100% of the " - "local object store. This can cause ObjectStoreFullErrors if the tasks' " - "return values are greater than the remaining capacity."; - max_task_args_memory = 0; - } - local_task_manager_ = std::make_unique( - self_node_id_, - *std::dynamic_pointer_cast(cluster_resource_scheduler_), - dependency_manager_, - get_node_info_func, - worker_pool_, - leased_workers_, - [this](const std::vector &object_ids, - std::vector> *results) { - return GetObjectsFromPlasma(object_ids, results); - }, - max_task_args_memory); - cluster_task_manager_ = std::make_shared( - self_node_id_, - *std::dynamic_pointer_cast(cluster_resource_scheduler_), - get_node_info_func, - announce_infeasible_task, - *local_task_manager_); + placement_group_resource_manager_ = std::make_shared( std::dynamic_pointer_cast(cluster_resource_scheduler_)); @@ -2958,31 +2847,6 @@ std::optional NodeManager::CreateSyncMessage( return std::make_optional(std::move(msg)); } -void NodeManager::PublishInfeasibleTaskError(const RayTask &task) const { - bool suppress_warning = false; - - if (!task.GetTaskSpecification().PlacementGroupBundleId().first.IsNil()) { - // If the task is part of a placement group, do nothing. If necessary, the infeasible - // warning should come from the placement group scheduling, not the task scheduling. - suppress_warning = true; - } - - // Push a warning to the task's driver that this task is currently infeasible. 
- if (!suppress_warning) { - std::ostringstream error_message; - error_message - << "The actor or task with ID " << task.GetTaskSpecification().TaskId() - << " cannot be scheduled right now. It requires " - << task.GetTaskSpecification().GetRequiredPlacementResources().DebugString() - << " for placement, however the cluster currently cannot provide the requested " - "resources. The required resources may be added as autoscaling takes place " - "or placement groups are scheduled. Otherwise, consider reducing the " - "resource requirements of the task."; - std::string error_message_str = error_message.str(); - RAY_LOG(WARNING) << error_message_str; - } -} - // Picks the worker with the latest submitted task and kills the process // if the memory usage is above the threshold. Allows one in-flight // process kill at a time as killing a process could sometimes take diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 0d1af26f297c..463682b224b8 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -130,20 +130,28 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// \param config Configuration of node manager, e.g. initial resources, ports, etc. /// \param object_manager_config Configuration of object manager, e.g. initial memory /// allocation. - NodeManager(instrumented_io_context &io_service, - const NodeID &self_node_id, - std::string self_node_name, - const NodeManagerConfig &config, - std::shared_ptr gcs_client, - rpc::ClientCallManager &client_call_manager, - rpc::CoreWorkerClientPool &worker_rpc_pool, - std::unique_ptr core_worker_subscriber, - std::unique_ptr object_directory, - std::unique_ptr object_manager, - plasma::PlasmaClientInterface &store_client, - std::unique_ptr - mutable_object_provider, - std::function shutdown_raylet_gracefully); + NodeManager( + instrumented_io_context &io_service, + const NodeID &self_node_id, + std::string self_node_name, + const NodeManagerConfig &config, + std::shared_ptr gcs_client, + rpc::ClientCallManager &client_call_manager, + rpc::CoreWorkerClientPool &worker_rpc_pool, + std::shared_ptr core_worker_subscriber, + std::shared_ptr cluster_resource_scheduler, + std::shared_ptr local_task_manager, + std::shared_ptr cluster_task_manager, + std::shared_ptr object_directory, + std::shared_ptr object_manager, + LocalObjectManager &local_object_manager, + DependencyManager &dependency_manager, + WorkerPoolInterface &worker_pool, + absl::flat_hash_map> &leased_workers, + plasma::PlasmaClientInterface &store_client, + std::unique_ptr + mutable_object_provider, + std::function shutdown_raylet_gracefully); /// Handle an unexpected error that occurred on a client connection. /// The client will be disconnected and no more messages will be processed. @@ -704,12 +712,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// \return Whether the resource is returned successfully. bool ReturnBundleResources(const BundleSpecification &bundle_spec); - /// Publish the infeasible task error to GCS so that drivers can subscribe to it and - /// print. - /// - /// \param task RayTask that is infeasible - void PublishInfeasibleTaskError(const RayTask &task) const; - /// Populate the relevant parts of the heartbeat table. This is intended for /// sending raylet <-> gcs heartbeats. In particular, this should fill in /// resource_load and resource_load_by_shape. @@ -779,7 +781,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// The function to shutdown raylet gracefully. 
std::function shutdown_raylet_gracefully_; /// A pool of workers. - WorkerPool worker_pool_; + WorkerPoolInterface &worker_pool_; /// The `ClientCallManager` object that is shared by all `NodeManagerClient`s /// as well as all `CoreWorkerClient`s. rpc::ClientCallManager &client_call_manager_; @@ -787,12 +789,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler, rpc::CoreWorkerClientPool &worker_rpc_pool_; /// The raylet client to initiate the pubsub to core workers (owners). /// It is used to subscribe objects to evict. - std::unique_ptr core_worker_subscriber_; + std::shared_ptr core_worker_subscriber_; /// The object table. This is shared between the object manager and node /// manager. - std::unique_ptr object_directory_; + std::shared_ptr object_directory_; /// Manages client requests for object transfers and availability. - std::unique_ptr object_manager_; + std::shared_ptr object_manager_; /// A Plasma object store client. This is used for creating new objects in /// the object store (e.g., for actor tasks that can't be run because the /// actor died) and to pin objects that are in scope in the cluster. @@ -814,7 +816,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// A manager to resolve objects needed by queued tasks and workers that /// called `ray.get` or `ray.wait`. - DependencyManager dependency_manager_; + DependencyManager &dependency_manager_; /// A manager for wait requests. WaitManager wait_manager_; @@ -833,14 +835,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// Manages all local objects that are pinned (primary /// copies), freed, and/or spilled. - LocalObjectManager local_object_manager_; + LocalObjectManager &local_object_manager_; /// Map from node ids to addresses of the remote node managers. absl::flat_hash_map> remote_node_manager_addresses_; /// Map of workers leased out to clients. - absl::flat_hash_map> leased_workers_; + absl::flat_hash_map> &leased_workers_; /// Optional extra information about why the task failed. absl::flat_hash_map task_failure_reasons_; @@ -871,12 +873,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// Seconds to initialize a local gc const uint64_t local_gc_interval_ns_; - /// These two classes make up the new scheduler. ClusterResourceScheduler is + /// These classes make up the new scheduler. ClusterResourceScheduler is /// responsible for maintaining a view of the cluster state w.r.t resource /// usage. ClusterTaskManager is responsible for queuing, spilling back, and /// dispatching tasks. 
std::shared_ptr cluster_resource_scheduler_; - std::unique_ptr local_task_manager_; + std::shared_ptr local_task_manager_; std::shared_ptr cluster_task_manager_; absl::flat_hash_map> pinned_objects_; diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index d506be1a0b02..5c788276684e 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -28,7 +28,6 @@ #include "ray/core_worker/experimental_mutable_object_provider.h" #include "ray/object_manager/object_manager.h" #include "ray/object_manager/ownership_object_directory.h" -#include "ray/object_manager/plasma/client.h" #include "ray/util/util.h" namespace { @@ -74,138 +73,13 @@ Raylet::Raylet(instrumented_io_context &main_service, std::shared_ptr gcs_client, int metrics_export_port, bool is_head_node, - std::function shutdown_raylet_gracefully) + std::shared_ptr node_manager) : self_node_id_(self_node_id), gcs_client_(std::move(gcs_client)), + node_manager_(std::move(node_manager)), socket_name_(socket_name), acceptor_(main_service, ParseUrlEndpoint(socket_name)), - socket_(main_service), - client_call_manager_(main_service, /*record_stats=*/true), - worker_rpc_pool_([this](const rpc::Address &addr) { - return std::make_shared( - addr, - client_call_manager_, - rpc::CoreWorkerClientPool::GetDefaultUnavailableTimeoutCallback( - gcs_client_.get(), - &worker_rpc_pool_, - [this](const std::string &node_manager_address, int32_t port) { - return std::make_shared( - rpc::NodeManagerWorkerClient::make( - node_manager_address, port, client_call_manager_)); - }, - addr)); - }) { - auto core_worker_subscriber = std::make_unique( - self_node_id_, - /*channels=*/ - std::vector{rpc::ChannelType::WORKER_OBJECT_EVICTION, - rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL, - rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL}, - RayConfig::instance().max_command_batch_size(), - /*get_client=*/ - [this](const rpc::Address &address) { - return worker_rpc_pool_.GetOrConnect(address); - }, - &main_service); - auto object_directory = std::make_unique( - main_service, - gcs_client_, - core_worker_subscriber.get(), - &worker_rpc_pool_, - [this](const ObjectID &obj_id, const ErrorType &error_type) { - rpc::ObjectReference ref; - ref.set_object_id(obj_id.Binary()); - this->node_manager_->MarkObjectsAsFailed(error_type, {ref}, JobID::Nil()); - }); - auto object_manager = std::make_unique( - main_service, - self_node_id, - object_manager_config, - gcs_client_, - object_directory.get(), - /*restore_spilled_object=*/ - [this](const ObjectID &object_id, - int64_t object_size, - const std::string &object_url, - std::function callback) { - this->node_manager_->GetLocalObjectManager().AsyncRestoreSpilledObject( - object_id, object_size, object_url, std::move(callback)); - }, - /*get_spilled_object_url=*/ - [this](const ObjectID &object_id) { - return this->node_manager_->GetLocalObjectManager().GetLocalSpilledObjectURL( - object_id); - }, - /*spill_objects_callback=*/ - [this, &main_service]() { - // This callback is called from the plasma store thread. - // NOTE: It means the local object manager should be thread-safe. - main_service.post( - [this]() { - this->node_manager_->GetLocalObjectManager().SpillObjectUptoMaxThroughput(); - }, - "NodeManager.SpillObjects"); - return this->node_manager_->GetLocalObjectManager().IsSpillingInProgress(); - }, - /*object_store_full_callback=*/ - [this, &main_service]() { - // Post on the node manager's event loop since this - // callback is called from the plasma store thread. 
- // This will help keep node manager lock-less. - main_service.post([this]() { this->node_manager_->TriggerGlobalGC(); }, - "NodeManager.GlobalGC"); - }, - /*add_object_callback=*/ - [this](const ObjectInfo &object_info) { - this->node_manager_->HandleObjectLocal(object_info); - }, - /*delete_object_callback=*/ - [this](const ObjectID &object_id) { - this->node_manager_->HandleObjectMissing(object_id); - }, - /*pin_object=*/ - [this](const ObjectID &object_id) { - std::vector object_ids = {object_id}; - std::vector> results; - std::unique_ptr result; - if (this->node_manager_->GetObjectsFromPlasma(object_ids, &results) && - results.size() > 0) { - result = std::move(results[0]); - } - return result; - }, - /*fail_pull_request=*/ - [this](const ObjectID &object_id, rpc::ErrorType error_type) { - rpc::ObjectReference ref; - ref.set_object_id(object_id.Binary()); - this->node_manager_->MarkObjectsAsFailed(error_type, {ref}, JobID::Nil()); - }); - auto raylet_client_factory = [this](const NodeID &node_id, - rpc::ClientCallManager &client_call_manager) { - const rpc::GcsNodeInfo *node_info = gcs_client_->Nodes().Get(node_id); - RAY_CHECK(node_info) << "No GCS info for node " << node_id; - std::shared_ptr raylet_client = - rpc::NodeManagerWorkerClient::make(node_info->node_manager_address(), - node_info->node_manager_port(), - client_call_manager); - return std::make_shared(std::move(raylet_client)); - }; - node_manager_ = std::make_unique( - main_service, - self_node_id, - node_name, - node_manager_config, - gcs_client_, - client_call_manager_, - worker_rpc_pool_, - std::move(core_worker_subscriber), - std::move(object_directory), - std::move(object_manager), - plasma_client_, - std::make_unique( - plasma_client_, std::move(raylet_client_factory), /*check_signals=*/nullptr), - std::move(shutdown_raylet_gracefully)); - + socket_(main_service) { SetCloseOnExec(acceptor_); self_node_info_.set_node_id(self_node_id_.Binary()); self_node_info_.set_state(GcsNodeInfo::ALIVE); diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h index 8f78e3f9fdb7..63b369baa531 100644 --- a/src/ray/raylet/raylet.h +++ b/src/ray/raylet/raylet.h @@ -15,7 +15,6 @@ #pragma once #include -#include #include #include @@ -55,7 +54,7 @@ class Raylet { std::shared_ptr gcs_client, int metrics_export_port, bool is_head_node, - std::function shutdown_raylet_gracefully); + std::shared_ptr node_manager); /// Start this raylet. void Start(); @@ -97,7 +96,7 @@ class Raylet { /// A client connection to the GCS. std::shared_ptr gcs_client_; /// Manages client requests for task submission and execution. - std::unique_ptr node_manager_; + std::shared_ptr node_manager_; /// The name of the socket this raylet listens on. std::string socket_name_; @@ -105,12 +104,6 @@ class Raylet { boost::asio::basic_socket_acceptor acceptor_; /// The socket to listen on for new clients. 
local_stream_socket socket_; - - rpc::ClientCallManager client_call_manager_; - - rpc::CoreWorkerClientPool worker_rpc_pool_; - - plasma::PlasmaClient plasma_client_; }; } // namespace ray::raylet diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 093988f351ef..8f9f8db8104e 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -126,6 +126,126 @@ class MockWorkerPool : public WorkerPoolInterface { } } + std::shared_ptr GetRegisteredWorker( + const std::shared_ptr &connection) const override { + RAY_CHECK(false) << "Not used."; + return nullptr; + } + + std::shared_ptr GetRegisteredDriver( + const std::shared_ptr &connection) const override { + RAY_CHECK(false) << "Not used."; + return nullptr; + } + + void HandleJobStarted(const JobID &job_id, const rpc::JobConfig &job_config) override { + RAY_CHECK(false) << "Not used."; + } + + void HandleJobFinished(const JobID &job_id) override { + RAY_CHECK(false) << "Not used."; + } + + void Start() override { RAY_CHECK(false) << "Not used."; } + + void SetNodeManagerPort(int node_manager_port) override { + RAY_CHECK(false) << "Not used."; + } + + void SetRuntimeEnvAgentClient( + std::unique_ptr runtime_env_agent_client) override { + RAY_CHECK(false) << "Not used."; + } + + std::vector> GetAllRegisteredDrivers( + bool filter_dead_drivers) const override { + RAY_CHECK(false) << "Not used."; + return {}; + } + + Status RegisterDriver(const std::shared_ptr &worker, + const rpc::JobConfig &job_config, + std::function send_reply_callback) override { + RAY_CHECK(false) << "Not used."; + return Status::Invalid("Not used."); + } + + Status RegisterWorker(const std::shared_ptr &worker, + pid_t pid, + StartupToken worker_startup_token, + std::function send_reply_callback) override { + RAY_CHECK(false) << "Not used."; + return Status::Invalid("Not used."); + } + + Status RegisterWorker(const std::shared_ptr &worker, + pid_t pid, + StartupToken worker_startup_token) override { + RAY_CHECK(false) << "Not used."; + return Status::Invalid("Not used."); + } + + boost::optional GetJobConfig( + const JobID &job_id) const override { + RAY_CHECK(false) << "Not used."; + return boost::none; + } + + void OnWorkerStarted(const std::shared_ptr &worker) override { + RAY_CHECK(false) << "Not used."; + } + + void PushSpillWorker(const std::shared_ptr &worker) override { + RAY_CHECK(false) << "Not used."; + } + + void PushRestoreWorker(const std::shared_ptr &worker) override { + RAY_CHECK(false) << "Not used."; + } + + void DisconnectWorker(const std::shared_ptr &worker, + rpc::WorkerExitType disconnect_type) override { + RAY_CHECK(false) << "Not used."; + } + + void DisconnectDriver(const std::shared_ptr &driver) override { + RAY_CHECK(false) << "Not used."; + } + + void PrestartWorkers(const TaskSpecification &task_spec, + int64_t backlog_size) override { + RAY_CHECK(false) << "Not used."; + } + + void StartNewWorker( + const std::shared_ptr &pop_worker_request) override { + RAY_CHECK(false) << "Not used."; + } + + std::string DebugString() const override { + RAY_CHECK(false) << "Not used."; + return ""; + } + + void PopSpillWorker( + std::function)> callback) override { + RAY_CHECK(false) << "Not used."; + } + + void PopRestoreWorker( + std::function)> callback) override { + RAY_CHECK(false) << "Not used."; + } + + void PushDeleteWorker(const std::shared_ptr &worker) override { + RAY_CHECK(false) << "Not used."; + } + + 
void PopDeleteWorker( + std::function)> callback) override { + RAY_CHECK(false) << "Not used."; + } + size_t CallbackSize(int runtime_env_hash) { auto cb_it = callbacks.find(runtime_env_hash); if (cb_it != callbacks.end()) { diff --git a/src/ray/raylet/test/node_manager_test.cc b/src/ray/raylet/test/node_manager_test.cc index 8499d46d1698..0b6c258b14a4 100644 --- a/src/ray/raylet/test/node_manager_test.cc +++ b/src/ray/raylet/test/node_manager_test.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include "gmock/gmock.h" #include "mock/ray/core_worker/experimental_mutable_object_provider.h" @@ -29,6 +30,7 @@ #include "mock/ray/raylet/local_task_manager.h" #include "mock/ray/raylet/worker_pool.h" #include "mock/ray/rpc/worker/core_worker_client.h" +#include "ray/raylet/scheduling/cluster_task_manager.h" #include "ray/raylet/test/util.h" namespace ray::raylet { @@ -165,9 +167,8 @@ TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog) { EXPECT_CALL(worker_pool, GetRegisteredWorker(worker_id)) .Times(1) .WillOnce(Return(worker)); - EXPECT_CALL(worker_pool, GetRegisteredDriver(worker_id)) - .Times(0) - .WillOnce(Return(nullptr)); + EXPECT_CALL(worker_pool, GetRegisteredDriver(worker_id)).Times(0); + EXPECT_CALL(local_task_manager, ClearWorkerBacklog(worker_id)).Times(1); EXPECT_CALL(local_task_manager, SetWorkerBacklog(task_spec_1.GetSchedulingClass(), worker_id, 1)) @@ -206,19 +207,130 @@ class NodeManagerTest : public ::testing::Test { mock_object_directory_ = object_directory.get(); auto object_manager = std::make_unique(); mock_object_manager_ = object_manager.get(); + + EXPECT_CALL(*mock_object_manager_, GetMemoryCapacity()).WillRepeatedly(Return(0)); + + EXPECT_CALL(mock_store_client_, Connect(node_manager_config.store_socket_name, _, _)) + .WillOnce(Return(Status::OK())); + auto mutable_object_provider = std::make_unique(); mock_mutable_object_provider_ = mutable_object_provider.get(); + + EXPECT_CALL(mock_worker_pool_, SetNodeManagerPort(_)).Times(1); + EXPECT_CALL(mock_worker_pool_, SetRuntimeEnvAgentClient(_)).Times(1); + EXPECT_CALL(mock_worker_pool_, Start()).Times(1); + + EXPECT_CALL(mock_worker_pool_, DebugString()).WillRepeatedly(Return("")); + EXPECT_CALL(*mock_gcs_client_, DebugString()).WillRepeatedly(Return("")); + EXPECT_CALL(*mock_object_manager_, DebugString()).WillRepeatedly(Return("")); + EXPECT_CALL(*mock_object_directory_, DebugString()).WillRepeatedly(Return("")); + EXPECT_CALL(*core_worker_subscriber, DebugString()).WillRepeatedly(Return("")); + + raylet_node_id_ = NodeID::FromRandom(); + + local_object_manager_ = std::make_shared( + raylet_node_id_, + node_manager_config.node_manager_address, + node_manager_config.node_manager_port, + io_service_, + RayConfig::instance().free_objects_batch_size(), + RayConfig::instance().free_objects_period_milliseconds(), + mock_worker_pool_, + worker_rpc_pool_, + /*max_io_workers*/ node_manager_config.max_io_workers, + /*is_external_storage_type_fs*/ + RayConfig::instance().is_external_storage_type_fs(), + /*max_fused_object_count*/ RayConfig::instance().max_fused_object_count(), + /*on_objects_freed*/ + [&](const std::vector &object_ids) { + mock_object_manager_->FreeObjects(object_ids, + /*local_only=*/false); + }, + /*is_plasma_object_spillable*/ + [&](const ObjectID &object_id) { + return mock_object_manager_->IsPlasmaObjectSpillable(object_id); + }, + /*core_worker_subscriber_=*/core_worker_subscriber.get(), + mock_object_directory_); + + dependency_manager_ = std::make_shared(*mock_object_manager_); + + 
cluster_resource_scheduler_ = std::make_shared( + io_service_, + ray::scheduling::NodeID(raylet_node_id_.Binary()), + node_manager_config.resource_config.GetResourceMap(), + /*is_node_available_fn*/ + [&](ray::scheduling::NodeID node_id) { + return mock_gcs_client_->Nodes().Get(NodeID::FromBinary(node_id.Binary())) != + nullptr; + }, + /*get_used_object_store_memory*/ + [&]() { + if (RayConfig::instance().scheduler_report_pinned_bytes_only()) { + // Get the current bytes used by local primary object copies. This + // is used to help node scale down decisions. A node can only be + // safely drained when this function reports zero. + int64_t bytes_used = local_object_manager_->GetPrimaryBytes(); + // Report nonzero if we have objects spilled to the local filesystem. + if (bytes_used == 0 && local_object_manager_->HasLocallySpilledObjects()) { + bytes_used = 1; + } + return bytes_used; + } + return mock_object_manager_->GetUsedMemory(); + }, + /*get_pull_manager_at_capacity*/ + [&]() { return mock_object_manager_->PullManagerHasPullsQueued(); }, + [](const ray::rpc::NodeDeathInfo &node_death_info) {}, + /*labels*/ + node_manager_config.labels); + + auto get_node_info_func = [&](const NodeID &node_id) { + return mock_gcs_client_->Nodes().Get(node_id); + }; + + auto max_task_args_memory = static_cast( + static_cast(mock_object_manager_->GetMemoryCapacity()) * + RayConfig::instance().max_task_args_memory_fraction()); + + local_task_manager_ = std::make_shared( + raylet_node_id_, + *std::dynamic_pointer_cast(cluster_resource_scheduler_), + *dependency_manager_, + get_node_info_func, + mock_worker_pool_, + leased_workers_, + [&](const std::vector &object_ids, + std::vector> *results) { + return node_manager_->GetObjectsFromPlasma(object_ids, results); + }, + max_task_args_memory); + + cluster_task_manager_ = std::make_shared( + raylet_node_id_, + *std::dynamic_pointer_cast(cluster_resource_scheduler_), + get_node_info_func, + [](const ray::RayTask &task) {}, + *local_task_manager_); + node_manager_ = std::make_unique(io_service_, - NodeID::FromRandom(), + raylet_node_id_, "test_node_name", node_manager_config, mock_gcs_client_, client_call_manager_, worker_rpc_pool_, std::move(core_worker_subscriber), + cluster_resource_scheduler_, + local_task_manager_, + cluster_task_manager_, std::move(object_directory), std::move(object_manager), + *local_object_manager_, + *dependency_manager_, + mock_worker_pool_, + leased_workers_, mock_store_client_, std::move(mutable_object_provider), /*shutdown_raylet_gracefully=*/ @@ -229,6 +341,12 @@ class NodeManagerTest : public ::testing::Test { rpc::ClientCallManager client_call_manager_; rpc::CoreWorkerClientPool worker_rpc_pool_; + NodeID raylet_node_id_; + std::shared_ptr cluster_resource_scheduler_; + std::shared_ptr local_task_manager_; + std::shared_ptr cluster_task_manager_; + std::shared_ptr local_object_manager_; + std::shared_ptr dependency_manager_; std::shared_ptr mock_gcs_client_ = std::make_shared(); MockObjectDirectory *mock_object_directory_; @@ -237,11 +355,24 @@ class NodeManagerTest : public ::testing::Test { plasma::MockPlasmaClient mock_store_client_; std::unique_ptr node_manager_; + MockWorkerPool mock_worker_pool_; + absl::flat_hash_map> leased_workers_; }; TEST_F(NodeManagerTest, TestRegisterGcsAndCheckSelfAlive) { EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncSubscribeToNodeChange(_, _)) .WillOnce(Return(Status::OK())); + EXPECT_CALL(*mock_gcs_client_->mock_worker_accessor, + AsyncSubscribeToWorkerFailures(_, _)) + 
.WillOnce(Return(Status::OK())); + EXPECT_CALL(*mock_gcs_client_->mock_job_accessor, AsyncSubscribeAll(_, _)) + .WillOnce(Return(Status::OK())); + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) + .WillRepeatedly(Return(std::vector>{})); + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_)) + .WillRepeatedly(Return(std::vector>{})); + EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling()) + .WillRepeatedly(Return(false)); std::promise promise; EXPECT_CALL(*mock_gcs_client_->mock_node_accessor, AsyncCheckSelfAlive(_, _)) .WillOnce([&promise](const auto &, const auto &) { diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index fd1bdddf5a32..f84290a3a2ed 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -123,10 +123,33 @@ struct PopWorkerRequest { callback(std::move(callback)) {} }; +/// \class IOWorkerPoolInterface +/// +/// Used for object spilling manager unit tests. +class IOWorkerPoolInterface { + public: + virtual void PushSpillWorker(const std::shared_ptr &worker) = 0; + + virtual void PopSpillWorker( + std::function)> callback) = 0; + + virtual void PushRestoreWorker(const std::shared_ptr &worker) = 0; + + virtual void PopRestoreWorker( + std::function)> callback) = 0; + + virtual void PushDeleteWorker(const std::shared_ptr &worker) = 0; + + virtual void PopDeleteWorker( + std::function)> callback) = 0; + + virtual ~IOWorkerPoolInterface() = default; +}; + /// \class WorkerPoolInterface /// /// Used for new scheduler unit tests. -class WorkerPoolInterface { +class WorkerPoolInterface : public IOWorkerPoolInterface { public: /// Pop an idle worker from the pool. The caller is responsible for pushing /// the worker back onto the pool once the worker has completed its work. @@ -170,34 +193,63 @@ class WorkerPoolInterface { virtual std::shared_ptr GetRegisteredWorker( const WorkerID &worker_id) const = 0; + virtual std::shared_ptr GetRegisteredWorker( + const std::shared_ptr &connection) const = 0; + /// Get registered driver process by id or nullptr if not found. virtual std::shared_ptr GetRegisteredDriver( const WorkerID &worker_id) const = 0; + virtual std::shared_ptr GetRegisteredDriver( + const std::shared_ptr &connection) const = 0; + virtual ~WorkerPoolInterface() = default; -}; -/// \class IOWorkerPoolInterface -/// -/// Used for object spilling manager unit tests. 
-class IOWorkerPoolInterface { - public: - virtual void PushSpillWorker(const std::shared_ptr &worker) = 0; + virtual void HandleJobStarted(const JobID &job_id, + const rpc::JobConfig &job_config) = 0; - virtual void PopSpillWorker( - std::function)> callback) = 0; + virtual void HandleJobFinished(const JobID &job_id) = 0; - virtual void PushRestoreWorker(const std::shared_ptr &worker) = 0; + virtual void Start() = 0; - virtual void PopRestoreWorker( - std::function)> callback) = 0; + virtual void SetNodeManagerPort(int node_manager_port) = 0; - virtual void PushDeleteWorker(const std::shared_ptr &worker) = 0; + virtual void SetRuntimeEnvAgentClient( + std::unique_ptr runtime_env_agent_client) = 0; - virtual void PopDeleteWorker( - std::function)> callback) = 0; + virtual std::vector> GetAllRegisteredDrivers( + bool filter_dead_drivers = false) const = 0; - virtual ~IOWorkerPoolInterface() = default; + virtual Status RegisterDriver(const std::shared_ptr &worker, + const rpc::JobConfig &job_config, + std::function send_reply_callback) = 0; + + virtual Status RegisterWorker(const std::shared_ptr &worker, + pid_t pid, + StartupToken worker_startup_token, + std::function send_reply_callback) = 0; + + virtual Status RegisterWorker(const std::shared_ptr &worker, + pid_t pid, + StartupToken worker_startup_token) = 0; + + virtual boost::optional GetJobConfig( + const JobID &job_id) const = 0; + + virtual void OnWorkerStarted(const std::shared_ptr &worker) = 0; + + virtual void DisconnectWorker(const std::shared_ptr &worker, + rpc::WorkerExitType disconnect_type) = 0; + + virtual void DisconnectDriver(const std::shared_ptr &driver) = 0; + + virtual void PrestartWorkers(const TaskSpecification &task_spec, + int64_t backlog_size) = 0; + + virtual void StartNewWorker( + const std::shared_ptr &pop_worker_request) = 0; + + virtual std::string DebugString() const = 0; }; class WorkerInterface; @@ -228,7 +280,7 @@ inline std::ostream &operator<<(std::ostream &os, /// /// The WorkerPool is responsible for managing a pool of Workers. Each Worker /// is a container for a unit of work. -class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { +class WorkerPool : public WorkerPoolInterface { public: /// Create a pool and asynchronously start at least the specified number of workers per /// language. @@ -281,28 +333,28 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { ~WorkerPool() override; /// Start the worker pool. Could only be called once. - void Start(); + void Start() override; /// Set the node manager port. /// \param node_manager_port The port Raylet uses for listening to incoming connections. - void SetNodeManagerPort(int node_manager_port); + void SetNodeManagerPort(int node_manager_port) override; /// Set Runtime Env Manager Client. void SetRuntimeEnvAgentClient( - std::unique_ptr runtime_env_agent_client); + std::unique_ptr runtime_env_agent_client) override; /// Handles the event that a job is started. /// /// \param job_id ID of the started job. /// \param job_config The config of the started job. /// \return Void - void HandleJobStarted(const JobID &job_id, const rpc::JobConfig &job_config); + void HandleJobStarted(const JobID &job_id, const rpc::JobConfig &job_config) override; /// Handles the event that a job is finished. /// /// \param job_id ID of the finished job. /// \return Void. - void HandleJobFinished(const JobID &job_id); + void HandleJobFinished(const JobID &job_id) override; /// \brief Get the job config by job id. 
/// @@ -310,7 +362,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// /// \param job_id ID of the job. /// \return Job config if given job is running, else nullptr. - boost::optional GetJobConfig(const JobID &job_id) const; + boost::optional GetJobConfig( + const JobID &job_id) const override; /// Register a new worker. The Worker should be added by the caller to the /// pool after it becomes idle (e.g., requests a work assignment). @@ -326,20 +379,20 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { Status RegisterWorker(const std::shared_ptr &worker, pid_t pid, StartupToken worker_startup_token, - std::function send_reply_callback); + std::function send_reply_callback) override; // Similar to the above function overload, but the port has been assigned, but directly // returns registration status without taking a callback. Status RegisterWorker(const std::shared_ptr &worker, pid_t pid, - StartupToken worker_startup_token); + StartupToken worker_startup_token) override; /// To be invoked when a worker is started. This method should be called when the worker /// announces its port. /// /// \param[in] worker The worker which is started. /// \return void - void OnWorkerStarted(const std::shared_ptr &worker); + void OnWorkerStarted(const std::shared_ptr &worker) override; /// Register a new driver. /// @@ -350,7 +403,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \return If the registration is successful. Status RegisterDriver(const std::shared_ptr &worker, const rpc::JobConfig &job_config, - std::function send_reply_callback); + std::function send_reply_callback) override; /// Get the client connection's registered worker. /// @@ -358,7 +411,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \return The Worker that owns the given client connection. Returns nullptr /// if the client has not registered a worker yet. std::shared_ptr GetRegisteredWorker( - const std::shared_ptr &connection) const; + const std::shared_ptr &connection) const override; /// Get the registered worker by worker id or nullptr if not found. std::shared_ptr GetRegisteredWorker( @@ -370,7 +423,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \return The Worker that owns the given client connection. Returns nullptr /// if the client has not registered a driver. std::shared_ptr GetRegisteredDriver( - const std::shared_ptr &connection) const; + const std::shared_ptr &connection) const override; /// Get the registered driver by worker id or nullptr if not found. std::shared_ptr GetRegisteredDriver( @@ -381,12 +434,12 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \param worker The worker to disconnect. The worker must be registered. /// \param disconnect_type Type of a worker exit. void DisconnectWorker(const std::shared_ptr &worker, - rpc::WorkerExitType disconnect_type); + rpc::WorkerExitType disconnect_type) override; /// Disconnect a registered driver. /// /// \param The driver to disconnect. The driver must be registered. - void DisconnectDriver(const std::shared_ptr &driver); + void DisconnectDriver(const std::shared_ptr &driver) override; /// Add an idle spill I/O worker to the pool. /// @@ -447,7 +500,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \param task_spec The returned worker must be able to execute this task. 
/// \param backlog_size The number of tasks in the client backlog of this shape. /// We aim to prestart 1 worker per CPU, up to the backlog size. - void PrestartWorkers(const TaskSpecification &task_spec, int64_t backlog_size); + void PrestartWorkers(const TaskSpecification &task_spec, int64_t backlog_size) override; void PrestartWorkersInternal(const TaskSpecification &task_spec, int64_t num_needed); @@ -477,12 +530,12 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// /// \return A list containing all the drivers. std::vector> GetAllRegisteredDrivers( - bool filter_dead_drivers = false) const; + bool filter_dead_drivers = false) const override; /// Returns debug string for class. /// /// \return string. - std::string DebugString() const; + std::string DebugString() const override; /// Try killing idle workers to ensure the running workers are in a /// reasonable size. @@ -506,7 +559,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { // // Note: NONE of these methods guarantee that pop_worker_request.callback will be called // with the started worker. It may be called with any fitting workers. - void StartNewWorker(const std::shared_ptr &pop_worker_request); + void StartNewWorker( + const std::shared_ptr &pop_worker_request) override; protected: void update_worker_startup_token_counter();