Skip to content

Commit

Permalink
[core] Move checkalive to NodeInfoManager in GCS. (#29389)
Browse files Browse the repository at this point in the history
CheckAlive is used by the dashboard and by ray health-check to check the health of the Ray cluster and of the raylets.
It is implemented in HeartbeatManager, which means that if we disable the heartbeat manager, CheckAlive stops working.
The heartbeat manager should only handle heartbeats and health checks and nothing else, so this feature fits better in NodeInfoManager.

This PR just moves this feature to NodeInfoManager and keeps the semantics of the RPC unchanged.
  • Loading branch information
fishbone authored Oct 19, 2022
1 parent 2bb96b7 commit 3e357c8
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 37 deletions.
6 changes: 3 additions & 3 deletions python/ray/_private/gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def check_health(address: str, timeout=2, skip_version_check=False) -> bool:
req = gcs_service_pb2.CheckAliveRequest()
try:
channel = create_gcs_channel(address)
stub = gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub(channel)
stub = gcs_service_pb2_grpc.NodeInfoGcsServiceStub(channel)
resp = stub.CheckAlive(req, timeout=timeout)
except grpc.RpcError:
traceback.print_exc()
Expand Down Expand Up @@ -412,7 +412,7 @@ def _connect(self):
self._kv_stub = gcs_service_pb2_grpc.InternalKVGcsServiceStub(
self._channel.channel()
)
self._heartbeat_info_stub = gcs_service_pb2_grpc.HeartbeatInfoGcsServiceStub(
self._node_info_stub = gcs_service_pb2_grpc.NodeInfoGcsServiceStub(
self._channel.channel()
)
self._job_info_stub = gcs_service_pb2_grpc.JobInfoGcsServiceStub(
Expand All @@ -427,7 +427,7 @@ async def check_alive(
self, node_ips: List[bytes], timeout: Optional[float] = None
) -> List[bool]:
req = gcs_service_pb2.CheckAliveRequest(raylet_address=node_ips)
reply = await self._heartbeat_info_stub.CheckAlive(req, timeout=timeout)
reply = await self._node_info_stub.CheckAlive(req, timeout=timeout)

if reply.status.code != GcsCode.OK:
raise RuntimeError(
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class GcsClientReconnectionTest : public ::testing::Test {
auto channel =
grpc::CreateChannel(absl::StrCat("127.0.0.1:", config_.grpc_server_port),
grpc::InsecureChannelCredentials());
std::unique_ptr<rpc::HeartbeatInfoGcsService::Stub> stub =
rpc::HeartbeatInfoGcsService::NewStub(std::move(channel));
std::unique_ptr<rpc::NodeInfoGcsService::Stub> stub =
rpc::NodeInfoGcsService::NewStub(std::move(channel));
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + 1s);
const rpc::CheckAliveRequest request;
Expand Down
5 changes: 2 additions & 3 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
auto channel =
grpc::CreateChannel(absl::StrCat("127.0.0.1:", gcs_server_->GetPort()),
grpc::InsecureChannelCredentials());
std::unique_ptr<rpc::HeartbeatInfoGcsService::Stub> stub =
rpc::HeartbeatInfoGcsService::NewStub(std::move(channel));
auto stub = rpc::NodeInfoGcsService::NewStub(std::move(channel));
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + 1s);
const rpc::CheckAliveRequest request;
Expand Down Expand Up @@ -471,7 +470,7 @@ TEST_P(GcsClientTest, TestCheckAlive) {

auto channel = grpc::CreateChannel(absl::StrCat("127.0.0.1:", gcs_server_->GetPort()),
grpc::InsecureChannelCredentials());
auto stub = rpc::HeartbeatInfoGcsService::NewStub(std::move(channel));
auto stub = rpc::NodeInfoGcsService::NewStub(std::move(channel));
rpc::CheckAliveRequest request;
*(request.mutable_raylet_address()->Add()) = "172.1.2.3:31292";
*(request.mutable_raylet_address()->Add()) = "172.1.2.4:31293";
Expand Down
11 changes: 0 additions & 11 deletions src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,6 @@ void GcsHeartbeatManager::HandleReportHeartbeat(
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

/// Handle a CheckAlive request.
///
/// Stamps the reply with kRayVersion and appends one boolean to
/// `raylet_alive` per entry of `request.raylet_address()`, in request order.
/// An entry is true when the address is present in the right-hand view of
/// `node_map_`, false otherwise. The reply is then sent through
/// GCS_RPC_SEND_REPLY with Status::OK().
///
/// \param request The request carrying the raylet addresses to look up.
/// \param reply The reply to fill in.
/// \param send_reply_callback Callback handed to GCS_RPC_SEND_REPLY.
void GcsHeartbeatManager::HandleCheckAlive(rpc::CheckAliveRequest request,
rpc::CheckAliveReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reply->set_ray_version(kRayVersion);
for (const auto &addr : request.raylet_address()) {
// An address is reported alive iff it is a key on the right side of node_map_.
reply->mutable_raylet_alive()->Add(node_map_.right.count(addr) != 0);
}

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

void GcsHeartbeatManager::DetectDeadNodes() {
std::vector<NodeID> dead_nodes;
for (auto &current : heartbeats_) {
Expand Down
5 changes: 0 additions & 5 deletions src/ray/gcs/gcs_server/gcs_heartbeat_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
rpc::ReportHeartbeatReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle check alive request for GCS.
void HandleCheckAlive(rpc::CheckAliveRequest request,
rpc::CheckAliveReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
Expand Down
16 changes: 15 additions & 1 deletion src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ void GcsNodeManager::HandleRegisterNode(rpc::RegisterNodeRequest request,
++counts_[CountType::REGISTER_NODE_REQUEST];
}

/// Handle a CheckAlive request.
///
/// The reply carries kRayVersion plus one boolean per requested raylet
/// address, in the same order as the request. A flag is true exactly when the
/// address is currently a key in the address (right-hand) side of `node_map_`.
/// The reply is sent through GCS_RPC_SEND_REPLY with Status::OK().
///
/// \param request The request carrying the raylet addresses to look up.
/// \param reply The reply to fill in.
/// \param send_reply_callback Callback handed to GCS_RPC_SEND_REPLY.
void GcsNodeManager::HandleCheckAlive(rpc::CheckAliveRequest request,
                                      rpc::CheckAliveReply *reply,
                                      rpc::SendReplyCallback send_reply_callback) {
  reply->set_ray_version(kRayVersion);

  // Address-keyed view of the NodeID <-> address bimap.
  const auto &known_addresses = node_map_.right;
  auto *alive_flags = reply->mutable_raylet_alive();
  for (const auto &raylet_addr : request.raylet_address()) {
    const bool is_alive = known_addresses.count(raylet_addr) != 0;
    alive_flags->Add(is_alive);
  }

  GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

void GcsNodeManager::HandleDrainNode(rpc::DrainNodeRequest request,
rpc::DrainNodeReply *reply,
rpc::SendReplyCallback send_reply_callback) {
Expand Down Expand Up @@ -181,8 +192,10 @@ void GcsNodeManager::AddNode(std::shared_ptr<rpc::GcsNodeInfo> node) {
auto node_id = NodeID::FromBinary(node->node_id());
auto iter = alive_nodes_.find(node_id);
if (iter == alive_nodes_.end()) {
auto node_addr =
node->node_manager_address() + ":" + std::to_string(node->node_manager_port());
node_map_.insert(NodeIDAddrBiMap::value_type(node_id, node_addr));
alive_nodes_.emplace(node_id, node);

// Notify all listeners.
for (auto &listener : node_added_listeners_) {
listener(node);
Expand All @@ -202,6 +215,7 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
stats::NodeFailureTotal.Record(1);
// Remove from alive nodes.
alive_nodes_.erase(iter);
node_map_.left.erase(node_id);
if (!is_intended) {
// Broadcast a warning to all of the drivers indicating that the node
// has been marked as dead.
Expand Down
14 changes: 14 additions & 0 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#pragma once

#include <boost/bimap.hpp>
#include <boost/bimap/unordered_set_of.hpp>

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/id.h"
Expand Down Expand Up @@ -63,6 +66,11 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
rpc::GetInternalConfigReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle check alive request for GCS.
void HandleCheckAlive(rpc::CheckAliveRequest request,
rpc::CheckAliveReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void OnNodeFailure(const NodeID &node_id);

/// Add an alive node.
Expand Down Expand Up @@ -159,6 +167,12 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
CountType_MAX = 4,
};
uint64_t counts_[CountType::CountType_MAX] = {0};

/// A map of NodeId <-> ip:port of raylet
using NodeIDAddrBiMap =
boost::bimap<boost::bimaps::unordered_set_of<NodeID, std::hash<NodeID>>,
boost::bimaps::unordered_set_of<std::string>>;
NodeIDAddrBiMap node_map_;
};

} // namespace gcs
Expand Down
4 changes: 2 additions & 2 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,6 @@ message GetGcsSchedulingStatsReply {
service HeartbeatInfoGcsService {
// Report heartbeat of a node to GCS Service.
rpc ReportHeartbeat(ReportHeartbeatRequest) returns (ReportHeartbeatReply);
// Check alive.
rpc CheckAlive(CheckAliveRequest) returns (CheckAliveReply);
}

message AddProfileDataRequest {
Expand Down Expand Up @@ -632,5 +630,7 @@ service NodeInfoGcsService {
rpc GetAllNodeInfo(GetAllNodeInfoRequest) returns (GetAllNodeInfoReply);
// Get cluster internal config.
rpc GetInternalConfig(GetInternalConfigRequest) returns (GetInternalConfigReply);
// Check alive.
rpc CheckAlive(CheckAliveRequest) returns (CheckAliveReply);
}
///////////////////////////////////////////////////////////////////////////////
12 changes: 6 additions & 6 deletions src/ray/rpc/gcs_server/gcs_rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ class GcsRpcClient {
node_info_grpc_client_,
/*method_timeout_ms*/ -1, )

/// Check GCS is alive.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService,
CheckAlive,
node_info_grpc_client_,
/*method_timeout_ms*/ -1, )

/// Get internal config of the node from the GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService,
GetInternalConfig,
Expand Down Expand Up @@ -360,12 +366,6 @@ class GcsRpcClient {
heartbeat_info_grpc_client_,
/*method_timeout_ms*/ -1, )

/// Check GCS is alive.
VOID_GCS_RPC_CLIENT_METHOD(HeartbeatInfoGcsService,
CheckAlive,
heartbeat_info_grpc_client_,
/*method_timeout_ms*/ -1, )

/// Add profile data to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(StatsGcsService,
AddProfileData,
Expand Down
9 changes: 5 additions & 4 deletions src/ray/rpc/gcs_server/gcs_rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ class NodeInfoGcsServiceHandler {
RegisterNodeReply *reply,
SendReplyCallback send_reply_callback) = 0;

virtual void HandleCheckAlive(CheckAliveRequest request,
CheckAliveReply *reply,
SendReplyCallback send_reply_callback) = 0;

virtual void HandleDrainNode(DrainNodeRequest request,
DrainNodeReply *reply,
SendReplyCallback send_reply_callback) = 0;
Expand Down Expand Up @@ -251,6 +255,7 @@ class NodeInfoGrpcService : public GrpcService {
NODE_INFO_SERVICE_RPC_HANDLER(DrainNode);
NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo);
NODE_INFO_SERVICE_RPC_HANDLER(GetInternalConfig);
NODE_INFO_SERVICE_RPC_HANDLER(CheckAlive);
}

private:
Expand Down Expand Up @@ -322,9 +327,6 @@ class HeartbeatInfoGcsServiceHandler {
virtual void HandleReportHeartbeat(ReportHeartbeatRequest request,
ReportHeartbeatReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleCheckAlive(CheckAliveRequest request,
CheckAliveReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `HeartbeatInfoGcsService`.
class HeartbeatInfoGrpcService : public GrpcService {
Expand All @@ -342,7 +344,6 @@ class HeartbeatInfoGrpcService : public GrpcService {
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
HEARTBEAT_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat);
HEARTBEAT_INFO_SERVICE_RPC_HANDLER(CheckAlive);
}

private:
Expand Down

0 comments on commit 3e357c8

Please sign in to comment.