diff --git a/python/ray/tests/test_failure_3.py b/python/ray/tests/test_failure_3.py
index 1c8a5fe28561..759c8c38ada6 100644
--- a/python/ray/tests/test_failure_3.py
+++ b/python/ray/tests/test_failure_3.py
@@ -47,6 +47,31 @@ def get_raylet_pid(self):
     wait_for_pid_to_exit(worker_pid)
 
 
+def test_plasma_store_operation_after_raylet_dies(ray_start_cluster):
+    """
+    Test that an operation on the plasma store after the raylet dies fails the
+    task with a system-level error (LocalRayletDiedError) rather than an
+    application-level error (RayTaskError).
+    """
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=1)
+    cluster.wait_for_nodes()
+
+    ray.init(address=cluster.address)
+
+    @ray.remote
+    def get_after_raylet_dies():
+        raylet_pid = int(os.environ["RAY_RAYLET_PID"])
+        os.kill(raylet_pid, SIGKILL)
+        wait_for_pid_to_exit(raylet_pid)
+        ray.put([0] * 100000)
+
+    try:
+        ray.get(get_after_raylet_dies.remote(), timeout=10)
+    except Exception as e:
+        assert isinstance(e, ray.exceptions.LocalRayletDiedError)
+
+
 @pytest.mark.parametrize(
     "ray_start_cluster_head",
     [
diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h
index 09290d6315f0..14c572d189f1 100644
--- a/src/ray/common/client_connection.h
+++ b/src/ray/common/client_connection.h
@@ -99,7 +99,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
   ///
   /// \param buffer The buffer.
   /// \return Status.
-  Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer);
+  virtual Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer);
 
   /// Write a buffer to this connection asynchronously.
   ///
@@ -113,7 +113,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
   ///
   /// \param buffer The buffer.
   /// \return Status.
-  Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer);
+  virtual Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer);
 
   /// Read a buffer from this connection asynchronously.
   ///
diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc
index b428338e5fe9..6014bac8ef78 100644
--- a/src/ray/core_worker/store_provider/plasma_store_provider.cc
+++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc
@@ -65,7 +65,13 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
     bool warmup,
     std::function<std::string()> get_current_call_site)
     : raylet_client_(raylet_client),
-      store_client_(std::make_shared<plasma::PlasmaClient>()),
+      // We turn on exit_on_connection_failure for the core worker plasma client
+      // so that the core worker exits early after the raylet dies. The raylet
+      // never proactively closes the plasma store connection, even during
+      // shutdown, so any error on this connection is a sign that the raylet
+      // has died.
+      store_client_(
+          std::make_shared<plasma::PlasmaClient>(/*exit_on_connection_failure*/ true)),
       reference_counter_(reference_counter),
       check_signals_(std::move(check_signals)) {
   if (get_current_call_site != nullptr) {
diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc
index 5b5380d495be..a1d8613aaba6 100644
--- a/src/ray/object_manager/plasma/client.cc
+++ b/src/ray/object_manager/plasma/client.cc
@@ -97,6 +97,7 @@ struct ObjectInUseEntry {
 class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
  public:
   Impl();
+  explicit Impl(bool exit_on_connection_failure);
   ~Impl();
 
   // PlasmaClient method implementations
@@ -235,11 +236,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
   std::unordered_set<ObjectID> deletion_cache_;
   /// A mutex which protects this class.
   std::recursive_mutex client_mutex_;
+  /// Whether the current process should exit when a read from or write to the connection
+  /// fails. It should only be turned on when the plasma client is in a core worker.
+  bool exit_on_connection_failure_;
 };
 
 PlasmaBuffer::~PlasmaBuffer() { RAY_UNUSED(client_->Release(object_id_)); }
 
-PlasmaClient::Impl::Impl() : store_capacity_(0) {}
+PlasmaClient::Impl::Impl() : store_capacity_(0), exit_on_connection_failure_(false) {}
+
+PlasmaClient::Impl::Impl(bool exit_on_connection_failure)
+    : store_capacity_(0), exit_on_connection_failure_(exit_on_connection_failure) {}
 
 PlasmaClient::Impl::~Impl() {}
 
@@ -868,7 +875,7 @@ Status PlasmaClient::Impl::Connect(const std::string &store_socket_name,
   /// The local stream socket that connects to store.
   ray::local_stream_socket socket(main_service_);
   RAY_RETURN_NOT_OK(ray::ConnectSocketRetry(socket, store_socket_name));
-  store_conn_.reset(new StoreConn(std::move(socket)));
+  store_conn_.reset(new StoreConn(std::move(socket), exit_on_connection_failure_));
   // Send a ConnectRequest to the store to get its memory capacity.
   RAY_RETURN_NOT_OK(SendConnectRequest(store_conn_));
   std::vector<uint8_t> buffer;
@@ -912,6 +919,9 @@ std::string PlasmaClient::Impl::DebugString() {
 
 PlasmaClient::PlasmaClient() : impl_(std::make_shared<PlasmaClient::Impl>()) {}
 
+PlasmaClient::PlasmaClient(bool exit_on_connection_failure)
+    : impl_(std::make_shared<PlasmaClient::Impl>(exit_on_connection_failure)) {}
+
 Status PlasmaClient::Connect(const std::string &store_socket_name,
                              const std::string &manager_socket_name,
                              int num_retries) {
diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h
index 04b1dfbe9c1b..aba45e0069f5 100644
--- a/src/ray/object_manager/plasma/client.h
+++ b/src/ray/object_manager/plasma/client.h
@@ -242,6 +242,8 @@ class PlasmaClient : public PlasmaClientInterface {
  public:
   PlasmaClient();
 
+  explicit PlasmaClient(bool exit_on_connection_failure);
+
   Status Connect(const std::string &store_socket_name,
                  const std::string &manager_socket_name = "",
                  int num_retries = -1) override;
diff --git a/src/ray/object_manager/plasma/connection.cc b/src/ray/object_manager/plasma/connection.cc
index 91526fef3df4..b8c7549af91b 100644
--- a/src/ray/object_manager/plasma/connection.cc
+++ b/src/ray/object_manager/plasma/connection.cc
@@ -170,7 +170,11 @@ Status Client::SendFd(MEMFD_TYPE fd) {
 }
 
 StoreConn::StoreConn(ray::local_stream_socket &&socket)
-    : ray::ServerConnection(std::move(socket)) {}
+    : ray::ServerConnection(std::move(socket)), exit_on_connection_failure_(false) {}
+
+StoreConn::StoreConn(ray::local_stream_socket &&socket, bool exit_on_connection_failure)
+    : ray::ServerConnection(std::move(socket)),
+      exit_on_connection_failure_(exit_on_connection_failure) {}
 
 Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) {
 #ifdef _WIN32
@@ -192,4 +196,28 @@ Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) {
   return Status::OK();
 }
 
+ray::Status StoreConn::WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer) {
+  auto status = ray::ServerConnection::WriteBuffer(buffer);
+  ExitIfErrorStatus(status);
+  return status;
+}
+
+ray::Status StoreConn::ReadBuffer(
+    const std::vector<boost::asio::mutable_buffer> &buffer) {
+  auto status = ray::ServerConnection::ReadBuffer(buffer);
+  ExitIfErrorStatus(status);
+  return status;
+}
+
+void StoreConn::ExitIfErrorStatus(const ray::Status &status) {
+  if (!status.ok() && exit_on_connection_failure_) {
+    RAY_LOG(WARNING) << "The connection to the plasma store has failed. Terminating the "
+                     << "process. Status: " << status;
+    ray::QuickExit();
+    RAY_LOG(FATAL)
+        << "Accessing unreachable code. This line should never be reached "
+        << "after quick process exit due to plasma store connection failure. Please "
+           "create a github issue at https://github.com/ray-project/ray.";
+  }
+}
 }  // namespace plasma
diff --git a/src/ray/object_manager/plasma/connection.h b/src/ray/object_manager/plasma/connection.h
index be163f5cd6ed..296e8778ce84 100644
--- a/src/ray/object_manager/plasma/connection.h
+++ b/src/ray/object_manager/plasma/connection.h
@@ -164,10 +164,28 @@ class StoreConn : public ray::ServerConnection {
  public:
   explicit StoreConn(ray::local_stream_socket &&socket);
 
+  explicit StoreConn(ray::local_stream_socket &&socket, bool exit_on_connection_failure);
+
   /// Receive a file descriptor for the store.
   ///
   /// \return A file descriptor.
   ray::Status RecvFd(MEMFD_TYPE_NON_UNIQUE *fd);
+
+  ray::Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer) override;
+
+  ray::Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer) override;
+
+ private:
+  // Whether the current process should exit when WriteBuffer or ReadBuffer fails.
+  // Currently it is only turned on when the plasma client is in a core worker.
+  // TODO(myan): A better approach is to handle the failure outside of the plasma client,
+  // in the core worker's logic, and propagate the correct exception to the user.
+  bool exit_on_connection_failure_ = false;
+
+  // Shut down the current process if the passed-in status is not OK and the client is
+  // configured to exit on failure.
+  // @param status: The status to check.
+  void ExitIfErrorStatus(const ray::Status &status);
 };
 
 std::ostream &operator<<(std::ostream &os, const std::shared_ptr<StoreConn> &store_conn);
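Note on the pattern (not part of the patch): StoreConn layers a fail-fast policy on top of the virtual ServerConnection read/write path: any failed WriteBuffer/ReadBuffer is treated as raylet death and the worker process is terminated immediately via ray::QuickExit(), instead of the error bubbling up as an application-level RayTaskError. The standalone sketch below illustrates that wrapper shape in plain C++; the names (Connection, FailFastConnection, Write) and the use of std::_Exit are illustrative stand-ins, not Ray APIs.

#include <cstdio>
#include <cstdlib>
#include <string>
#include <utility>

// Minimal stand-in for ray::Status.
struct Status {
  bool ok_ = true;
  std::string msg_;
  static Status OK() { return {}; }
  static Status IOError(std::string msg) { return {false, std::move(msg)}; }
  bool ok() const { return ok_; }
};

// Base connection: Write is virtual so a subclass can layer policy on top,
// the way StoreConn overrides ServerConnection::WriteBuffer/ReadBuffer.
class Connection {
 public:
  virtual ~Connection() = default;
  virtual Status Write(const std::string & /*payload*/) {
    // Pretend the peer (the raylet, in Ray's case) has gone away.
    return Status::IOError("peer closed the connection");
  }
};

// Wrapper that turns a connection error into an immediate process exit when
// exit_on_failure is set, mirroring StoreConn::ExitIfErrorStatus.
class FailFastConnection : public Connection {
 public:
  explicit FailFastConnection(bool exit_on_failure) : exit_on_failure_(exit_on_failure) {}

  Status Write(const std::string &payload) override {
    Status status = Connection::Write(payload);
    if (!status.ok() && exit_on_failure_) {
      std::fprintf(stderr, "connection failed (%s); terminating process\n",
                   status.msg_.c_str());
      std::_Exit(1);  // Skip normal teardown, like ray::QuickExit().
    }
    return status;
  }

 private:
  bool exit_on_failure_;
};

int main() {
  FailFastConnection conn(/*exit_on_failure=*/true);
  conn.Write("hello");  // Terminates the process instead of returning the error.
  return 0;
}

The same reasoning drives the real change: the raylet never closes the plasma socket voluntarily, so any read or write error on that socket means the raylet is gone, and exiting the core worker immediately lets Ray surface a system-level LocalRayletDiedError rather than a misleading task failure.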