From a835e261974be34a9cb53cb155dcca2387253f6c Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Mon, 9 Jun 2025 14:49:27 -0700 Subject: [PATCH 01/10] exit early if core worker get broken pipe from raylet Signed-off-by: Mengjin Yan --- .../store_provider/plasma_store_provider.cc | 2 +- src/ray/object_manager/plasma/client.cc | 13 +++++++-- src/ray/object_manager/plasma/client.h | 2 ++ src/ray/object_manager/plasma/connection.cc | 28 ++++++++++++++++++- src/ray/object_manager/plasma/connection.h | 14 ++++++++++ 5 files changed, 55 insertions(+), 4 deletions(-) diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index b428338e5fe9..f843a8437524 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -65,7 +65,7 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( bool warmup, std::function get_current_call_site) : raylet_client_(raylet_client), - store_client_(std::make_shared()), + store_client_(std::make_shared(/*is_in_core_worker*/ true)), reference_counter_(reference_counter), check_signals_(std::move(check_signals)) { if (get_current_call_site != nullptr) { diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index 5b5380d495be..182fa8ea3ef6 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -97,6 +97,7 @@ struct ObjectInUseEntry { class PlasmaClient::Impl : public std::enable_shared_from_this { public: Impl(); + explicit Impl(bool is_in_core_worker); ~Impl(); // PlasmaClient method implementations @@ -235,11 +236,16 @@ class PlasmaClient::Impl : public std::enable_shared_from_this deletion_cache_; /// A mutex which protects this class. std::recursive_mutex client_mutex_; + /// Whether the client is in a core worker. 
+ bool is_in_core_worker_; }; PlasmaBuffer::~PlasmaBuffer() { RAY_UNUSED(client_->Release(object_id_)); } -PlasmaClient::Impl::Impl() : store_capacity_(0) {} +PlasmaClient::Impl::Impl() : store_capacity_(0), is_in_core_worker_(false) {} + +PlasmaClient::Impl::Impl(bool is_in_core_worker) + : store_capacity_(0), is_in_core_worker_(is_in_core_worker) {} PlasmaClient::Impl::~Impl() {} @@ -868,7 +874,7 @@ Status PlasmaClient::Impl::Connect(const std::string &store_socket_name, /// The local stream socket that connects to store. ray::local_stream_socket socket(main_service_); RAY_RETURN_NOT_OK(ray::ConnectSocketRetry(socket, store_socket_name)); - store_conn_.reset(new StoreConn(std::move(socket))); + store_conn_.reset(new StoreConn(std::move(socket), is_in_core_worker_)); // Send a ConnectRequest to the store to get its memory capacity. RAY_RETURN_NOT_OK(SendConnectRequest(store_conn_)); std::vector buffer; @@ -912,6 +918,9 @@ std::string PlasmaClient::Impl::DebugString() { PlasmaClient::PlasmaClient() : impl_(std::make_shared()) {} +PlasmaClient::PlasmaClient(bool is_in_core_worker) + : impl_(std::make_shared(is_in_core_worker)) {} + Status PlasmaClient::Connect(const std::string &store_socket_name, const std::string &manager_socket_name, int num_retries) { diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index 04b1dfbe9c1b..43f45ef78eee 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -242,6 +242,8 @@ class PlasmaClient : public PlasmaClientInterface { public: PlasmaClient(); + explicit PlasmaClient(bool is_in_core_worker); + Status Connect(const std::string &store_socket_name, const std::string &manager_socket_name = "", int num_retries = -1) override; diff --git a/src/ray/object_manager/plasma/connection.cc b/src/ray/object_manager/plasma/connection.cc index 91526fef3df4..1c4a11f40f66 100644 --- a/src/ray/object_manager/plasma/connection.cc +++ 
b/src/ray/object_manager/plasma/connection.cc @@ -170,7 +170,10 @@ Status Client::SendFd(MEMFD_TYPE fd) { } StoreConn::StoreConn(ray::local_stream_socket &&socket) - : ray::ServerConnection(std::move(socket)) {} + : ray::ServerConnection(std::move(socket)), is_in_core_worker_(false) {} + +StoreConn::StoreConn(ray::local_stream_socket &&socket, bool is_in_core_worker) + : ray::ServerConnection(std::move(socket)), is_in_core_worker_(is_in_core_worker) {} Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) { #ifdef _WIN32 @@ -192,4 +195,27 @@ Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) { return Status::OK(); } +ray::Status StoreConn::WriteBuffer(const std::vector &buffer) { + auto status = ray::ServerConnection::WriteBuffer(buffer); + ShutdownWorkerIfLocalRayletDisconnected(status); + return status; +} + +ray::Status StoreConn::ReadBuffer( + const std::vector &buffer) { + auto status = ray::ServerConnection::ReadBuffer(buffer); + ShutdownWorkerIfLocalRayletDisconnected(status); + return status; +} + +void StoreConn::ShutdownWorkerIfLocalRayletDisconnected(const ray::Status &status) { + if (is_in_core_worker_ && !status.ok() && + ray::IsRayletFailed(RayConfig::instance().RAYLET_PID())) { + RAY_LOG(WARNING) << "The connection to the plasma store is failed because the " + "local raylet is dead. Terminate the process. Status: " + << status; + ray::QuickExit(); + RAY_LOG(FATAL) << "Unreachable."; + } +} } // namespace plasma diff --git a/src/ray/object_manager/plasma/connection.h b/src/ray/object_manager/plasma/connection.h index be163f5cd6ed..43b87a018f47 100644 --- a/src/ray/object_manager/plasma/connection.h +++ b/src/ray/object_manager/plasma/connection.h @@ -164,10 +164,24 @@ class StoreConn : public ray::ServerConnection { public: explicit StoreConn(ray::local_stream_socket &&socket); + explicit StoreConn(ray::local_stream_socket &&socket, bool is_in_core_worker); + /// Receive a file descriptor for the store. /// /// \return A file descriptor. 
ray::Status RecvFd(MEMFD_TYPE_NON_UNIQUE *fd); + + ray::Status WriteBuffer(const std::vector &buffer); + + ray::Status ReadBuffer(const std::vector &buffer); + + private: + // Whether the client is in a core worker. + bool is_in_core_worker_; + + // Shutdown the current process if the plasma client is in a core worker and the + // local raylet is dead. + void ShutdownWorkerIfLocalRayletDisconnected(const ray::Status &status); }; std::ostream &operator<<(std::ostream &os, const std::shared_ptr &store_conn); From d4b182fc27c6a990fd7f87aac54cb04e48feef8e Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Tue, 10 Jun 2025 16:23:02 -0700 Subject: [PATCH 02/10] update Signed-off-by: Mengjin Yan --- src/ray/common/client_connection.h | 4 ++-- src/ray/object_manager/plasma/connection.cc | 3 +-- src/ray/object_manager/plasma/connection.h | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index 09290d6315f0..14c572d189f1 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -99,7 +99,7 @@ class ServerConnection : public std::enable_shared_from_this { /// /// \param buffer The buffer. /// \return Status. - Status WriteBuffer(const std::vector &buffer); + virtual Status WriteBuffer(const std::vector &buffer); /// Write a buffer to this connection asynchronously. /// @@ -113,7 +113,7 @@ class ServerConnection : public std::enable_shared_from_this { /// /// \param buffer The buffer. /// \return Status. - Status ReadBuffer(const std::vector &buffer); + virtual Status ReadBuffer(const std::vector &buffer); /// Read a buffer from this connection asynchronously. 
/// diff --git a/src/ray/object_manager/plasma/connection.cc b/src/ray/object_manager/plasma/connection.cc index 1c4a11f40f66..ba9c418a9aae 100644 --- a/src/ray/object_manager/plasma/connection.cc +++ b/src/ray/object_manager/plasma/connection.cc @@ -209,8 +209,7 @@ ray::Status StoreConn::ReadBuffer( } void StoreConn::ShutdownWorkerIfLocalRayletDisconnected(const ray::Status &status) { - if (is_in_core_worker_ && !status.ok() && - ray::IsRayletFailed(RayConfig::instance().RAYLET_PID())) { + if (is_in_core_worker_ && !status.ok()) { RAY_LOG(WARNING) << "The connection to the plasma store is failed because the " "local raylet is dead. Terminate the process. Status: " << status; diff --git a/src/ray/object_manager/plasma/connection.h b/src/ray/object_manager/plasma/connection.h index 43b87a018f47..d216f27b3bf8 100644 --- a/src/ray/object_manager/plasma/connection.h +++ b/src/ray/object_manager/plasma/connection.h @@ -171,9 +171,9 @@ class StoreConn : public ray::ServerConnection { /// \return A file descriptor. ray::Status RecvFd(MEMFD_TYPE_NON_UNIQUE *fd); - ray::Status WriteBuffer(const std::vector &buffer); + ray::Status WriteBuffer(const std::vector &buffer) override; - ray::Status ReadBuffer(const std::vector &buffer); + ray::Status ReadBuffer(const std::vector &buffer) override; private: // Whether the client is in a core worker. 
From efd22a27777919eed0e8b220d56bde110888f747 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Tue, 10 Jun 2025 16:43:00 -0700 Subject: [PATCH 03/10] add comment Signed-off-by: Mengjin Yan --- src/ray/object_manager/plasma/connection.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/ray/object_manager/plasma/connection.cc b/src/ray/object_manager/plasma/connection.cc index ba9c418a9aae..531783c976b4 100644 --- a/src/ray/object_manager/plasma/connection.cc +++ b/src/ray/object_manager/plasma/connection.cc @@ -209,6 +209,12 @@ ray::Status StoreConn::ReadBuffer( } void StoreConn::ShutdownWorkerIfLocalRayletDisconnected(const ray::Status &status) { + // Here we don't explicitly check if the local raylet is dead, for the reasons that: + // 1. If the connection is from a core worker, the local raylet should be on the other + // side of the connection. + // 2. On the raylet side, we never proactivately close the plasma store connection + // even during shutdown. So any error from the raylet side should be a sign of raylet + // death. if (is_in_core_worker_ && !status.ok()) { RAY_LOG(WARNING) << "The connection to the plasma store is failed because the " "local raylet is dead. Terminate the process. 
Status: " From a675a4715144efad1f40c558e26ffa52813c9b8c Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 11 Jun 2025 12:52:30 -0700 Subject: [PATCH 04/10] Update src/ray/object_manager/plasma/connection.cc Co-authored-by: Ibrahim Rabbani Signed-off-by: Mengjin Yan --- src/ray/object_manager/plasma/connection.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/object_manager/plasma/connection.cc b/src/ray/object_manager/plasma/connection.cc index 531783c976b4..f864ce796371 100644 --- a/src/ray/object_manager/plasma/connection.cc +++ b/src/ray/object_manager/plasma/connection.cc @@ -209,10 +209,10 @@ ray::Status StoreConn::ReadBuffer( } void StoreConn::ShutdownWorkerIfLocalRayletDisconnected(const ray::Status &status) { - // Here we don't explicitly check if the local raylet is dead, for the reasons that: + // We don't explicitly check if the local raylet is dead because: // 1. If the connection is from a core worker, the local raylet should be on the other // side of the connection. - // 2. On the raylet side, we never proactivately close the plasma store connection + // 2. On the raylet side, we never proactively close the plasma store connection // even during shutdown. So any error from the raylet side should be a sign of raylet // death. 
if (is_in_core_worker_ && !status.ok()) { From dce3714b9cb663a9de99ba7999290a448b841367 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 11 Jun 2025 15:54:29 -0700 Subject: [PATCH 05/10] fix the review comment Signed-off-by: Mengjin Yan --- python/ray/tests/test_failure_3.py | 24 +++++++++++++++ .../store_provider/plasma_store_provider.cc | 8 ++++- src/ray/object_manager/plasma/client.cc | 19 ++++++------ src/ray/object_manager/plasma/client.h | 2 +- src/ray/object_manager/plasma/connection.cc | 29 ++++++++----------- src/ray/object_manager/plasma/connection.h | 20 ++++++++----- 6 files changed, 67 insertions(+), 35 deletions(-) diff --git a/python/ray/tests/test_failure_3.py b/python/ray/tests/test_failure_3.py index 1c8a5fe28561..ef3f42b048ad 100644 --- a/python/ray/tests/test_failure_3.py +++ b/python/ray/tests/test_failure_3.py @@ -47,6 +47,30 @@ def get_raylet_pid(self): wait_for_pid_to_exit(worker_pid) +def test_plasma_store_operation_after_raylet_dies(ray_start_cluster): + """ + Test that the operation on the plasma store after the raylet dies will not fail the + task with an application level error (RayTaskError) but a system level error + (RayletDiedError). 
+ """ + cluster = ray_start_cluster + cluster.add_node(num_cpus=1) + cluster.wait_for_nodes() + + ray.init(address=cluster.address) + + @ray.remote + def get_after_raylet_dies(): + raylet_pid = int(os.environ["RAY_RAYLET_PID"]) + os.kill(raylet_pid, SIGKILL) + ray.put([0] * 100000) + + try: + ray.get(get_after_raylet_dies.remote(), timeout=10) + except Exception as e: + assert isinstance(e, ray.exceptions.RayletDiedError) + + @pytest.mark.parametrize( "ray_start_cluster_head", [ diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index f843a8437524..43b5122780a8 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -65,7 +65,13 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( bool warmup, std::function get_current_call_site) : raylet_client_(raylet_client), - store_client_(std::make_shared(/*is_in_core_worker*/ true)), + // Here we can turn on exit_on_connection_failure on for the core worker plasma + // client to early exist core worker after the raylet's death because on the + // raylet side, we never proactivately close the plasma store connection even + // during shutdown. So any error from the raylet side should be a sign of raylet + // death. 
+ store_client_( + std::make_shared(/*exit_on_connection_failure*/ true)), reference_counter_(reference_counter), check_signals_(std::move(check_signals)) { if (get_current_call_site != nullptr) { diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index 182fa8ea3ef6..c155b5f026a0 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -97,7 +97,7 @@ struct ObjectInUseEntry { class PlasmaClient::Impl : public std::enable_shared_from_this { public: Impl(); - explicit Impl(bool is_in_core_worker); + explicit Impl(bool exit_on_connection_failure); ~Impl(); // PlasmaClient method implementations @@ -236,16 +236,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this deletion_cache_; /// A mutex which protects this class. std::recursive_mutex client_mutex_; - /// Whether the client is in a core worker. - bool is_in_core_worker_; + /// Whether the current process should exit when read or write to the connection fails. + /// Currently it is only turned on when the plasma client is in a core worker. + bool exit_on_connection_failure_; }; PlasmaBuffer::~PlasmaBuffer() { RAY_UNUSED(client_->Release(object_id_)); } -PlasmaClient::Impl::Impl() : store_capacity_(0), is_in_core_worker_(false) {} +PlasmaClient::Impl::Impl() : store_capacity_(0), exit_on_connection_failure_(false) {} -PlasmaClient::Impl::Impl(bool is_in_core_worker) - : store_capacity_(0), is_in_core_worker_(is_in_core_worker) {} +PlasmaClient::Impl::Impl(bool exit_on_connection_failure) + : store_capacity_(0), exit_on_connection_failure_(exit_on_connection_failure) {} PlasmaClient::Impl::~Impl() {} @@ -874,7 +875,7 @@ Status PlasmaClient::Impl::Connect(const std::string &store_socket_name, /// The local stream socket that connects to store. 
ray::local_stream_socket socket(main_service_); RAY_RETURN_NOT_OK(ray::ConnectSocketRetry(socket, store_socket_name)); - store_conn_.reset(new StoreConn(std::move(socket), is_in_core_worker_)); + store_conn_.reset(new StoreConn(std::move(socket), exit_on_connection_failure_)); // Send a ConnectRequest to the store to get its memory capacity. RAY_RETURN_NOT_OK(SendConnectRequest(store_conn_)); std::vector buffer; @@ -918,8 +919,8 @@ std::string PlasmaClient::Impl::DebugString() { PlasmaClient::PlasmaClient() : impl_(std::make_shared()) {} -PlasmaClient::PlasmaClient(bool is_in_core_worker) - : impl_(std::make_shared(is_in_core_worker)) {} +PlasmaClient::PlasmaClient(bool exit_on_connection_failure) + : impl_(std::make_shared(exit_on_connection_failure)) {} Status PlasmaClient::Connect(const std::string &store_socket_name, const std::string &manager_socket_name, diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index 43f45ef78eee..aba45e0069f5 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -242,7 +242,7 @@ class PlasmaClient : public PlasmaClientInterface { public: PlasmaClient(); - explicit PlasmaClient(bool is_in_core_worker); + explicit PlasmaClient(bool exit_on_connection_failure); Status Connect(const std::string &store_socket_name, const std::string &manager_socket_name = "", diff --git a/src/ray/object_manager/plasma/connection.cc b/src/ray/object_manager/plasma/connection.cc index f864ce796371..3f45c6c64166 100644 --- a/src/ray/object_manager/plasma/connection.cc +++ b/src/ray/object_manager/plasma/connection.cc @@ -170,10 +170,11 @@ Status Client::SendFd(MEMFD_TYPE fd) { } StoreConn::StoreConn(ray::local_stream_socket &&socket) - : ray::ServerConnection(std::move(socket)), is_in_core_worker_(false) {} + : ray::ServerConnection(std::move(socket)), exit_on_connection_failure_(false) {} -StoreConn::StoreConn(ray::local_stream_socket &&socket, bool is_in_core_worker) 
- : ray::ServerConnection(std::move(socket)), is_in_core_worker_(is_in_core_worker) {} +StoreConn::StoreConn(ray::local_stream_socket &&socket, bool exit_on_connection_failure) + : ray::ServerConnection(std::move(socket)), + exit_on_connection_failure_(exit_on_connection_failure) {} Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) { #ifdef _WIN32 @@ -197,30 +198,24 @@ Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) { ray::Status StoreConn::WriteBuffer(const std::vector &buffer) { auto status = ray::ServerConnection::WriteBuffer(buffer); - ShutdownWorkerIfLocalRayletDisconnected(status); + ShutdownWorkerIfErrorStatus(status); return status; } ray::Status StoreConn::ReadBuffer( const std::vector &buffer) { auto status = ray::ServerConnection::ReadBuffer(buffer); - ShutdownWorkerIfLocalRayletDisconnected(status); + ShutdownWorkerIfErrorStatus(status); return status; } -void StoreConn::ShutdownWorkerIfLocalRayletDisconnected(const ray::Status &status) { - // We don't explicitly check if the local raylet is dead because: - // 1. If the connection is from a core worker, the local raylet should be on the other - // side of the connection. - // 2. On the raylet side, we never proactively close the plasma store connection - // even during shutdown. So any error from the raylet side should be a sign of raylet - // death. - if (is_in_core_worker_ && !status.ok()) { - RAY_LOG(WARNING) << "The connection to the plasma store is failed because the " - "local raylet is dead. Terminate the process. Status: " - << status; +void StoreConn::ShutdownWorkerIfErrorStatus(const ray::Status &status) { + if (!status.ok() && exit_on_connection_failure_) { + RAY_LOG(WARNING) << "The connection to the plasma store is failed. Terminate the " + << "process. Status: " << status; ray::QuickExit(); - RAY_LOG(FATAL) << "Unreachable."; + RAY_LOG(FATAL) << "Accessing unreachable code. 
This line should never be reached " + << "after quick process exit due to plasma store connection failure."; } } } // namespace plasma diff --git a/src/ray/object_manager/plasma/connection.h b/src/ray/object_manager/plasma/connection.h index d216f27b3bf8..cdd14c6eb0d0 100644 --- a/src/ray/object_manager/plasma/connection.h +++ b/src/ray/object_manager/plasma/connection.h @@ -164,7 +164,7 @@ class StoreConn : public ray::ServerConnection { public: explicit StoreConn(ray::local_stream_socket &&socket); - explicit StoreConn(ray::local_stream_socket &&socket, bool is_in_core_worker); + explicit StoreConn(ray::local_stream_socket &&socket, bool exit_on_connection_failure); /// Receive a file descriptor for the store. /// @@ -176,12 +176,18 @@ class StoreConn : public ray::ServerConnection { ray::Status ReadBuffer(const std::vector &buffer) override; private: - // Whether the client is in a core worker. - bool is_in_core_worker_; - - // Shutdown the current process if the plasma client is in a core worker and the - // local raylet is dead. - void ShutdownWorkerIfLocalRayletDisconnected(const ray::Status &status); + // Whether the current process should exit when WriteBuffer or ReadBuffer fails. + // Currently it is only turned on when the plasma client is in a core worker. + // TODO(myan): For better error handling, we should: (1) In the mid-term, evaluate if + // we should turn it on for the plasma client in other processes. (2) In the + // long-term, consolidate the shutdown path between core worker and raylet to make the + // shutdown procedure cleaner. + bool exit_on_connection_failure_ = false; + + // Shutdown the current process if the passed in status is not OK and the client is + // configured to exit on failure. + // @param status: The status to check. 
+ void ShutdownWorkerIfErrorStatus(const ray::Status &status); }; std::ostream &operator<<(std::ostream &os, const std::shared_ptr &store_conn); From 93f409244c248a3cc0de583496e2ca1cf4021f53 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 11 Jun 2025 17:21:50 -0700 Subject: [PATCH 06/10] fic typo Signed-off-by: Mengjin Yan --- src/ray/core_worker/store_provider/plasma_store_provider.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 43b5122780a8..53969dcaff9b 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -65,9 +65,9 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( bool warmup, std::function get_current_call_site) : raylet_client_(raylet_client), - // Here we can turn on exit_on_connection_failure on for the core worker plasma + // We can turn on exit_on_connection_failure on for the core worker plasma // client to early exist core worker after the raylet's death because on the - // raylet side, we never proactivately close the plasma store connection even + // raylet side, we never proactively close the plasma store connection even // during shutdown. So any error from the raylet side should be a sign of raylet // death. 
store_client_( From 46311ad906bc30451358daa78c0f52c770444d5a Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 11 Jun 2025 18:19:32 -0700 Subject: [PATCH 07/10] Update src/ray/core_worker/store_provider/plasma_store_provider.cc Co-authored-by: Ibrahim Rabbani Signed-off-by: Mengjin Yan --- src/ray/core_worker/store_provider/plasma_store_provider.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 53969dcaff9b..6014bac8ef78 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -66,7 +66,7 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( std::function get_current_call_site) : raylet_client_(raylet_client), // We can turn on exit_on_connection_failure on for the core worker plasma - // client to early exist core worker after the raylet's death because on the + // client to early exit core worker after the raylet's death because on the // raylet side, we never proactively close the plasma store connection even // during shutdown. So any error from the raylet side should be a sign of raylet // death. 
From 9b788e15624e5968f99ec86ab0bffc8e2cb9d174 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 11 Jun 2025 18:19:48 -0700 Subject: [PATCH 08/10] Update src/ray/object_manager/plasma/client.cc Co-authored-by: Ibrahim Rabbani Signed-off-by: Mengjin Yan --- src/ray/object_manager/plasma/client.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index c155b5f026a0..a1d8613aaba6 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -237,7 +237,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this Date: Wed, 11 Jun 2025 18:20:25 -0700 Subject: [PATCH 09/10] Update src/ray/object_manager/plasma/connection.cc Co-authored-by: Ibrahim Rabbani Signed-off-by: Mengjin Yan --- src/ray/object_manager/plasma/connection.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/object_manager/plasma/connection.cc b/src/ray/object_manager/plasma/connection.cc index 3f45c6c64166..b9aae1d309ee 100644 --- a/src/ray/object_manager/plasma/connection.cc +++ b/src/ray/object_manager/plasma/connection.cc @@ -215,7 +215,7 @@ void StoreConn::ShutdownWorkerIfErrorStatus(const ray::Status &status) { << "process. Status: " << status; ray::QuickExit(); RAY_LOG(FATAL) << "Accessing unreachable code. This line should never be reached " - << "after quick process exit due to plasma store connection failure."; + << "after quick process exit due to plasma store connection failure. 
Please create a github issue at https://github.com/ray-project/ray."; } } } // namespace plasma From e58b022aa67f9f7f401c70fe21eef72cefb43c0b Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 11 Jun 2025 20:15:42 -0700 Subject: [PATCH 10/10] fix comment Signed-off-by: Mengjin Yan --- python/ray/tests/test_failure_3.py | 3 ++- src/ray/object_manager/plasma/connection.cc | 12 +++++++----- src/ray/object_manager/plasma/connection.h | 8 +++----- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/python/ray/tests/test_failure_3.py b/python/ray/tests/test_failure_3.py index ef3f42b048ad..759c8c38ada6 100644 --- a/python/ray/tests/test_failure_3.py +++ b/python/ray/tests/test_failure_3.py @@ -63,12 +63,13 @@ def test_plasma_store_operation_after_raylet_dies(ray_start_cluster): def get_after_raylet_dies(): raylet_pid = int(os.environ["RAY_RAYLET_PID"]) os.kill(raylet_pid, SIGKILL) + wait_for_pid_to_exit(raylet_pid) ray.put([0] * 100000) try: ray.get(get_after_raylet_dies.remote(), timeout=10) except Exception as e: - assert isinstance(e, ray.exceptions.RayletDiedError) + assert isinstance(e, ray.exceptions.LocalRayletDiedError) @pytest.mark.parametrize( diff --git a/src/ray/object_manager/plasma/connection.cc b/src/ray/object_manager/plasma/connection.cc index b9aae1d309ee..b8c7549af91b 100644 --- a/src/ray/object_manager/plasma/connection.cc +++ b/src/ray/object_manager/plasma/connection.cc @@ -198,24 +198,26 @@ Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) { ray::Status StoreConn::WriteBuffer(const std::vector &buffer) { auto status = ray::ServerConnection::WriteBuffer(buffer); - ShutdownWorkerIfErrorStatus(status); + ExitIfErrorStatus(status); return status; } ray::Status StoreConn::ReadBuffer( const std::vector &buffer) { auto status = ray::ServerConnection::ReadBuffer(buffer); - ShutdownWorkerIfErrorStatus(status); + ExitIfErrorStatus(status); return status; } -void StoreConn::ShutdownWorkerIfErrorStatus(const ray::Status &status) { +void 
StoreConn::ExitIfErrorStatus(const ray::Status &status) { if (!status.ok() && exit_on_connection_failure_) { RAY_LOG(WARNING) << "The connection to the plasma store is failed. Terminate the " << "process. Status: " << status; ray::QuickExit(); - RAY_LOG(FATAL) << "Accessing unreachable code. This line should never be reached " - << "after quick process exit due to plasma store connection failure. Please create a github issue at https://github.com/ray-project/ray."; + RAY_LOG(FATAL) + << "Accessing unreachable code. This line should never be reached " + << "after quick process exit due to plasma store connection failure. Please " + "create a github issue at https://github.com/ray-project/ray."; } } } // namespace plasma diff --git a/src/ray/object_manager/plasma/connection.h b/src/ray/object_manager/plasma/connection.h index cdd14c6eb0d0..296e8778ce84 100644 --- a/src/ray/object_manager/plasma/connection.h +++ b/src/ray/object_manager/plasma/connection.h @@ -178,16 +178,14 @@ class StoreConn : public ray::ServerConnection { private: // Whether the current process should exit when WriteBuffer or ReadBuffer fails. // Currently it is only turned on when the plasma client is in a core worker. - // TODO(myan): For better error handling, we should: (1) In the mid-term, evaluate if - // we should turn it on for the plasma client in other processes. (2) In the - // long-term, consolidate the shutdown path between core worker and raylet to make the - // shutdown procedure cleaner. + // TODO(myan): The better way is to handle the failure outside of the plasma client + // and inside the core worker's logic and propagate the correct exception to the user. bool exit_on_connection_failure_ = false; // Shutdown the current process if the passed in status is not OK and the client is // configured to exit on failure. // @param status: The status to check. 
- void ShutdownWorkerIfErrorStatus(const ray::Status &status); + void ExitIfErrorStatus(const ray::Status &status); }; std::ostream &operator<<(std::ostream &os, const std::shared_ptr &store_conn);