Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/mock/ray/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class MockNodeInfoAccessor : public NodeInfoAccessor {
public:
MOCK_METHOD(void,
RegisterSelf,
(const rpc::GcsNodeInfo &local_node_info, const StatusCallback &callback),
(rpc::GcsNodeInfo && local_node_info, const StatusCallback &callback),
(override));
MOCK_METHOD(void,
AsyncRegister,
Expand Down
7 changes: 3 additions & 4 deletions src/ray/gcs_rpc_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,18 +468,17 @@ bool ActorInfoAccessor::IsActorUnsubscribed(const ActorID &actor_id) {

NodeInfoAccessor::NodeInfoAccessor(GcsClient *client_impl) : client_impl_(client_impl) {}

void NodeInfoAccessor::RegisterSelf(const rpc::GcsNodeInfo &local_node_info,
void NodeInfoAccessor::RegisterSelf(rpc::GcsNodeInfo &&local_node_info,
const StatusCallback &callback) {
auto node_id = NodeID::FromBinary(local_node_info.node_id());
RAY_LOG(DEBUG).WithField(node_id)
<< "Registering node info, address is = " << local_node_info.node_manager_address();
RAY_CHECK(local_node_info.state() == rpc::GcsNodeInfo::ALIVE);
rpc::RegisterNodeRequest request;
request.mutable_node_info()->CopyFrom(local_node_info);
*request.mutable_node_info() = std::move(local_node_info);
client_impl_->GetGcsRpcClient().RegisterNode(
std::move(request),
[node_id, local_node_info, callback](const Status &status,
rpc::RegisterNodeReply &&) {
[node_id, callback](const Status &status, rpc::RegisterNodeReply &&) {
if (callback) {
callback(status);
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs_rpc_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class NodeInfoAccessor {
///
/// \param node_info The information of node to register to GCS.
/// \param callback Callback that will be called when registration is complete.
virtual void RegisterSelf(const rpc::GcsNodeInfo &local_node_info,
virtual void RegisterSelf(rpc::GcsNodeInfo &&local_node_info,
const StatusCallback &callback);

/// Unregister local node to GCS asynchronously.
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs_rpc_client/tests/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
return WaitReady(promise.get_future(), timeout_ms_);
}

void RegisterSelf(const rpc::GcsNodeInfo &local_node_info) {
gcs_client_->Nodes().RegisterSelf(local_node_info, nullptr);
void RegisterSelf(rpc::GcsNodeInfo local_node_info) {
gcs_client_->Nodes().RegisterSelf(std::move(local_node_info), nullptr);
}

bool RegisterNode(const rpc::GcsNodeInfo &node_info) {
Expand Down
17 changes: 1 addition & 16 deletions src/ray/raylet/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ ray_cc_library(
)

ray_cc_library(
name = "node_manager",
name = "raylet_lib",
srcs = [
"node_manager.cc",
],
Expand Down Expand Up @@ -267,21 +267,6 @@ ray_cc_library(
],
)

ray_cc_library(
name = "raylet_lib",
srcs = ["raylet.cc"],
hdrs = ["raylet.h"],
visibility = ["//visibility:private"],
deps = [
":node_manager",
"//src/ray/common:asio",
"//src/ray/object_manager",
"//src/ray/util:network_util",
"//src/ray/util:time",
"@boost//:asio",
],
)

ray_cc_binary(
name = "raylet",
srcs = ["main.cc"],
Expand Down
67 changes: 44 additions & 23 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
#include "ray/object_manager_rpc_client/object_manager_client.h"
#include "ray/raylet/local_object_manager.h"
#include "ray/raylet/local_object_manager_interface.h"
#include "ray/raylet/raylet.h"
#include "ray/raylet/node_manager.h"
#include "ray/raylet_ipc_client/client_connection.h"
#include "ray/raylet_rpc_client/raylet_client.h"
#include "ray/stats/stats.h"
#include "ray/stats/tag_defs.h"
Expand Down Expand Up @@ -326,7 +327,6 @@ int main(int argc, char *argv[]) {
gcs_client = std::make_unique<ray::gcs::GcsClient>(client_options, node_ip_address);

RAY_CHECK_OK(gcs_client->Connect(main_service));
std::unique_ptr<ray::raylet::Raylet> raylet;

ray::stats::Gauge task_by_state_counter = ray::core::GetTaskByStateGaugeMetric();
std::shared_ptr<plasma::PlasmaClient> plasma_client;
Expand Down Expand Up @@ -396,7 +396,7 @@ int main(int argc, char *argv[]) {
auto shutdown_raylet_gracefully =
[raylet_node_id,
&shutting_down,
&raylet,
&node_manager,
&main_service,
&raylet_socket_name,
&gcs_client,
Expand All @@ -412,12 +412,12 @@ int main(int argc, char *argv[]) {

auto unregister_done_callback = [&main_service,
&raylet_socket_name,
&raylet,
&node_manager,
&gcs_client,
&object_manager_rpc_threads]() {
// We should stop the service and remove the local socket
// file.
raylet->Stop();
node_manager->Stop();
gcs_client->Disconnect();
ray::stats::Shutdown();
main_service.stop();
Expand Down Expand Up @@ -912,6 +912,10 @@ int main(int argc, char *argv[]) {
};

plasma_client = std::make_shared<plasma::PlasmaClient>();
boost::asio::basic_socket_acceptor<ray::local_stream_protocol> acceptor(
main_service, ray::ParseUrlEndpoint(raylet_socket_name));
ray::local_stream_socket socket(main_service);
ray::SetCloseOnExec(acceptor);
node_manager = std::make_unique<ray::raylet::NodeManager>(
main_service,
raylet_node_id,
Expand Down Expand Up @@ -939,20 +943,9 @@ int main(int argc, char *argv[]) {
shutdown_raylet_gracefully,
std::move(add_process_to_system_cgroup_hook),
std::move(cgroup_manager),
shutting_down);

// Initialize the node manager.
raylet = std::make_unique<ray::raylet::Raylet>(main_service,
raylet_node_id,
raylet_socket_name,
node_ip_address,
node_name,
node_manager_config,
object_manager_config,
*gcs_client,
metrics_export_port,
is_head_node,
*node_manager);
shutting_down,
std::move(acceptor),
std::move(socket));

// Initializing stats should be done after the node manager is initialized because
// <explain why>. Metrics exported before this call will be buffered until `Init` is
Expand All @@ -977,20 +970,48 @@ int main(int argc, char *argv[]) {
const std::vector<ray::SourceTypeVariant> source_types = {
ray::rpc::Event_SourceType::Event_SourceType_RAYLET};
ray::RayEventInit(source_types,
{{"node_id", raylet->GetNodeId().Hex()}},
{{"node_id", raylet_node_id.Hex()}},
log_dir,
RayConfig::instance().event_level(),
RayConfig::instance().emit_event_to_log_file());
};

raylet->Start();
ray::rpc::GcsNodeInfo self_node_info;
self_node_info.set_node_id(raylet_node_id.Binary());
self_node_info.set_state(ray::rpc::GcsNodeInfo::ALIVE);
self_node_info.set_node_manager_address(node_ip_address);
self_node_info.set_node_name(node_name);
self_node_info.set_raylet_socket_name(raylet_socket_name);
self_node_info.set_object_store_socket_name(object_manager_config.store_socket_name);
self_node_info.set_object_manager_port(object_manager->GetServerPort());
self_node_info.set_node_manager_port(node_manager->GetServerPort());
self_node_info.set_node_manager_hostname(boost::asio::ip::host_name());
self_node_info.set_metrics_export_port(metrics_export_port);
self_node_info.set_runtime_env_agent_port(node_manager_config.runtime_env_agent_port);
self_node_info.mutable_state_snapshot()->set_state(ray::rpc::NodeSnapshot::ACTIVE);
auto resource_map = node_manager_config.resource_config.GetResourceMap();
self_node_info.mutable_resources_total()->insert(resource_map.begin(),
resource_map.end());
self_node_info.set_start_time_ms(ray::current_sys_time_ms());
self_node_info.set_is_head_node(is_head_node);
self_node_info.mutable_labels()->insert(node_manager_config.labels.begin(),
node_manager_config.labels.end());
// Setting up autoscaler related fields from ENV
auto instance_id = std::getenv(kNodeCloudInstanceIdEnv);
self_node_info.set_instance_id(instance_id ? instance_id : "");
auto cloud_node_type_name = std::getenv(kNodeTypeNameEnv);
self_node_info.set_node_type_name(cloud_node_type_name ? cloud_node_type_name : "");
auto instance_type_name = std::getenv(kNodeCloudInstanceTypeNameEnv);
self_node_info.set_instance_type_name(instance_type_name ? instance_type_name : "");

node_manager->Start(std::move(self_node_info));
});

auto signal_handler = [&raylet, shutdown_raylet_gracefully](
auto signal_handler = [&node_manager, shutdown_raylet_gracefully](
const boost::system::error_code &error, int signal_number) {
ray::rpc::NodeDeathInfo node_death_info;
std::optional<ray::rpc::DrainRayletRequest> drain_request =
raylet->node_manager().GetLocalDrainRequest();
node_manager->GetLocalDrainRequest();
RAY_LOG(INFO) << "received SIGTERM. Existing local drain request = "
<< (drain_request.has_value() ? drain_request->DebugString() : "None");
if (drain_request.has_value() &&
Expand Down
104 changes: 99 additions & 5 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ray/raylet/node_manager.h"

#include <algorithm>
#include <boost/bind/bind.hpp>
#include <cctype>
#include <cerrno>
#include <csignal>
Expand Down Expand Up @@ -121,6 +122,33 @@ void CleanupProcessGroupSend(pid_t saved_pgid,
}
#endif

std::vector<std::string> GenerateEnumNames(const char *const *enum_names_ptr,
int start_index,
int end_index) {
std::vector<std::string> enum_names;
enum_names.reserve(start_index);
for (int i = 0; i < start_index; ++i) {
enum_names.emplace_back("EmptyMessageType");
}
size_t i = 0;
while (true) {
const char *name = enum_names_ptr[i];
if (name == nullptr) {
break;
}
enum_names.emplace_back(name);
i++;
}
RAY_CHECK(static_cast<size_t>(end_index) == enum_names.size() - 1)
<< "Message Type mismatch!";
return enum_names;
}

const std::vector<std::string> node_manager_message_enum =
GenerateEnumNames(ray::protocol::EnumNamesMessageType(),
static_cast<int>(ray::protocol::MessageType::MIN),
static_cast<int>(ray::protocol::MessageType::MAX));

} // namespace

NodeManager::NodeManager(
Expand Down Expand Up @@ -148,7 +176,9 @@ NodeManager::NodeManager(
std::function<void(const rpc::NodeDeathInfo &)> shutdown_raylet_gracefully,
AddProcessToCgroupHook add_process_to_system_cgroup_hook,
std::unique_ptr<CgroupManagerInterface> cgroup_manager,
std::atomic_bool &shutting_down)
std::atomic_bool &shutting_down,
boost::asio::basic_socket_acceptor<local_stream_protocol> acceptor,
local_stream_socket socket)
: self_node_id_(self_node_id),
self_node_name_(std::move(self_node_name)),
io_service_(io_service),
Expand Down Expand Up @@ -202,7 +232,9 @@ NodeManager::NodeManager(
CreateMemoryUsageRefreshCallback())),
add_process_to_system_cgroup_hook_(std::move(add_process_to_system_cgroup_hook)),
cgroup_manager_(std::move(cgroup_manager)),
shutting_down_(shutting_down) {
shutting_down_(shutting_down),
acceptor_(std::move(acceptor)),
socket_(std::move(socket)) {
RAY_LOG(INFO).WithField(kLogKeyNodeID, self_node_id_) << "Initializing NodeManager";

placement_group_resource_manager_ =
Expand Down Expand Up @@ -251,6 +283,29 @@ NodeManager::NodeManager(
"NodeManager.GCTaskFailureReason");
}

void NodeManager::Start(rpc::GcsNodeInfo &&self_node_info) {
auto register_callback =
[this,
object_manager_port = self_node_info.object_manager_port()](const Status &status) {
RAY_CHECK_OK(status);
RAY_LOG(INFO) << "Raylet of id, " << self_node_id_
<< " started. Raylet consists of node_manager and object_manager."
<< " node_manager address: "
<< BuildAddress(initial_config_.node_manager_address,
initial_config_.node_manager_port)
<< " object_manager address: "
<< BuildAddress(initial_config_.node_manager_address,
object_manager_port)
<< " hostname: " << boost::asio::ip::host_name();
this->RegisterGcs();
};
gcs_client_.Nodes().RegisterSelf(std::move(self_node_info), register_callback);

acceptor_.async_accept(
socket_,
boost::bind(&NodeManager::HandleAccept, this, boost::asio::placeholders::error));
}

void NodeManager::RegisterGcs() {
auto on_node_change = [this](const NodeID &node_id,
const rpc::GcsNodeAddressAndLiveness &data) {
Expand Down Expand Up @@ -410,6 +465,45 @@ void NodeManager::RegisterGcs() {
"NodeManager.GcsCheckAlive");
}

void NodeManager::HandleAccept(const boost::system::error_code &error) {
if (!error) {
ConnectionErrorHandler error_handler =
[this](const std::shared_ptr<ClientConnection> &client,
const boost::system::error_code &err) {
this->HandleClientConnectionError(client, err);
};

MessageHandler message_handler = [this](
const std::shared_ptr<ClientConnection> &client,
int64_t message_type,
const std::vector<uint8_t> &message) {
this->ProcessClientMessage(client, message_type, message.data());
};

// Accept a new local client and dispatch it to the node manager.
auto conn = ClientConnection::Create(message_handler,
error_handler,
std::move(socket_),
"worker",
node_manager_message_enum);

// Begin processing messages. The message handler above is expected to call this to
// continue processing messages.
conn->ProcessMessages();
} else {
RAY_LOG(ERROR) << "Raylet failed to accept new connection: " << error.message();
if (error == boost::asio::error::operation_aborted) {
// The server is being destroyed. Don't continue accepting connections.
return;
}
};

// We're ready to accept another client.
acceptor_.async_accept(
socket_,
boost::bind(&NodeManager::HandleAccept, this, boost::asio::placeholders::error));
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Socket Reuse After Move Causes Undefined State

In NodeManager::HandleAccept, the socket_ member is moved into a ClientConnection object. Immediately after, the same socket_ is reused for the next async_accept call. This use-after-move leaves the socket in an undefined state, which can lead to runtime errors and prevent subsequent client connections.

Fix in Cursor Fix in Web

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... this does look right -- how is it working now?! Maybe the socket move constructor is fake news and just copies?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya this looks p wrong, it's what was there before and i have no idea how it works... the move constructor is maybe just a noop? Will eventually clean up around it


void NodeManager::DestroyWorker(std::shared_ptr<WorkerInterface> worker,
rpc::WorkerExitType disconnect_type,
const std::string &disconnect_detail,
Expand Down Expand Up @@ -1298,7 +1392,6 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &
}

namespace {

void SendDisconnectClientReply(const WorkerID &worker_id,
const std::shared_ptr<ClientConnection> &client) {
flatbuffers::FlatBufferBuilder fbb;
Expand Down Expand Up @@ -2821,8 +2914,8 @@ void NodeManager::Stop() {
#if !defined(_WIN32)
// Best-effort process-group cleanup for any remaining workers before shutdown.
if (RayConfig::instance().process_group_cleanup_enabled()) {
auto workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true,
/* filter_io_workers */ false);
auto workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers=*/true,
/* filter_io_workers=*/false);
for (const auto &w : workers) {
auto saved = w->GetSavedProcessGroupId();
if (saved.has_value()) {
Expand All @@ -2840,6 +2933,7 @@ void NodeManager::Stop() {
object_manager_.Stop();
dashboard_agent_manager_.reset();
runtime_env_agent_manager_.reset();
acceptor_.close();
}

void NodeManager::RecordMetrics() {
Expand Down
Loading