Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/ray/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def env_integer(key, default):
LOG_MONITOR_DIED_ERROR = "log_monitor_died"
REPORTER_DIED_ERROR = "reporter_died"
DASHBOARD_DIED_ERROR = "dashboard_died"
RAYLET_CONNECTION_ERROR = "raylet_connection_error"

# Abort autoscaling if more than this number of errors are encountered. This
# is a safety feature to prevent e.g. runaway node launches.
Expand Down
8 changes: 5 additions & 3 deletions python/ray/tests/cluster_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,21 @@ def add_node(self, **node_args):

return node

def remove_node(self, node):
def remove_node(self, node, allow_graceful=False):
"""Kills all processes associated with worker node.

Args:
node (Node): Worker node of which all associated processes
will be removed.
"""
if self.head_node == node:
self.head_node.kill_all_processes(check_alive=False)
self.head_node.kill_all_processes(
check_alive=False, allow_graceful=allow_graceful)
self.head_node = None
# TODO(rliaw): Do we need to kill all worker processes?
else:
node.kill_all_processes(check_alive=False)
node.kill_all_processes(
check_alive=False, allow_graceful=allow_graceful)
self.worker_nodes.remove(node)

assert not node.any_processes_alive(), (
Expand Down
28 changes: 28 additions & 0 deletions python/ray/tests/test_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,3 +722,31 @@ def sleep_to_kill_raylet():
with pytest.raises(Exception, match=r".*Connection closed unexpectedly.*"):
ray.get(nonexistent_id)
thread.join()


def test_connect_with_disconnected_node(shutdown_only):
    """Check which errors are reported as nodes are removed from a cluster.

    Two nodes are removed with allow_graceful=False and each removal is
    expected to add one REMOVED_NODE_ERROR. A third node is removed with
    allow_graceful=True and is expected to add none, so waiting for a third
    error must time out. No RAYLET_CONNECTION_ERROR is expected at the end.
    """
    node_config = json.dumps({
        "num_heartbeats_timeout": 50,
        "heartbeat_timeout_milliseconds": 10,
    })
    cluster = Cluster()
    cluster.add_node(num_cpus=0, _internal_config=node_config)
    ray.init(redis_address=cluster.redis_address)
    assert len(relevant_errors(ray_constants.REMOVED_NODE_ERROR)) == 0

    # Each of these nodes is killed by SIGKILL, so ray_monitor marks it as
    # dead and the number of reported errors grows by one per node.
    for expected_count in (1, 2):
        dead_node = cluster.add_node(num_cpus=0, _internal_config=node_config)
        cluster.remove_node(dead_node, allow_graceful=False)
        wait_for_errors(
            ray_constants.REMOVED_NODE_ERROR, expected_count, timeout=2)

    # This node is killed by SIGTERM, so ray_monitor does not mark it again
    # and a third error never shows up.
    removing_node = cluster.add_node(
        num_cpus=0, _internal_config=node_config)
    cluster.remove_node(removing_node, allow_graceful=True)
    with pytest.raises(Exception, match="Timing out of wait."):
        wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 3, timeout=2)

    # There must be no connection error to a dead node.
    assert len(relevant_errors(ray_constants.RAYLET_CONNECTION_ERROR)) == 0
34 changes: 30 additions & 4 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,26 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) {
AsyncGcsClient *client, const UniqueID &log_key,
const std::vector<ClientTableDataT> &notifications) {
RAY_CHECK(log_key == client_log_key_);
std::unordered_map<std::string, ClientTableDataT> connected_nodes;
std::unordered_map<std::string, ClientTableDataT> disconnected_nodes;
for (auto &notification : notifications) {
HandleNotification(client, notification);
// This is a temporary fix for Issue 4140 to avoid connecting to dead nodes.
// TODO(yuhguo): remove this temporary fix after GCS entry is removable.
if (notification.is_insertion) {
connected_nodes.emplace(notification.client_id, notification);
} else {
auto iter = connected_nodes.find(notification.client_id);
if (iter != connected_nodes.end()) {
connected_nodes.erase(iter);
}
disconnected_nodes.emplace(notification.client_id, notification);
}
}
for (const auto &pair : connected_nodes) {
HandleNotification(client, pair.second);
}
for (const auto &pair : disconnected_nodes) {
HandleNotification(client, pair.second);
}
};
// Callback to request notifications from the client table once we've
Expand All @@ -428,13 +446,16 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) {
return Append(JobID::nil(), client_log_key_, data, add_callback);
}

Status ClientTable::Disconnect() {
Status ClientTable::Disconnect(const DisconnectCallback &callback) {
auto data = std::make_shared<ClientTableDataT>(local_client_);
data->is_insertion = false;
auto add_callback = [this](AsyncGcsClient *client, const ClientID &id,
const ClientTableDataT &data) {
auto add_callback = [this, callback](AsyncGcsClient *client, const ClientID &id,
const ClientTableDataT &data) {
HandleConnected(client, data);
RAY_CHECK_OK(CancelNotifications(JobID::nil(), client_log_key_, id));
if (callback != nullptr) {
callback();
}
};
RAY_RETURN_NOT_OK(Append(JobID::nil(), client_log_key_, data, add_callback));
// We successfully added the deletion entry. Mark ourselves as disconnected.
Expand Down Expand Up @@ -464,6 +485,11 @@ const std::unordered_map<ClientID, ClientTableDataT> &ClientTable::GetAllClients
return client_cache_;
}

// Look up the entries stored under the client table's log key.
//
// `lookup` must not be null (enforced by RAY_CHECK). It is forwarded
// unchanged to Log::Lookup together with a nil JobID and client_log_key_,
// and the Status returned by Log::Lookup is returned to the caller.
Status ClientTable::Lookup(const Callback &lookup) {
  // Fail fast on a missing callback instead of passing it down to the log.
  RAY_CHECK(lookup != nullptr);
  return Log::Lookup(JobID::nil(), client_log_key_, lookup);
}

std::string ClientTable::DebugString() const {
std::stringstream result;
result << Log<UniqueID, ClientTableData>::DebugString();
Expand Down
10 changes: 9 additions & 1 deletion src/ray/gcs/tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
public:
using ClientTableCallback = std::function<void(
AsyncGcsClient *client, const ClientID &id, const ClientTableDataT &data)>;
using DisconnectCallback = std::function<void(void)>;
ClientTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
AsyncGcsClient *client, const ClientID &client_id)
: Log(contexts, client),
Expand Down Expand Up @@ -606,7 +607,7 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
/// registration should never be reused after disconnecting.
///
/// \return Status
ray::Status Disconnect();
ray::Status Disconnect(const DisconnectCallback &callback = nullptr);

/// Mark a different client as disconnected. The client ID should never be
/// reused for a new client.
Expand Down Expand Up @@ -656,6 +657,13 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
/// \return The client ID to client information map.
const std::unordered_map<ClientID, ClientTableDataT> &GetAllClients() const;

/// Look up the client data in the client table.
///
/// \param lookup Callback that is called after lookup. If the callback is
/// called with an empty vector, then there was no data at the key.
/// \return Status.
Status Lookup(const Callback &lookup);

/// Returns debug string for class.
///
/// \return string.
Expand Down
23 changes: 18 additions & 5 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,30 @@ int main(int argc, char *argv[]) {
RAY_LOG(DEBUG) << "Initializing GCS client "
<< gcs_client->client_table().GetLocalClientId();

ray::raylet::Raylet server(main_service, raylet_socket_name, node_ip_address,
redis_address, redis_port, redis_password,
node_manager_config, object_manager_config, gcs_client);
std::unique_ptr<ray::raylet::Raylet> server(new ray::raylet::Raylet(
main_service, raylet_socket_name, node_ip_address, redis_address, redis_port,
redis_password, node_manager_config, object_manager_config, gcs_client));

// Destroy the Raylet on a SIGTERM. The pointer to main_service is
// guaranteed to be valid since this function will run the event loop
// instead of returning immediately.
// We should stop the service and remove the local socket file.
auto handler = [&main_service, &raylet_socket_name](
auto handler = [&main_service, &raylet_socket_name, &server, &gcs_client](
const boost::system::error_code &error, int signal_number) {
main_service.stop();
auto shutdown_callback = [&server, &main_service]() {
server.reset();
main_service.stop();
};
RAY_CHECK_OK(gcs_client->client_table().Disconnect(shutdown_callback));
// Give a timeout for this Disconnect operation.
boost::posix_time::milliseconds stop_timeout(800);
boost::asio::deadline_timer timer(main_service);
timer.expires_from_now(stop_timeout);
timer.async_wait([shutdown_callback](const boost::system::error_code &error) {
if (!error) {
shutdown_callback();
}
});
remove(raylet_socket_name.c_str());
};
boost::asio::signal_set signals(main_service, SIGTERM);
Expand Down
45 changes: 29 additions & 16 deletions src/ray/raylet/monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,35 @@ void Monitor::Tick() {
it->second--;
if (it->second == 0) {
if (dead_clients_.count(it->first) == 0) {
RAY_LOG(WARNING) << "Client timed out: " << it->first;
RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(it->first));

// Broadcast a warning to all of the drivers indicating that the node
// has been marked as dead.
// TODO(rkn): Define this constant somewhere else.
std::string type = "node_removed";
std::ostringstream error_message;
error_message << "The node with client ID " << it->first << " has been marked "
<< "dead because the monitor has missed too many heartbeats "
<< "from it.";
// We use the nil JobID to broadcast the message to all drivers.
RAY_CHECK_OK(gcs_client_.error_table().PushErrorToDriver(
JobID::nil(), type, error_message.str(), current_time_ms()));

dead_clients_.insert(it->first);
auto client_id = it->first;
RAY_LOG(WARNING) << "Client timed out: " << client_id;
auto lookup_callback = [this, client_id](
gcs::AsyncGcsClient *client, const ClientID &id,
const std::vector<ClientTableDataT> &all_data) {
bool marked = false;
for (const auto &data : all_data) {
if (client_id.binary() == data.client_id && !data.is_insertion) {
// The node has been marked dead by itself.
marked = true;
}
}
if (!marked) {
RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(client_id));
// Broadcast a warning to all of the drivers indicating that the node
// has been marked as dead.
// TODO(rkn): Define this constant somewhere else.
std::string type = "node_removed";
std::ostringstream error_message;
error_message << "The node with client ID " << client_id
<< " has been marked dead because the monitor"
<< " has missed too many heartbeats from it.";
// We use the nil JobID to broadcast the message to all drivers.
RAY_CHECK_OK(gcs_client_.error_table().PushErrorToDriver(
JobID::nil(), type, error_message.str(), current_time_ms()));
}
};
RAY_CHECK_OK(gcs_client_.client_table().Lookup(lookup_callback));
dead_clients_.insert(client_id);
}
it = heartbeats_.erase(it);
} else {
Expand Down
18 changes: 10 additions & 8 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,17 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
// Establish a new NodeManager connection to this GCS client.
auto status = ConnectRemoteNodeManager(client_id, client_data.node_manager_address,
client_data.node_manager_port);
// A disconnected client has 2 entries in the client table (one for being
// inserted and one for being removed). When a new raylet starts, ClientAdded
// will be called with the disconnected client's first entry, which will cause
// IOError and "Connection refused".
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to connect to client " << client_id
<< " in ClientAdded. TcpConnect returned status: "
<< status.ToString() << ". This may be caused by "
<< "trying to connect to a node manager that has failed.";
// This is not a fatal error for raylet, but it should not happen.
// We need to broadcast this message.
std::string type = "raylet_connection_error";
std::ostringstream error_message;
error_message << "Failed to connect to ray node " << client_id
<< " with status: " << status.ToString()
<< ". This may be since the node was recently removed.";
// We use the nil JobID to broadcast the message to all drivers.
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
JobID::nil(), type, error_message.str(), current_time_ms()));
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/raylet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
RAY_CHECK_OK(RegisterPeriodicTimer(main_service));
}

Raylet::~Raylet() { RAY_CHECK_OK(gcs_client_->client_table().Disconnect()); }
Raylet::~Raylet() {}

ray::Status Raylet::RegisterPeriodicTimer(boost::asio::io_service &io_service) {
boost::posix_time::milliseconds timer_period_ms(100);
Expand Down