diff --git a/src/mock/ray/gcs_client/accessor.h b/src/mock/ray/gcs_client/accessor.h index 53ce06c5cfe3..d2c30ba65754 100644 --- a/src/mock/ray/gcs_client/accessor.h +++ b/src/mock/ray/gcs_client/accessor.h @@ -118,7 +118,7 @@ class MockNodeInfoAccessor : public NodeInfoAccessor { public: MOCK_METHOD(void, RegisterSelf, - (const rpc::GcsNodeInfo &local_node_info, const StatusCallback &callback), + (rpc::GcsNodeInfo && local_node_info, const StatusCallback &callback), (override)); MOCK_METHOD(void, AsyncRegister, diff --git a/src/ray/gcs_rpc_client/accessor.cc b/src/ray/gcs_rpc_client/accessor.cc index cb5dfa62e41a..afb5380685ba 100644 --- a/src/ray/gcs_rpc_client/accessor.cc +++ b/src/ray/gcs_rpc_client/accessor.cc @@ -468,18 +468,17 @@ bool ActorInfoAccessor::IsActorUnsubscribed(const ActorID &actor_id) { NodeInfoAccessor::NodeInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {} -void NodeInfoAccessor::RegisterSelf(const rpc::GcsNodeInfo &local_node_info, +void NodeInfoAccessor::RegisterSelf(rpc::GcsNodeInfo &&local_node_info, const StatusCallback &callback) { auto node_id = NodeID::FromBinary(local_node_info.node_id()); RAY_LOG(DEBUG).WithField(node_id) << "Registering node info, address is = " << local_node_info.node_manager_address(); RAY_CHECK(local_node_info.state() == rpc::GcsNodeInfo::ALIVE); rpc::RegisterNodeRequest request; - request.mutable_node_info()->CopyFrom(local_node_info); + *request.mutable_node_info() = std::move(local_node_info); client_impl_->GetGcsRpcClient().RegisterNode( std::move(request), - [node_id, local_node_info, callback](const Status &status, - rpc::RegisterNodeReply &&) { + [node_id, callback](const Status &status, rpc::RegisterNodeReply &&) { if (callback) { callback(status); } diff --git a/src/ray/gcs_rpc_client/accessor.h b/src/ray/gcs_rpc_client/accessor.h index 5bef125c0d89..a193506c5ed8 100644 --- a/src/ray/gcs_rpc_client/accessor.h +++ b/src/ray/gcs_rpc_client/accessor.h @@ -302,7 +302,7 @@ class NodeInfoAccessor { /// /// \param node_info The information of node to register to GCS. /// \param callback Callback that will be called when registration is complete. - virtual void RegisterSelf(const rpc::GcsNodeInfo &local_node_info, + virtual void RegisterSelf(rpc::GcsNodeInfo &&local_node_info, const StatusCallback &callback); /// Unregister local node to GCS asynchronously. diff --git a/src/ray/gcs_rpc_client/tests/gcs_client_test.cc b/src/ray/gcs_rpc_client/tests/gcs_client_test.cc index 149d6501aadc..0d7fe2a71b73 100644 --- a/src/ray/gcs_rpc_client/tests/gcs_client_test.cc +++ b/src/ray/gcs_rpc_client/tests/gcs_client_test.cc @@ -375,8 +375,8 @@ class GcsClientTest : public ::testing::TestWithParam { return WaitReady(promise.get_future(), timeout_ms_); } - void RegisterSelf(const rpc::GcsNodeInfo &local_node_info) { - gcs_client_->Nodes().RegisterSelf(local_node_info, nullptr); + void RegisterSelf(rpc::GcsNodeInfo local_node_info) { + gcs_client_->Nodes().RegisterSelf(std::move(local_node_info), nullptr); } bool RegisterNode(const rpc::GcsNodeInfo &node_info) { diff --git a/src/ray/raylet/BUILD.bazel b/src/ray/raylet/BUILD.bazel index 24b63d9d604e..5bff3c75d069 100644 --- a/src/ray/raylet/BUILD.bazel +++ b/src/ray/raylet/BUILD.bazel @@ -208,7 +208,7 @@ ray_cc_library( ) ray_cc_library( - name = "node_manager", + name = "raylet_lib", srcs = [ "node_manager.cc", ], @@ -267,21 +267,6 @@ ray_cc_library( ], ) -ray_cc_library( - name = "raylet_lib", - srcs = ["raylet.cc"], - hdrs = ["raylet.h"], - visibility = ["//visibility:private"], - deps = [ - ":node_manager", - "//src/ray/common:asio", - "//src/ray/object_manager", - "//src/ray/util:network_util", - "//src/ray/util:time", - "@boost//:asio", - ], -) - ray_cc_binary( name = "raylet", srcs = ["main.cc"], diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 54c14f411601..8ebae01e05a3 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -42,7 +42,8 @@ #include "ray/object_manager_rpc_client/object_manager_client.h" #include "ray/raylet/local_object_manager.h" #include "ray/raylet/local_object_manager_interface.h" -#include "ray/raylet/raylet.h" +#include "ray/raylet/node_manager.h" +#include "ray/raylet_ipc_client/client_connection.h" #include "ray/raylet_rpc_client/raylet_client.h" #include "ray/stats/stats.h" #include "ray/stats/tag_defs.h" @@ -326,7 +327,6 @@ int main(int argc, char *argv[]) { gcs_client = std::make_unique(client_options, node_ip_address); RAY_CHECK_OK(gcs_client->Connect(main_service)); - std::unique_ptr raylet; ray::stats::Gauge task_by_state_counter = ray::core::GetTaskByStateGaugeMetric(); std::shared_ptr plasma_client; @@ -396,7 +396,7 @@ int main(int argc, char *argv[]) { auto shutdown_raylet_gracefully = [raylet_node_id, &shutting_down, - &raylet, + &node_manager, &main_service, &raylet_socket_name, &gcs_client, @@ -412,12 +412,12 @@ int main(int argc, char *argv[]) { auto unregister_done_callback = [&main_service, &raylet_socket_name, - &raylet, + &node_manager, &gcs_client, &object_manager_rpc_threads]() { // We should stop the service and remove the local socket // file. - raylet->Stop(); + node_manager->Stop(); gcs_client->Disconnect(); ray::stats::Shutdown(); main_service.stop(); @@ -912,6 +912,10 @@ int main(int argc, char *argv[]) { }; plasma_client = std::make_shared(); + boost::asio::basic_socket_acceptor acceptor( + main_service, ray::ParseUrlEndpoint(raylet_socket_name)); + ray::local_stream_socket socket(main_service); + ray::SetCloseOnExec(acceptor); node_manager = std::make_unique( main_service, raylet_node_id, @@ -939,20 +943,9 @@ int main(int argc, char *argv[]) { shutdown_raylet_gracefully, std::move(add_process_to_system_cgroup_hook), std::move(cgroup_manager), - shutting_down); - - // Initialize the node manager. - raylet = std::make_unique(main_service, - raylet_node_id, - raylet_socket_name, - node_ip_address, - node_name, - node_manager_config, - object_manager_config, - *gcs_client, - metrics_export_port, - is_head_node, - *node_manager); + shutting_down, + std::move(acceptor), + std::move(socket)); // Initializing stats should be done after the node manager is initialized because // . Metrics exported before this call will be buffered until `Init` is @@ -977,20 +970,48 @@ int main(int argc, char *argv[]) { const std::vector source_types = { ray::rpc::Event_SourceType::Event_SourceType_RAYLET}; ray::RayEventInit(source_types, - {{"node_id", raylet->GetNodeId().Hex()}}, + {{"node_id", raylet_node_id.Hex()}}, log_dir, RayConfig::instance().event_level(), RayConfig::instance().emit_event_to_log_file()); }; - raylet->Start(); + ray::rpc::GcsNodeInfo self_node_info; + self_node_info.set_node_id(raylet_node_id.Binary()); + self_node_info.set_state(ray::rpc::GcsNodeInfo::ALIVE); + self_node_info.set_node_manager_address(node_ip_address); + self_node_info.set_node_name(node_name); + self_node_info.set_raylet_socket_name(raylet_socket_name); + self_node_info.set_object_store_socket_name(object_manager_config.store_socket_name); + self_node_info.set_object_manager_port(object_manager->GetServerPort()); + self_node_info.set_node_manager_port(node_manager->GetServerPort()); + self_node_info.set_node_manager_hostname(boost::asio::ip::host_name()); + self_node_info.set_metrics_export_port(metrics_export_port); + self_node_info.set_runtime_env_agent_port(node_manager_config.runtime_env_agent_port); + self_node_info.mutable_state_snapshot()->set_state(ray::rpc::NodeSnapshot::ACTIVE); + auto resource_map = node_manager_config.resource_config.GetResourceMap(); + self_node_info.mutable_resources_total()->insert(resource_map.begin(), + resource_map.end()); + self_node_info.set_start_time_ms(ray::current_sys_time_ms()); + self_node_info.set_is_head_node(is_head_node); + self_node_info.mutable_labels()->insert(node_manager_config.labels.begin(), + node_manager_config.labels.end()); + // Setting up autoscaler related fields from ENV + auto instance_id = std::getenv(kNodeCloudInstanceIdEnv); + self_node_info.set_instance_id(instance_id ? instance_id : ""); + auto cloud_node_type_name = std::getenv(kNodeTypeNameEnv); + self_node_info.set_node_type_name(cloud_node_type_name ? cloud_node_type_name : ""); + auto instance_type_name = std::getenv(kNodeCloudInstanceTypeNameEnv); + self_node_info.set_instance_type_name(instance_type_name ? instance_type_name : ""); + + node_manager->Start(std::move(self_node_info)); }); - auto signal_handler = [&raylet, shutdown_raylet_gracefully]( + auto signal_handler = [&node_manager, shutdown_raylet_gracefully]( const boost::system::error_code &error, int signal_number) { ray::rpc::NodeDeathInfo node_death_info; std::optional drain_request = - raylet->node_manager().GetLocalDrainRequest(); + node_manager->GetLocalDrainRequest(); RAY_LOG(INFO) << "received SIGTERM. Existing local drain request = " << (drain_request.has_value() ? drain_request->DebugString() : "None"); if (drain_request.has_value() && diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9731264347f4..9af9c8251063 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -15,6 +15,7 @@ #include "ray/raylet/node_manager.h" #include +#include #include #include #include @@ -121,6 +122,33 @@ void CleanupProcessGroupSend(pid_t saved_pgid, } #endif +std::vector GenerateEnumNames(const char *const *enum_names_ptr, + int start_index, + int end_index) { + std::vector enum_names; + enum_names.reserve(start_index); + for (int i = 0; i < start_index; ++i) { + enum_names.emplace_back("EmptyMessageType"); + } + size_t i = 0; + while (true) { + const char *name = enum_names_ptr[i]; + if (name == nullptr) { + break; + } + enum_names.emplace_back(name); + i++; + } + RAY_CHECK(static_cast(end_index) == enum_names.size() - 1) + << "Message Type mismatch!"; + return enum_names; +} + +const std::vector node_manager_message_enum = + GenerateEnumNames(ray::protocol::EnumNamesMessageType(), + static_cast(ray::protocol::MessageType::MIN), + static_cast(ray::protocol::MessageType::MAX)); + } // namespace NodeManager::NodeManager( @@ -148,7 +176,9 @@ NodeManager::NodeManager( std::function shutdown_raylet_gracefully, AddProcessToCgroupHook add_process_to_system_cgroup_hook, std::unique_ptr cgroup_manager, - std::atomic_bool &shutting_down) + std::atomic_bool &shutting_down, + boost::asio::basic_socket_acceptor acceptor, + local_stream_socket socket) : self_node_id_(self_node_id), self_node_name_(std::move(self_node_name)), io_service_(io_service), @@ -202,7 +232,9 @@ NodeManager::NodeManager( CreateMemoryUsageRefreshCallback())), add_process_to_system_cgroup_hook_(std::move(add_process_to_system_cgroup_hook)), cgroup_manager_(std::move(cgroup_manager)), - shutting_down_(shutting_down) { + shutting_down_(shutting_down), + acceptor_(std::move(acceptor)), + socket_(std::move(socket)) { RAY_LOG(INFO).WithField(kLogKeyNodeID, self_node_id_) << "Initializing NodeManager"; placement_group_resource_manager_ = @@ -251,6 +283,29 @@ NodeManager::NodeManager( "NodeManager.GCTaskFailureReason"); } +void NodeManager::Start(rpc::GcsNodeInfo &&self_node_info) { + auto register_callback = + [this, + object_manager_port = self_node_info.object_manager_port()](const Status &status) { + RAY_CHECK_OK(status); + RAY_LOG(INFO) << "Raylet of id, " << self_node_id_ + << " started. Raylet consists of node_manager and object_manager." + << " node_manager address: " + << BuildAddress(initial_config_.node_manager_address, + initial_config_.node_manager_port) + << " object_manager address: " + << BuildAddress(initial_config_.node_manager_address, + object_manager_port) + << " hostname: " << boost::asio::ip::host_name(); + this->RegisterGcs(); + }; + gcs_client_.Nodes().RegisterSelf(std::move(self_node_info), register_callback); + + acceptor_.async_accept( + socket_, + boost::bind(&NodeManager::HandleAccept, this, boost::asio::placeholders::error)); +} + void NodeManager::RegisterGcs() { auto on_node_change = [this](const NodeID &node_id, const rpc::GcsNodeAddressAndLiveness &data) { @@ -410,6 +465,45 @@ void NodeManager::RegisterGcs() { "NodeManager.GcsCheckAlive"); } +void NodeManager::HandleAccept(const boost::system::error_code &error) { + if (!error) { + ConnectionErrorHandler error_handler = + [this](const std::shared_ptr &client, + const boost::system::error_code &err) { + this->HandleClientConnectionError(client, err); + }; + + MessageHandler message_handler = [this]( + const std::shared_ptr &client, + int64_t message_type, + const std::vector &message) { + this->ProcessClientMessage(client, message_type, message.data()); + }; + + // Accept a new local client and dispatch it to the node manager. + auto conn = ClientConnection::Create(message_handler, + error_handler, + std::move(socket_), + "worker", + node_manager_message_enum); + + // Begin processing messages. The message handler above is expected to call this to + // continue processing messages. + conn->ProcessMessages(); + } else { + RAY_LOG(ERROR) << "Raylet failed to accept new connection: " << error.message(); + if (error == boost::asio::error::operation_aborted) { + // The server is being destroyed. Don't continue accepting connections. + return; + } + }; + + // We're ready to accept another client. + acceptor_.async_accept( + socket_, + boost::bind(&NodeManager::HandleAccept, this, boost::asio::placeholders::error)); +} + void NodeManager::DestroyWorker(std::shared_ptr worker, rpc::WorkerExitType disconnect_type, const std::string &disconnect_detail, @@ -1298,7 +1392,6 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr & } namespace { - void SendDisconnectClientReply(const WorkerID &worker_id, const std::shared_ptr &client) { flatbuffers::FlatBufferBuilder fbb; @@ -2821,8 +2914,8 @@ void NodeManager::Stop() { #if !defined(_WIN32) // Best-effort process-group cleanup for any remaining workers before shutdown. if (RayConfig::instance().process_group_cleanup_enabled()) { - auto workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true, - /* filter_io_workers */ false); + auto workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers=*/true, + /* filter_io_workers=*/false); for (const auto &w : workers) { auto saved = w->GetSavedProcessGroupId(); if (saved.has_value()) { @@ -2840,6 +2933,7 @@ void NodeManager::Stop() { object_manager_.Stop(); dashboard_agent_manager_.reset(); runtime_env_agent_manager_.reset(); + acceptor_.close(); } void NodeManager::RecordMetrics() { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 64315a8205df..b0fce2fd8d60 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -163,15 +163,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler, std::function shutdown_raylet_gracefully, AddProcessToCgroupHook add_process_to_system_cgroup_hook, std::unique_ptr cgroup_manager, - std::atomic_bool &shutting_down); + std::atomic_bool &shutting_down, + boost::asio::basic_socket_acceptor acceptor, + local_stream_socket socket); - /// Handle an unexpected error that occurred on a client connection. - /// The client will be disconnected and no more messages will be processed. - /// - /// \param client The client whose connection the error occurred on. - /// \param error The error details. - void HandleClientConnectionError(const std::shared_ptr &client, - const boost::system::error_code &error); + void Start(rpc::GcsNodeInfo &&self_node_info); /// Process a message from a client. This method is responsible for /// explicitly listening for more messages from the client if the client is @@ -209,10 +205,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler, std::optional CreateSyncMessage( int64_t after_version, syncer::MessageType message_type) const override; - int GetObjectManagerPort() const { return object_manager_.GetServerPort(); } - - LocalObjectManagerInterface &GetLocalObjectManager() { return local_object_manager_; } - /// Trigger global GC across the cluster to free up references to actors or /// object ids. void TriggerGlobalGC(); @@ -310,6 +302,17 @@ class NodeManager : public rpc::NodeManagerServiceHandler, private: FRIEND_TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog); + /// Handle an accepted client connection. + void HandleAccept(const boost::system::error_code &error); + + /// Handle an unexpected error that occurred on a client connection. + /// The client will be disconnected and no more messages will be processed. + /// + /// \param client The client whose connection the error occurred on. + /// \param error The error details. + void HandleClientConnectionError(const std::shared_ptr &client, + const boost::system::error_code &error); + // Removes the worker from node_manager's leased_workers_ map. // Warning: this does NOT release the worker's resources, or put the leased worker // back to the worker pool, or destroy the worker. The caller must handle the worker's @@ -894,6 +897,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler, std::unique_ptr cgroup_manager_; std::atomic_bool &shutting_down_; + + /// An acceptor for new clients. + boost::asio::basic_socket_acceptor acceptor_; + + /// The socket to listen on for new clients. + local_stream_socket socket_; }; } // namespace ray::raylet diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc deleted file mode 100644 index 555646d77858..000000000000 --- a/src/ray/raylet/raylet.cc +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "ray/raylet/raylet.h" - -#include -#include -#include -#include -#include -#include -#include - -#include "ray/common/scheduling/resource_set.h" -#include "ray/common/status.h" -#include "ray/object_manager/object_manager.h" -#include "ray/raylet_ipc_client/client_connection.h" -#include "ray/util/network_util.h" -#include "ray/util/time.h" - -namespace { - -std::vector GenerateEnumNames(const char *const *enum_names_ptr, - int start_index, - int end_index) { - std::vector enum_names; - enum_names.reserve(start_index); - for (int i = 0; i < start_index; ++i) { - enum_names.emplace_back("EmptyMessageType"); - } - size_t i = 0; - while (true) { - const char *name = enum_names_ptr[i]; - if (name == nullptr) { - break; - } - enum_names.emplace_back(name); - i++; - } - RAY_CHECK(static_cast(end_index) == enum_names.size() - 1) - << "Message Type mismatch!"; - return enum_names; -} - -const std::vector node_manager_message_enum = - GenerateEnumNames(ray::protocol::EnumNamesMessageType(), - static_cast(ray::protocol::MessageType::MIN), - static_cast(ray::protocol::MessageType::MAX)); -} // namespace - -namespace ray::raylet { - -Raylet::Raylet(instrumented_io_context &main_service, - const NodeID &self_node_id, - const std::string &socket_name, - const std::string &node_ip_address, - const std::string &node_name, - const NodeManagerConfig &node_manager_config, - const ObjectManagerConfig &object_manager_config, - gcs::GcsClient &gcs_client, - int metrics_export_port, - bool is_head_node, - NodeManager &node_manager) - : self_node_id_(self_node_id), - gcs_client_(gcs_client), - node_manager_(node_manager), - socket_name_(socket_name), - acceptor_(main_service, ParseUrlEndpoint(socket_name)), - socket_(main_service) { - SetCloseOnExec(acceptor_); - self_node_info_.set_node_id(self_node_id_.Binary()); - self_node_info_.set_state(GcsNodeInfo::ALIVE); - self_node_info_.set_node_manager_address(node_ip_address); - self_node_info_.set_node_name(node_name); - self_node_info_.set_raylet_socket_name(socket_name); - self_node_info_.set_object_store_socket_name(object_manager_config.store_socket_name); - self_node_info_.set_object_manager_port(node_manager_.GetObjectManagerPort()); - self_node_info_.set_node_manager_port(node_manager_.GetServerPort()); - self_node_info_.set_node_manager_hostname(boost::asio::ip::host_name()); - self_node_info_.set_metrics_export_port(metrics_export_port); - self_node_info_.set_runtime_env_agent_port(node_manager_config.runtime_env_agent_port); - self_node_info_.mutable_state_snapshot()->set_state(NodeSnapshot::ACTIVE); - auto resource_map = node_manager_config.resource_config.GetResourceMap(); - self_node_info_.mutable_resources_total()->insert(resource_map.begin(), - resource_map.end()); - self_node_info_.set_start_time_ms(current_sys_time_ms()); - self_node_info_.set_is_head_node(is_head_node); - self_node_info_.mutable_labels()->insert(node_manager_config.labels.begin(), - node_manager_config.labels.end()); - - // Setting up autoscaler related fields from ENV - auto instance_id = std::getenv(kNodeCloudInstanceIdEnv); - self_node_info_.set_instance_id(instance_id ? instance_id : ""); - auto cloud_node_type_name = std::getenv(kNodeTypeNameEnv); - self_node_info_.set_node_type_name(cloud_node_type_name ? cloud_node_type_name : ""); - auto instance_type_name = std::getenv(kNodeCloudInstanceTypeNameEnv); - self_node_info_.set_instance_type_name(instance_type_name ? instance_type_name : ""); -} - -void Raylet::Start() { - RegisterGcs(); - - // Start listening for clients. - DoAccept(); -} - -void Raylet::Stop() { - node_manager_.Stop(); - acceptor_.close(); -} - -void Raylet::RegisterGcs() { - auto register_callback = [this](const Status &status) { - RAY_CHECK_OK(status); - RAY_LOG(INFO) << "Raylet of id, " << self_node_id_ - << " started. Raylet consists of node_manager and object_manager." - << " node_manager address: " - << BuildAddress(self_node_info_.node_manager_address(), - self_node_info_.node_manager_port()) - << " object_manager address: " - << BuildAddress(self_node_info_.node_manager_address(), - self_node_info_.object_manager_port()) - << " hostname: " << self_node_info_.node_manager_hostname(); - node_manager_.RegisterGcs(); - }; - - gcs_client_.Nodes().RegisterSelf(self_node_info_, register_callback); -} - -void Raylet::DoAccept() { - acceptor_.async_accept( - socket_, - boost::bind(&Raylet::HandleAccept, this, boost::asio::placeholders::error)); -} - -void Raylet::HandleAccept(const boost::system::error_code &error) { - if (!error) { - ConnectionErrorHandler error_handler = - [this](const std::shared_ptr &client, - const boost::system::error_code &err) { - node_manager_.HandleClientConnectionError(client, err); - }; - - MessageHandler message_handler = [this]( - const std::shared_ptr &client, - int64_t message_type, - const std::vector &message) { - node_manager_.ProcessClientMessage(client, message_type, message.data()); - }; - - // Accept a new local client and dispatch it to the node manager. - auto conn = ClientConnection::Create(message_handler, - error_handler, - std::move(socket_), - "worker", - node_manager_message_enum); - - // Begin processing messages. The message handler above is expected to call this to - // continue processing messages. - conn->ProcessMessages(); - } else { - RAY_LOG(ERROR) << "Raylet failed to accept new connection: " << error.message(); - if (error == boost::asio::error::operation_aborted) { - // The server is being destroyed. Don't continue accepting connections. - return; - } - }; - - // We're ready to accept another client. - DoAccept(); -} - -} // namespace ray::raylet diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h deleted file mode 100644 index 441634e17c26..000000000000 --- a/src/ray/raylet/raylet.h +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include - -#include "ray/common/asio/instrumented_io_context.h" -#include "ray/object_manager/object_manager.h" -#include "ray/raylet/node_manager.h" - -namespace ray::raylet { - -using rpc::GcsNodeInfo; -using rpc::NodeSnapshot; - -class NodeManager; - -class Raylet { - public: - /// Create a raylet server and listen for local clients. - /// - /// \param main_service The event loop to run the server on. - /// \param object_manager_service The asio io_service tied to the object manager. - /// \param socket_name The Unix domain socket to listen on for local clients. - /// \param node_ip_address The IP address of this node. - /// \param node_manager_config Configuration to initialize the node manager. - /// scheduler with. - /// \param object_manager_config Configuration to initialize the object - /// manager. - /// \param gcs_client A client connection to the GCS. - /// \param metrics_export_port A port at which metrics are exposed to. - /// \param is_head_node Whether this node is the head node. - Raylet(instrumented_io_context &main_service, - const NodeID &self_node_id, - const std::string &socket_name, - const std::string &node_ip_address, - const std::string &node_name, - const NodeManagerConfig &node_manager_config, - const ObjectManagerConfig &object_manager_config, - gcs::GcsClient &gcs_client, - int metrics_export_port, - bool is_head_node, - NodeManager &node_manager); - - /// Start this raylet. - void Start(); - - /// Stop this raylet. - void Stop(); - - NodeID GetNodeId() const { return self_node_id_; } - - NodeManager &node_manager() { return node_manager_; } - - private: - /// Register GCS client. - void RegisterGcs(); - - /// Accept a client connection. - void DoAccept(); - /// Handle an accepted client connection. - void HandleAccept(const boost::system::error_code &error); - - friend class TestObjectManagerIntegration; - - /// ID of this node. - NodeID self_node_id_; - /// Information of this node. - GcsNodeInfo self_node_info_; - - /// A client connection to the GCS. - gcs::GcsClient &gcs_client_; - /// Manages client requests for task submission and execution. - NodeManager &node_manager_; - /// The name of the socket this raylet listens on. - std::string socket_name_; - - /// An acceptor for new clients. - boost::asio::basic_socket_acceptor acceptor_; - /// The socket to listen on for new clients. - local_stream_socket socket_; -}; - -} // namespace ray::raylet diff --git a/src/ray/raylet/tests/BUILD.bazel b/src/ray/raylet/tests/BUILD.bazel index 2e076815de52..d19a35475dd4 100644 --- a/src/ray/raylet/tests/BUILD.bazel +++ b/src/ray/raylet/tests/BUILD.bazel @@ -163,7 +163,7 @@ ray_cc_test( "//src/ray/observability:fake_metric", "//src/ray/pubsub:fake_subscriber", "//src/ray/raylet:local_object_manager_interface", - "//src/ray/raylet:node_manager", + "//src/ray/raylet:raylet_lib", "//src/ray/raylet/scheduling:cluster_lease_manager", "//src/ray/raylet_rpc_client:fake_raylet_client", "//src/ray/rpc:utils", diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index 089912d1a33e..4b92014c1594 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -422,7 +422,9 @@ class NodeManagerTest : public ::testing::Test { [](const auto &) {}, [](const std::string &) {}, nullptr, - shutting_down_); + shutting_down_, + boost::asio::basic_socket_acceptor(io_service_), + boost::asio::basic_stream_socket(io_service_)); } instrumented_io_context io_service_;