From 9d57b2917998cac221e3331f72a50c59ce81091d Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Thu, 7 Nov 2024 10:21:27 -0800 Subject: [PATCH 1/8] Make core worker exit with system error when receiving IOError from writing to local object store Signed-off-by: Mengjin Yan --- src/ray/raylet_client/raylet_client.cc | 5 +++-- src/ray/raylet_client/raylet_client.h | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 938f20fab80e..e2ac08d5d363 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -85,9 +85,10 @@ Status raylet::RayletConnection::AtomicRequestReply(MessageType request_type, } void raylet::RayletConnection::ShutdownIfLocalRayletDisconnected(const Status &status) { - if (!status.ok() && IsRayletFailed(RayConfig::instance().RAYLET_PID())) { + if ((!status.ok() && IsRayletFailed(RayConfig::instance().RAYLET_PID())) || + status.IsIOError()) { RAY_LOG(WARNING) << "The connection is failed because the local raylet has been " - "dead. Terminate the process. Status: " + "dead or is unreachable. Terminate the process. Status: " << status; QuickExit(); RAY_LOG(FATAL) << "Unreachable."; diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index f40c97edf620..f800694e22f3 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -269,7 +269,8 @@ class RayletConnection { flatbuffers::FlatBufferBuilder *fbb = nullptr); private: - /// Shutdown the raylet if the local connection is disconnected. + /// Shutdown the raylet if the local connection is disconnected (either terminated or + // unreachable). void ShutdownIfLocalRayletDisconnected(const Status &status); /// The connection to raylet. 
std::shared_ptr conn_; From 2029d367a60c0e66b1a6c61ff50489f56a646a17 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Fri, 8 Nov 2024 14:43:11 -0800 Subject: [PATCH 2/8] Update the code to throw system exception upon IOError in free_objects Signed-off-by: Mengjin Yan --- python/ray/_raylet.pyx | 9 ++++++--- src/ray/raylet_client/raylet_client.cc | 9 +++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index fa49a88c691f..0b0136162ea8 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3829,10 +3829,13 @@ cdef class CoreWorker: def free_objects(self, object_refs, c_bool local_only): cdef: c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs) - + with nogil: - check_status(CCoreWorkerProcess.GetCoreWorker().Delete( - free_ids, local_only)) + status = CCoreWorkerProcess.GetCoreWorker().Delete(free_ids, local_only) + if status.IsIOError(): + check_status(CRayStatus.UnexpectedSystemExit(status.ToString())) + else: + check_status(status) def get_local_ongoing_lineage_reconstruction_tasks(self): cdef: diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index e2ac08d5d363..35a391720788 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -85,8 +85,7 @@ Status raylet::RayletConnection::AtomicRequestReply(MessageType request_type, } void raylet::RayletConnection::ShutdownIfLocalRayletDisconnected(const Status &status) { - if ((!status.ok() && IsRayletFailed(RayConfig::instance().RAYLET_PID())) || - status.IsIOError()) { + if (!status.ok() && IsRayletFailed(RayConfig::instance().RAYLET_PID())) { RAY_LOG(WARNING) << "The connection is failed because the local raylet has been " "dead or is unreachable. Terminate the process. 
Status: " << status; @@ -184,6 +183,12 @@ Status raylet::RayletClient::Disconnect( auto status = conn_->WriteMessage(MessageType::DisconnectClient, &fbb); // Don't be too strict for disconnection errors. // Just create logs and prevent it from crash. + // TODO (myan): In the current implementation, if raylet is already terminated in the + // "WriteMessage" function above, the worker process will exit early in the function + // and will not reach here. However, the code path here is shared between graceful + // shutdown and force termination. We need to make sure the above early exit + // shouldn't happen during the graceful shutdown scenario and there shouldn't be any + // leak if early exit is triggered if (!status.ok()) { RAY_LOG(WARNING) << status.ToString() From 89dfa7ccae72a0c73e987dfbdddaf48fe8df9f98 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Fri, 8 Nov 2024 15:10:31 -0800 Subject: [PATCH 3/8] fix lint Signed-off-by: Mengjin Yan --- src/ray/raylet_client/raylet_client.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 35a391720788..cba20da3cd6e 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -184,11 +184,11 @@ Status raylet::RayletClient::Disconnect( // Don't be too strict for disconnection errors. // Just create logs and prevent it from crash. // TODO (myan): In the current implementation, if raylet is already terminated in the - // "WriteMessage" function above, the worker process will exit early in the function - // and will not reach here. However, the code path here is shared between graceful - // shutdown and force termination. We need to make sure the above early exit + // "WriteMessage" function above, the worker process will exit early in the function + // and will not reach here. However, the code path here is shared between graceful + // shutdown and force termination. 
We need to make sure the above early exit // shouldn't happen during the graceful shutdown scenario and there shouldn't be any - // leak if early exit is triggered + // leak if early exit is triggered if (!status.ok()) { RAY_LOG(WARNING) << status.ToString() From d654c2f91a406458dd9e3fc5af8a974fb11d84d9 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Fri, 8 Nov 2024 15:12:44 -0800 Subject: [PATCH 4/8] revert comment change Signed-off-by: Mengjin Yan --- src/ray/raylet_client/raylet_client.cc | 2 +- src/ray/raylet_client/raylet_client.h | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index cba20da3cd6e..7911a3ce0a86 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -87,7 +87,7 @@ Status raylet::RayletConnection::AtomicRequestReply(MessageType request_type, void raylet::RayletConnection::ShutdownIfLocalRayletDisconnected(const Status &status) { if (!status.ok() && IsRayletFailed(RayConfig::instance().RAYLET_PID())) { RAY_LOG(WARNING) << "The connection is failed because the local raylet has been " - "dead or is unreachable. Terminate the process. Status: " + "dead. Terminate the process. Status: " << status; QuickExit(); RAY_LOG(FATAL) << "Unreachable."; diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index f800694e22f3..f40c97edf620 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -269,8 +269,7 @@ class RayletConnection { flatbuffers::FlatBufferBuilder *fbb = nullptr); private: - /// Shutdown the raylet if the local connection is disconnected (either terminated or - // unreachable). + /// Shutdown the raylet if the local connection is disconnected. void ShutdownIfLocalRayletDisconnected(const Status &status); /// The connection to raylet. 
std::shared_ptr conn_; From 8148938a759557731aea284d0a9628a0d5069223 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Fri, 8 Nov 2024 17:16:42 -0800 Subject: [PATCH 5/8] fix lint Signed-off-by: Mengjin Yan --- python/ray/_raylet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 0b0136162ea8..998c8778ee68 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3829,7 +3829,7 @@ cdef class CoreWorker: def free_objects(self, object_refs, c_bool local_only): cdef: c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs) - + with nogil: status = CCoreWorkerProcess.GetCoreWorker().Delete(free_ids, local_only) if status.IsIOError(): From eeba1147c5f243f5584e7edd9c53ed6c5209988c Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Tue, 12 Nov 2024 13:54:31 -0800 Subject: [PATCH 6/8] make time calculation in test_disable_driver_logs_breakpoint more accurate Signed-off-by: Mengjin Yan --- python/ray/tests/test_output.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/python/ray/tests/test_output.py b/python/ray/tests/test_output.py index 54a5e9d33fa8..4b84b8285534 100644 --- a/python/ray/tests/test_output.py +++ b/python/ray/tests/test_output.py @@ -575,14 +575,20 @@ def test_disable_driver_logs_breakpoint(): @ray.remote def f(): while True: - time.sleep(1) + start_time = time.time() + while time.time() - start_time < 1: + time.sleep(0.1) print("hello there") sys.stdout.flush() def kill(): - time.sleep(5) + start_time = time.time() + while time.time() - start_time < 5: + time.sleep(0.1) sys.stdout.flush() - time.sleep(1) + start_time = time.time() + while time.time() - start_time < 1: + time.sleep(0.1) os._exit(0) t = threading.Thread(target=kill) From 01f5f11d4d08a99494bc7c8b85ebd6a3e33534b6 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Tue, 12 Nov 2024 14:28:52 -0800 Subject: [PATCH 7/8] Move the error check to core_worker.cc Signed-off-by: Mengjin Yan 
--- python/ray/_raylet.pyx | 6 +----- src/ray/core_worker/core_worker.cc | 7 ++++++- src/ray/core_worker/core_worker.h | 5 +++++ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 998c8778ee68..9f596d33bd1f 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3831,11 +3831,7 @@ cdef class CoreWorker: c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs) with nogil: - status = CCoreWorkerProcess.GetCoreWorker().Delete(free_ids, local_only) - if status.IsIOError(): - check_status(CRayStatus.UnexpectedSystemExit(status.ToString())) - else: - check_status(status) + check_status(CCoreWorkerProcess.GetCoreWorker().Delete(free_ids, local_only)) def get_local_ongoing_lineage_reconstruction_tasks(self): cdef: diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 85a253339cdd..8f7dd34b83e4 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1868,7 +1868,12 @@ Status CoreWorker::Delete(const std::vector &object_ids, bool local_on } } // Also try to delete all objects locally. - return DeleteImpl(object_ids, local_only); + Status status = DeleteImpl(object_ids, local_only); + if (status.IsIOError()) { + return Status::UnexpectedSystemExit(status.ToString()); + } else { + return status; + } } Status CoreWorker::GetLocationFromOwner( diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 72b74774b43f..58146a1732eb 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -795,6 +795,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// This calls DeleteImpl() locally for objects we own, and DeleteImpl() remotely /// for objects we do not own. /// + /// If IOError is returned from DeleteImpl() when deleting objects locally, we will + /// return an UnexpectedSystemExit status instead. 
This is to make sure the tasks + /// that call this function in application code can properly retry when hitting the + /// IOError. + /// /// \param[in] object_ids IDs of the objects to delete. /// \param[in] local_only Whether only delete the objects in local node, or all nodes in /// the cluster. From f202ac7e1d0f0610534a0c1e2b6a7c596bca2860 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Tue, 12 Nov 2024 17:51:14 -0800 Subject: [PATCH 8/8] fix lint Signed-off-by: Mengjin Yan --- python/ray/_raylet.pyx | 3 ++- src/ray/core_worker/core_worker.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 9f596d33bd1f..95e3de4a2536 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3831,7 +3831,8 @@ cdef class CoreWorker: c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs) with nogil: - check_status(CCoreWorkerProcess.GetCoreWorker().Delete(free_ids, local_only)) + check_status(CCoreWorkerProcess.GetCoreWorker(). + Delete(free_ids, local_only)) def get_local_ongoing_lineage_reconstruction_tasks(self): cdef: diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 58146a1732eb..e722f21b484b 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -797,7 +797,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// If IOError is returned from DeleteImpl() when deleting objects locally, we will /// return an UnexpectedSystemExit status instead. This is to make sure the tasks - /// that call this function in application code can properly retry when hitting the + /// that call this function in application code can properly retry when hitting the /// IOError. /// /// \param[in] object_ids IDs of the objects to delete.