Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ void NodeManager::Heartbeat() {
}

void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
ClientID client_id = ClientID::from_binary(client_data.client_id);
const ClientID client_id = ClientID::from_binary(client_data.client_id);

RAY_LOG(DEBUG) << "[ClientAdded] received callback from client id " << client_id;
if (client_id == gcs_client_->client_table().GetLocalClientId()) {
Expand All @@ -334,21 +334,34 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
return;
}

ResourceSet resources_total(client_data.resources_total_label,
client_data.resources_total_capacity);
this->cluster_resource_map_.emplace(client_id, SchedulingResources(resources_total));

// Establish a new NodeManager connection to this GCS client.
auto client_info = gcs_client_->client_table().GetClient(client_id);
RAY_LOG(DEBUG) << "[ClientAdded] CONNECTING TO: "
<< " " << client_info.node_manager_address << " "
RAY_LOG(DEBUG) << "[ClientAdded] Trying to connect to client " << client_id << " at "
<< client_info.node_manager_address << ":"
<< client_info.node_manager_port;

boost::asio::ip::tcp::socket socket(io_service_);
RAY_CHECK_OK(TcpConnect(socket, client_info.node_manager_address,
client_info.node_manager_port));
auto status =
TcpConnect(socket, client_info.node_manager_address, client_info.node_manager_port);
// A disconnected client has two entries in the client table: one for its
// insertion and one for its removal. When a new raylet starts, ClientAdded
// is also called with the disconnected client's insertion entry, so the
// connection attempt above fails with an IOError ("Connection refused").
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to connect to client " << client_id
<< " in ClientAdded. TcpConnect returned status: "
<< status.ToString() << ". This may be caused by "
<< "trying to connect to a node manager that has failed.";
return;
}

// The client is connected.
auto server_conn = TcpServerConnection(std::move(socket));
remote_server_connections_.emplace(client_id, std::move(server_conn));

ResourceSet resources_total(client_data.resources_total_label,
client_data.resources_total_capacity);
cluster_resource_map_.emplace(client_id, SchedulingResources(resources_total));
}

void NodeManager::ClientRemoved(const ClientTableDataT &client_data) {
Expand Down