diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 45c56c815efc..04527f2ebd4a 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -56,6 +56,7 @@ def env_integer(key, default): LOG_MONITOR_DIED_ERROR = "log_monitor_died" REPORTER_DIED_ERROR = "reporter_died" DASHBOARD_DIED_ERROR = "dashboard_died" +RAYLET_CONNECTION_ERROR = "raylet_connection_error" # Abort autoscaling if more than this number of errors are encountered. This # is a safety feature to prevent e.g. runaway node launches. diff --git a/python/ray/tests/cluster_utils.py b/python/ray/tests/cluster_utils.py index 503087abb1c4..0a7984d69740 100644 --- a/python/ray/tests/cluster_utils.py +++ b/python/ray/tests/cluster_utils.py @@ -102,7 +102,7 @@ def add_node(self, **node_args): return node - def remove_node(self, node): + def remove_node(self, node, allow_graceful=False): """Kills all processes associated with worker node. Args: @@ -110,11 +110,13 @@ def remove_node(self, node): will be removed. """ if self.head_node == node: - self.head_node.kill_all_processes(check_alive=False) + self.head_node.kill_all_processes( + check_alive=False, allow_graceful=allow_graceful) self.head_node = None # TODO(rliaw): Do we need to kill all worker processes? else: - node.kill_all_processes(check_alive=False) + node.kill_all_processes( + check_alive=False, allow_graceful=allow_graceful) self.worker_nodes.remove(node) assert not node.any_processes_alive(), ( diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 19dfbbc6cb45..905b1ee28b4a 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -722,3 +722,31 @@ def sleep_to_kill_raylet(): with pytest.raises(Exception, match=r".*Connection closed unexpectedly.*"): ray.get(nonexistent_id) thread.join() + + +def test_connect_with_disconnected_node(shutdown_only): + config = json.dumps({ + "num_heartbeats_timeout": 50, + "heartbeat_timeout_milliseconds": 10, + }) + cluster = Cluster() + cluster.add_node(num_cpus=0, _internal_config=config) + ray.init(redis_address=cluster.redis_address) + info = relevant_errors(ray_constants.REMOVED_NODE_ERROR) + assert len(info) == 0 + # This node is killed by SIGKILL, ray_monitor will mark it to dead. + dead_node = cluster.add_node(num_cpus=0, _internal_config=config) + cluster.remove_node(dead_node, allow_graceful=False) + wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 1, timeout=2) + # This node is killed by SIGKILL, ray_monitor will mark it to dead. + dead_node = cluster.add_node(num_cpus=0, _internal_config=config) + cluster.remove_node(dead_node, allow_graceful=False) + wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=2) + # This node is killed by SIGTERM, ray_monitor will not mark it again. + removing_node = cluster.add_node(num_cpus=0, _internal_config=config) + cluster.remove_node(removing_node, allow_graceful=True) + with pytest.raises(Exception, match=('Timing out of wait.')): + wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 3, timeout=2) + # There is no connection error to a dead node. + info = relevant_errors(ray_constants.RAYLET_CONNECTION_ERROR) + assert len(info) == 0 diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index e10048827714..87a72258ba2a 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -412,8 +412,26 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) { AsyncGcsClient *client, const UniqueID &log_key, const std::vector ¬ifications) { RAY_CHECK(log_key == client_log_key_); + std::unordered_map connected_nodes; + std::unordered_map disconnected_nodes; for (auto ¬ification : notifications) { - HandleNotification(client, notification); + // This is temporary fix for Issue 4140 to avoid connect to dead nodes. + // TODO(yuhguo): remove this temporary fix after GCS entry is removable. + if (notification.is_insertion) { + connected_nodes.emplace(notification.client_id, notification); + } else { + auto iter = connected_nodes.find(notification.client_id); + if (iter != connected_nodes.end()) { + connected_nodes.erase(iter); + } + disconnected_nodes.emplace(notification.client_id, notification); + } + } + for (const auto &pair : connected_nodes) { + HandleNotification(client, pair.second); + } + for (const auto &pair : disconnected_nodes) { + HandleNotification(client, pair.second); } }; // Callback to request notifications from the client table once we've @@ -428,13 +446,16 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) { return Append(JobID::nil(), client_log_key_, data, add_callback); } -Status ClientTable::Disconnect() { +Status ClientTable::Disconnect(const DisconnectCallback &callback) { auto data = std::make_shared(local_client_); data->is_insertion = false; - auto add_callback = [this](AsyncGcsClient *client, const ClientID &id, - const ClientTableDataT &data) { + auto add_callback = [this, callback](AsyncGcsClient *client, const ClientID &id, + const ClientTableDataT &data) { HandleConnected(client, data); RAY_CHECK_OK(CancelNotifications(JobID::nil(), client_log_key_, id)); + if (callback != nullptr) { + callback(); + } }; RAY_RETURN_NOT_OK(Append(JobID::nil(), client_log_key_, data, add_callback)); // We successfully added the deletion entry. Mark ourselves as disconnected. @@ -464,6 +485,11 @@ const std::unordered_map &ClientTable::GetAllClients return client_cache_; } +Status ClientTable::Lookup(const Callback &lookup) { + RAY_CHECK(lookup != nullptr); + return Log::Lookup(JobID::nil(), client_log_key_, lookup); +} + std::string ClientTable::DebugString() const { std::stringstream result; result << Log::DebugString(); diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index cfe280b97c8f..71e1c39d6da7 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -578,6 +578,7 @@ class ClientTable : private Log { public: using ClientTableCallback = std::function; + using DisconnectCallback = std::function; ClientTable(const std::vector> &contexts, AsyncGcsClient *client, const ClientID &client_id) : Log(contexts, client), @@ -606,7 +607,7 @@ class ClientTable : private Log { /// registration should never be reused after disconnecting. /// /// \return Status - ray::Status Disconnect(); + ray::Status Disconnect(const DisconnectCallback &callback = nullptr); /// Mark a different client as disconnected. The client ID should never be /// reused for a new client. @@ -656,6 +657,13 @@ class ClientTable : private Log { /// \return The client ID to client information map. const std::unordered_map &GetAllClients() const; + /// Lookup the client data in the client table. + /// + /// \param lookup Callback that is called after lookup. If the callback is + /// called with an empty vector, then there was no data at the key. + /// \return Status. + Status Lookup(const Callback &lookup); + /// Returns debug string for class. /// /// \return string. diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 02f7027892cd..8942e68acc45 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -127,17 +127,30 @@ int main(int argc, char *argv[]) { RAY_LOG(DEBUG) << "Initializing GCS client " << gcs_client->client_table().GetLocalClientId(); - ray::raylet::Raylet server(main_service, raylet_socket_name, node_ip_address, - redis_address, redis_port, redis_password, - node_manager_config, object_manager_config, gcs_client); + std::unique_ptr server(new ray::raylet::Raylet( + main_service, raylet_socket_name, node_ip_address, redis_address, redis_port, + redis_password, node_manager_config, object_manager_config, gcs_client)); // Destroy the Raylet on a SIGTERM. The pointer to main_service is // guaranteed to be valid since this function will run the event loop // instead of returning immediately. // We should stop the service and remove the local socket file. - auto handler = [&main_service, &raylet_socket_name]( + auto handler = [&main_service, &raylet_socket_name, &server, &gcs_client]( const boost::system::error_code &error, int signal_number) { - main_service.stop(); + auto shutdown_callback = [&server, &main_service]() { + server.reset(); + main_service.stop(); + }; + RAY_CHECK_OK(gcs_client->client_table().Disconnect(shutdown_callback)); + // Give a timeout for this Disconnect operation. + boost::posix_time::milliseconds stop_timeout(800); + boost::asio::deadline_timer timer(main_service); + timer.expires_from_now(stop_timeout); + timer.async_wait([shutdown_callback](const boost::system::error_code &error) { + if (!error) { + shutdown_callback(); + } + }); remove(raylet_socket_name.c_str()); }; boost::asio::signal_set signals(main_service, SIGTERM); diff --git a/src/ray/raylet/monitor.cc b/src/ray/raylet/monitor.cc index baaa7070022d..30f05de226c4 100644 --- a/src/ray/raylet/monitor.cc +++ b/src/ray/raylet/monitor.cc @@ -45,22 +45,35 @@ void Monitor::Tick() { it->second--; if (it->second == 0) { if (dead_clients_.count(it->first) == 0) { - RAY_LOG(WARNING) << "Client timed out: " << it->first; - RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(it->first)); - - // Broadcast a warning to all of the drivers indicating that the node - // has been marked as dead. - // TODO(rkn): Define this constant somewhere else. - std::string type = "node_removed"; - std::ostringstream error_message; - error_message << "The node with client ID " << it->first << " has been marked " - << "dead because the monitor has missed too many heartbeats " - << "from it."; - // We use the nil JobID to broadcast the message to all drivers. - RAY_CHECK_OK(gcs_client_.error_table().PushErrorToDriver( - JobID::nil(), type, error_message.str(), current_time_ms())); - - dead_clients_.insert(it->first); + auto client_id = it->first; + RAY_LOG(WARNING) << "Client timed out: " << client_id; + auto lookup_callback = [this, client_id]( + gcs::AsyncGcsClient *client, const ClientID &id, + const std::vector &all_data) { + bool marked = false; + for (const auto &data : all_data) { + if (client_id.binary() == data.client_id && !data.is_insertion) { + // The node has been marked dead by itself. + marked = true; + } + } + if (!marked) { + RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(client_id)); + // Broadcast a warning to all of the drivers indicating that the node + // has been marked as dead. + // TODO(rkn): Define this constant somewhere else. + std::string type = "node_removed"; + std::ostringstream error_message; + error_message << "The node with client ID " << client_id + << " has been marked dead because the monitor" + << " has missed too many heartbeats from it."; + // We use the nil JobID to broadcast the message to all drivers. + RAY_CHECK_OK(gcs_client_.error_table().PushErrorToDriver( + JobID::nil(), type, error_message.str(), current_time_ms())); + } + }; + RAY_CHECK_OK(gcs_client_.client_table().Lookup(lookup_callback)); + dead_clients_.insert(client_id); } it = heartbeats_.erase(it); } else { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 74d5c9dee974..684cad003b87 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -341,15 +341,17 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) { // Establish a new NodeManager connection to this GCS client. auto status = ConnectRemoteNodeManager(client_id, client_data.node_manager_address, client_data.node_manager_port); - // A disconnected client has 2 entries in the client table (one for being - // inserted and one for being removed). When a new raylet starts, ClientAdded - // will be called with the disconnected client's first entry, which will cause - // IOError and "Connection refused". if (!status.ok()) { - RAY_LOG(WARNING) << "Failed to connect to client " << client_id - << " in ClientAdded. TcpConnect returned status: " - << status.ToString() << ". This may be caused by " - << "trying to connect to a node manager that has failed."; + // This is not a fatal error for raylet, but it should not happen. + // We need to broadcase this message. + std::string type = "raylet_connection_error"; + std::ostringstream error_message; + error_message << "Failed to connect to ray node " << client_id + << " with status: " << status.ToString() + << ". This may be since the node was recently removed."; + // We use the nil JobID to broadcast the message to all drivers. + RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( + JobID::nil(), type, error_message.str(), current_time_ms())); return; } diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index 288f0a80b481..59a048a32893 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -68,7 +68,7 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ RAY_CHECK_OK(RegisterPeriodicTimer(main_service)); } -Raylet::~Raylet() { RAY_CHECK_OK(gcs_client_->client_table().Disconnect()); } +Raylet::~Raylet() {} ray::Status Raylet::RegisterPeriodicTimer(boost::asio::io_service &io_service) { boost::posix_time::milliseconds timer_period_ms(100);