
Commit af6a1dc

codope authored and dstrodtman committed
[core] Fix RAY_CHECK failure during shutdown due to plasma store race condition (#55367)
## Why are these changes needed?

Workers crash with a fatal `RAY_CHECK` failure when the plasma store connection is broken during shutdown, causing the following error:

```
RAY_CHECK failed: PutInLocalPlasmaStore(object, object_id, true) Status not OK: IOError: Broken pipe
```

Stacktrace:

```
core_worker.cc:720 C Check failed: PutInLocalPlasmaStore(object, object_id, true) Status not OK: IOError: Broken pipe
*** StackTrace Information ***
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x141789a) [0x7924dd2c689a] ray::operator<<()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray6RayLogD1Ev+0x479) [0x7924dd2c9319] ray::RayLog::~RayLog()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x95cc8a) [0x7924dc80bc8a] ray::core::CoreWorker::CoreWorker()::{lambda()#13}::operator()()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core11TaskManager27MarkTaskReturnObjectsFailedERKNS_17TaskSpecificationENS_3rpc9ErrorTypeEPKNS5_12RayErrorInfoERKN4absl12lts_2023080213flat_hash_setINS_8ObjectIDENSB_13hash_internal4HashISD_EESt8equal_toISD_ESaISD_EEE+0x679) [0x7924dc868f29] ray::core::TaskManager::MarkTaskReturnObjectsFailed()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core11TaskManager15FailPendingTaskERKNS_6TaskIDENS_3rpc9ErrorTypeEPKNS_6StatusEPKNS5_12RayErrorInfoE+0x416) [0x7924dc86f186] ray::core::TaskManager::FailPendingTask()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x9a90e6) [0x7924dc8580e6] ray::core::NormalTaskSubmitter::RequestNewWorkerIfNeeded()::{lambda()#1}::operator()()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray3rpc14ClientCallImplINS0_23RequestWorkerLeaseReplyEE15OnReplyReceivedEv+0x68) [0x7924dc94aa48] ray::rpc::ClientCallImpl<>::OnReplyReceived()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZNSt17_Function_handlerIFvvEZN3ray3rpc17ClientCallManager29PollEventsFromCompletionQueueEiEUlvE_E9_M_invokeERKSt9_Any_data+0x15) [0x7924dc79e285] std::_Function_handler<>::_M_invoke()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd9b4c8) [0x7924dcc4a4c8] EventTracker::RecordExecution()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd4648e) [0x7924dcbf548e] std::_Function_handler<>::_M_invoke()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xd46906) [0x7924dcbf5906] boost::asio::detail::completion_handler<>::do_complete()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f417b) [0x7924dd2a317b] boost::asio::detail::scheduler::do_run_one()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f5af9) [0x7924dd2a4af9] boost::asio::detail::scheduler::run()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0x13f6202) [0x7924dd2a5202] boost::asio::io_context::run()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(_ZN3ray4core10CoreWorker12RunIOServiceEv+0x91) [0x7924dc793a61] ray::core::CoreWorker::RunIOService()
/home/ray/anaconda3/lib/python3.11/site-packages/ray/_raylet.so(+0xcba0b0) [0x7924dcb690b0] thread_proxy
/lib/x86_64-linux-gnu/libc.so.6(+0x94ac3) [0x7924dde71ac3]
/lib/x86_64-linux-gnu/libc.so.6(+0x126850) [0x7924ddf03850]
```

Stack trace flow:

1. Task lease request fails -> `NormalTaskSubmitter::RequestNewWorkerIfNeeded()` callback.
2. Triggers `TaskManager::FailPendingTask()` -> `MarkTaskReturnObjectsFailed()`.
3. System attempts to store error objects in plasma via `put_in_local_plasma_callback_`.
4. Plasma connection is broken (raylet/plasma store already shut down).
5. `RAY_CHECK_OK()` in the callback causes a fatal crash instead of graceful handling.

Root Cause: This is a shutdown ordering race condition:

1. Raylet shuts down first: The raylet stops its IO context ([main_service_.stop()](https://github.com/ray-project/ray/blob/77c5475195e56a26891d88460973198391d20edf/src/ray/object_manager/plasma/store_runner.cc#L146)), which closes plasma store connections.
2. Worker still processes callbacks: The core worker continues processing pending callbacks on separate threads.
3. Broken connection: When the callback tries to store error objects in plasma, the connection is already closed.
4. Fatal crash: The `RAY_CHECK_OK()` treats this as an unexpected error and crashes the process.

Fix:

1. Shutdown-aware plasma operations
   - Add a `CoreWorker::IsShuttingDown()` method to check shutdown state.
   - Skip plasma operations entirely when shutdown is in progress.
   - Prevents attempting operations on already-closed connections.
2. Targeted error handling for connection failures
   - Replace the blanket `RAY_CHECK_OK()` with specific error type checking.
   - Handle connection errors (Broken pipe, Connection reset, Bad file descriptor) as warnings during shutdown scenarios.
   - Maintain `RAY_CHECK_OK()` for other error types to catch real issues.

---------

Signed-off-by: Sagar Sumit <[email protected]>
Signed-off-by: Douglas Strodtman <[email protected]>
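The two-part fix above, skipping plasma puts once shutdown has begun and downgrading connection errors from fatal checks to warnings, can be illustrated with a small standalone sketch. Everything here (`Status`, `IsShuttingDown`, the logging) is a simplified stand-in for illustration, not Ray's actual types or the exact code in this commit.

```cpp
// Minimal standalone sketch of the shutdown-aware error handling described
// above. Status, IsShuttingDown(), and the logging are simplified stand-ins.
#include <atomic>
#include <iostream>
#include <string>

enum class StatusCode { kOK, kIOError, kObjectStoreFull };

struct Status {
  StatusCode code = StatusCode::kOK;
  std::string msg;
  bool ok() const { return code == StatusCode::kOK; }
  bool IsIOError() const { return code == StatusCode::kIOError; }
};

std::atomic<bool> g_shutting_down{false};

// Stand-in for the CoreWorker::IsShuttingDown() check from the fix description.
bool IsShuttingDown() { return g_shutting_down.load(); }

// Stand-in for the plasma put; fails with a broken pipe once the raylet /
// plasma store has already gone away.
Status PutInLocalPlasmaStore() {
  if (IsShuttingDown()) return {StatusCode::kIOError, "Broken pipe"};
  return {};
}

// Before the fix, this call site was wrapped in a RAY_CHECK_OK-style assert
// and aborted the process on any error. After the fix, the put is skipped
// during shutdown and connection errors are logged instead of being fatal.
Status PutErrorObjectBestEffort() {
  if (IsShuttingDown()) {
    std::cerr << "WARNING: worker shutting down, skipping plasma put\n";
    return {};
  }
  Status s = PutInLocalPlasmaStore();
  if (!s.ok() && s.IsIOError()) {
    std::cerr << "WARNING: plasma connection lost: " << s.msg << "\n";
    return s;  // propagate to the caller instead of crashing
  }
  return s;
}

int main() {
  g_shutting_down = true;
  Status s = PutErrorObjectBestEffort();
  std::cout << "worker survived the shutdown race, ok=" << s.ok() << "\n";
  return 0;
}
```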
1 parent 22175ae commit af6a1dc

File tree

5 files changed: +376 -39 lines changed

src/ray/core_worker/core_worker.h

Lines changed: 13 additions & 0 deletions

```diff
@@ -1494,6 +1494,19 @@ class CoreWorker {
       std::string *application_error);
 
   /// Put an object in the local plasma store.
+  ///
+  /// Return status semantics:
+  /// - Status::OK(): The object was created (or already existed) and bookkeeping was
+  ///   updated. Note: an internal ObjectExists from the plasma provider is treated
+  ///   as OK and does not surface here.
+  /// - Status::ObjectStoreFull(): The local plasma store is out of memory (or out of
+  ///   disk when spilling). The error message contains context and a short memory
+  ///   report.
+  /// - Status::IOError(): IPC/connection failures while talking to the plasma store
+  ///   (e.g., broken pipe/connection reset during shutdown, store not reachable).
+  ///
+  /// Call sites that run during shutdown may choose to tolerate IOError specifically,
+  /// but should treat all other statuses as real failures.
   Status PutInLocalPlasmaStore(const RayObject &object,
                                const ObjectID &object_id,
                                bool pin_object);
```
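The return-status contract spelled out in this new doc comment maps naturally onto distinct call-site reactions. The sketch below is a standalone illustration of that mapping, assuming a simplified status enum rather than the real `ray::Status`; the action names are invented for the example.

```cpp
// Standalone illustration of the documented PutInLocalPlasmaStore() status
// contract. PutStatus and Action are invented for this example; they are not
// the real ray::Status or TaskManager types.
#include <cassert>
#include <iostream>

enum class PutStatus { kOk, kObjectStoreFull, kIOError, kOther };
enum class Action { kDone, kTolerateDuringShutdown, kFailTaskOutOfMemory, kFailTask };

// OK -> success; ObjectStoreFull -> surface an out-of-memory style failure;
// IOError -> may be tolerated (warn and continue) when the worker is shutting
// down; anything else -> treat as a real failure for this task attempt.
Action ClassifyPlasmaPut(PutStatus s, bool shutting_down) {
  switch (s) {
    case PutStatus::kOk:
      return Action::kDone;
    case PutStatus::kObjectStoreFull:
      return Action::kFailTaskOutOfMemory;
    case PutStatus::kIOError:
      return shutting_down ? Action::kTolerateDuringShutdown : Action::kFailTask;
    default:
      return Action::kFailTask;
  }
}

int main() {
  assert(ClassifyPlasmaPut(PutStatus::kIOError, /*shutting_down=*/true) ==
         Action::kTolerateDuringShutdown);
  assert(ClassifyPlasmaPut(PutStatus::kObjectStoreFull, /*shutting_down=*/false) ==
         Action::kFailTaskOutOfMemory);
  std::cout << "status classification matches the documented contract\n";
  return 0;
}
```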

src/ray/core_worker/core_worker_process.cc

Lines changed: 8 additions & 2 deletions

```diff
@@ -423,8 +423,14 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
       /*put_in_local_plasma_callback=*/
       [this](const RayObject &object, const ObjectID &object_id) {
         auto core_worker = GetCoreWorker();
-        RAY_CHECK_OK(
-            core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
+        auto put_status =
+            core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
+        if (!put_status.ok()) {
+          RAY_LOG(WARNING).WithField(object_id)
+              << "Failed to put object in plasma store: " << put_status;
+          return put_status;
+        }
+        return Status::OK();
       },
       /* retry_task_callback= */
       [this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) {
```
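The key design shift in this hunk is that the put callback no longer bakes a fatal policy into itself; it reports a status and each call site decides what to do with it. A minimal sketch of that shape, assuming a toy `Status` and callback type rather than Ray's real `PutInLocalPlasmaCallback`:

```cpp
// Toy illustration of moving the error policy out of the callback and into
// the call sites. Status and PutCallback are stand-ins, not Ray's types.
#include <cstdlib>
#include <functional>
#include <iostream>
#include <string>

struct Status {
  bool ok_ = true;
  std::string msg;
  bool ok() const { return ok_; }
};

using PutCallback = std::function<Status(const std::string &object_id)>;

// Strict call site: treats any failure as fatal (the old RAY_CHECK_OK-like behavior).
void StrictPut(const PutCallback &put, const std::string &id) {
  Status s = put(id);
  if (!s.ok()) {
    std::cerr << "FATAL: " << s.msg << "\n";
    std::abort();
  }
}

// Tolerant call site: logs a warning and keeps going (the new best-effort behavior).
void BestEffortPut(const PutCallback &put, const std::string &id) {
  Status s = put(id);
  if (!s.ok()) {
    std::cerr << "WARNING: failed to put " << id << ": " << s.msg << "\n";
  }
}

int main() {
  PutCallback broken_pipe_put = [](const std::string &) {
    return Status{false, "IOError: Broken pipe"};
  };
  BestEffortPut(broken_pipe_put, "error-object-1");  // warns, keeps running
  std::cout << "process is still alive\n";
  // StrictPut(broken_pipe_put, "error-object-2");   // would abort the process
  return 0;
}
```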

src/ray/core_worker/task_manager.cc

Lines changed: 92 additions & 30 deletions

```diff
@@ -533,10 +533,10 @@ size_t TaskManager::NumPendingTasks() const {
   return num_pending_tasks_;
 }
 
-bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
-                                   const rpc::ReturnObject &return_object,
-                                   const NodeID &worker_node_id,
-                                   bool store_in_plasma) {
+StatusOr<bool> TaskManager::HandleTaskReturn(const ObjectID &object_id,
+                                             const rpc::ReturnObject &return_object,
+                                             const NodeID &worker_node_id,
+                                             bool store_in_plasma) {
   bool direct_return = false;
   reference_counter_.UpdateObjectSize(object_id, return_object.size());
   RAY_LOG(DEBUG) << "Task return object " << object_id << " has size "
@@ -579,7 +579,15 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
                         /*copy_data=*/false,
                         tensor_transport.value_or(rpc::TensorTransport::OBJECT_STORE));
     if (store_in_plasma) {
-      put_in_local_plasma_callback_(object, object_id);
+      Status s = put_in_local_plasma_callback_(object, object_id);
+      int retry_count = 0;
+      while (!s.ok() && s.IsTransientObjectStoreFull() && retry_count < 3) {
+        retry_count++;
+        s = put_in_local_plasma_callback_(object, object_id);
+      }
+      if (!s.ok()) {
+        return s;
+      }
     } else {
       in_memory_store_.Put(object, object_id);
       direct_return = true;
@@ -813,10 +821,15 @@ bool TaskManager::HandleReportGeneratorItemReturns(
     }
     // When an object is reported, the object is ready to be fetched.
     reference_counter_.UpdateObjectPendingCreation(object_id, false);
-    HandleTaskReturn(object_id,
-                     return_object,
-                     NodeID::FromBinary(request.worker_addr().node_id()),
-                     /*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
+    StatusOr<bool> put_res =
+        HandleTaskReturn(object_id,
+                         return_object,
+                         NodeID::FromBinary(request.worker_addr().node_id()),
+                         /*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
+    if (!put_res.ok()) {
+      RAY_LOG(WARNING).WithField(object_id)
+          << "Failed to handle streaming dynamic return: " << put_res.status();
+    }
   }
 
   // Handle backpressure if needed.
@@ -900,23 +913,54 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
         reference_counter_.AddDynamicReturn(object_id, generator_id);
         dynamic_return_ids.push_back(object_id);
       }
-      if (!HandleTaskReturn(object_id,
-                            return_object,
-                            NodeID::FromBinary(worker_addr.node_id()),
-                            store_in_plasma_ids.contains(object_id))) {
-        if (first_execution) {
-          dynamic_returns_in_plasma.push_back(object_id);
+      StatusOr<bool> direct_or =
+          HandleTaskReturn(object_id,
+                           return_object,
+                           NodeID::FromBinary(worker_addr.node_id()),
+                           store_in_plasma_ids.contains(object_id));
+      if (!direct_or.ok()) {
+        RAY_LOG(WARNING).WithField(object_id)
+            << "Failed to handle dynamic task return: " << direct_or.status();
+        Status st = direct_or.status();
+        rpc::ErrorType err_type = rpc::ErrorType::WORKER_DIED;
+        if (st.IsObjectStoreFull() || st.IsTransientObjectStoreFull()) {
+          err_type = rpc::ErrorType::OUT_OF_MEMORY;
         }
+        rpc::RayErrorInfo err_info;
+        err_info.set_error_message(st.ToString());
+        FailOrRetryPendingTask(task_id,
+                               err_type,
+                               &st,
+                               /*ray_error_info=*/&err_info,
+                               /*mark_task_object_failed=*/true,
+                               /*fail_immediately=*/true);
+        return;
+      } else if (!direct_or.value() && first_execution) {
+        dynamic_returns_in_plasma.push_back(object_id);
       }
     }
   }
 
   for (const auto &return_object : reply.return_objects()) {
     const auto object_id = ObjectID::FromBinary(return_object.object_id());
-    if (HandleTaskReturn(object_id,
-                         return_object,
-                         NodeID::FromBinary(worker_addr.node_id()),
-                         store_in_plasma_ids.contains(object_id))) {
+    StatusOr<bool> direct_or = HandleTaskReturn(object_id,
+                                                return_object,
+                                                NodeID::FromBinary(worker_addr.node_id()),
+                                                store_in_plasma_ids.contains(object_id));
+    if (!direct_or.ok()) {
+      RAY_LOG(WARNING).WithField(object_id)
+          << "Failed to handle task return: " << direct_or.status();
+      // If storing return in plasma failed, treat as system failure for this attempt.
+      // Do not proceed with normal completion. Mark task failed immediately.
+      Status st = direct_or.status();
+      FailOrRetryPendingTask(task_id,
+                             rpc::ErrorType::WORKER_DIED,
+                             &st,
+                             /*ray_error_info=*/nullptr,
+                             /*mark_task_object_failed=*/true,
+                             /*fail_immediately=*/true);
+      return;
+    } else if (direct_or.value()) {
       direct_return_ids.push_back(object_id);
     }
   }
@@ -1040,10 +1084,16 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
       const auto generator_return_id = spec.StreamingGeneratorReturnId(i);
       RAY_CHECK_EQ(reply.return_objects_size(), 1);
       const auto &return_object = reply.return_objects(0);
-      HandleTaskReturn(generator_return_id,
-                       return_object,
-                       NodeID::FromBinary(worker_addr.node_id()),
-                       store_in_plasma_ids.contains(generator_return_id));
+      StatusOr<bool> res =
+          HandleTaskReturn(generator_return_id,
+                           return_object,
+                           NodeID::FromBinary(worker_addr.node_id()),
+                           store_in_plasma_ids.contains(generator_return_id));
+      if (!res.ok()) {
+        RAY_LOG(WARNING).WithField(generator_return_id)
+            << "Failed to handle generator return during app error propagation: "
+            << res.status();
+      }
     }
   }
 }
@@ -1454,18 +1504,26 @@ void TaskManager::MarkTaskReturnObjectsFailed(
   int64_t num_returns = spec.NumReturns();
   for (int i = 0; i < num_returns; i++) {
     const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1);
+    // Always place an error marker in local memory to unblock waiters quickly.
+    in_memory_store_.Put(error, object_id);
+    // Best-effort plasma put if the object was meant to be in plasma.
     if (store_in_plasma_ids.contains(object_id)) {
-      put_in_local_plasma_callback_(error, object_id);
-    } else {
-      in_memory_store_.Put(error, object_id);
+      Status s = put_in_local_plasma_callback_(error, object_id);
+      if (!s.ok()) {
+        RAY_LOG(WARNING).WithField(object_id)
+            << "Failed to put error object in plasma: " << s;
+      }
     }
   }
   if (spec.ReturnsDynamic()) {
     for (const auto &dynamic_return_id : spec.DynamicReturnIds()) {
+      in_memory_store_.Put(error, dynamic_return_id);
       if (store_in_plasma_ids.contains(dynamic_return_id)) {
-        put_in_local_plasma_callback_(error, dynamic_return_id);
-      } else {
-        in_memory_store_.Put(error, dynamic_return_id);
+        Status s = put_in_local_plasma_callback_(error, dynamic_return_id);
+        if (!s.ok()) {
+          RAY_LOG(WARNING).WithField(dynamic_return_id)
+              << "Failed to put error object in plasma: " << s;
+        }
      }
    }
  }
@@ -1488,7 +1546,11 @@ void TaskManager::MarkTaskReturnObjectsFailed(
   for (size_t i = 0; i < num_streaming_generator_returns; i++) {
     const auto generator_return_id = spec.StreamingGeneratorReturnId(i);
     if (store_in_plasma_ids.contains(generator_return_id)) {
-      put_in_local_plasma_callback_(error, generator_return_id);
+      Status s = put_in_local_plasma_callback_(error, generator_return_id);
+      if (!s.ok()) {
+        RAY_LOG(WARNING).WithField(generator_return_id)
+            << "Failed to put error object in plasma: " << s;
+      }
     } else {
       in_memory_store_.Put(error, generator_return_id);
     }
```
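One detail in the `HandleTaskReturn` hunk above is the bounded retry when the plasma put reports a transient object-store-full condition (`IsTransientObjectStoreFull`, at most three retries after the initial attempt). The following standalone sketch isolates just that retry shape; the `TransientStatus` struct and the fake put function are illustrative stand-ins, not Ray types.

```cpp
// Isolated sketch of the bounded-retry shape used in HandleTaskReturn above.
// Only the structure (retry while the failure is transient, at most 3 extra
// attempts) mirrors the diff; the types are stand-ins.
#include <functional>
#include <iostream>

struct TransientStatus {
  bool ok = false;
  bool transient_full = false;  // analogous to Status::IsTransientObjectStoreFull()
};

TransientStatus RetryTransientPut(const std::function<TransientStatus()> &put) {
  TransientStatus s = put();
  int retry_count = 0;
  while (!s.ok && s.transient_full && retry_count < 3) {
    retry_count++;
    s = put();  // the plasma store may have freed space in the meantime
  }
  return s;  // still-not-ok statuses are propagated to the caller
}

int main() {
  int attempts = 0;
  // Fake put: transiently full for the first two attempts, then succeeds.
  auto fake_put = [&attempts]() -> TransientStatus {
    attempts++;
    if (attempts < 3) return {false, true};
    return {true, false};
  };
  TransientStatus s = RetryTransientPut(fake_put);
  std::cout << "attempts=" << attempts << " ok=" << s.ok << "\n";  // attempts=3 ok=1
  return 0;
}
```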

src/ray/core_worker/task_manager.h

Lines changed: 9 additions & 7 deletions

```diff
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include <functional>
 #include <memory>
 #include <string>
 #include <tuple>
@@ -25,6 +26,7 @@
 #include "absl/container/flat_hash_map.h"
 #include "absl/synchronization/mutex.h"
 #include "ray/common/id.h"
+#include "ray/common/status.h"
 #include "ray/core_worker/store_provider/memory_store/memory_store.h"
 #include "ray/core_worker/task_event_buffer.h"
 #include "ray/core_worker/task_manager_interface.h"
@@ -42,7 +44,7 @@ class ActorManager;
 
 using TaskStatusCounter = CounterMap<std::tuple<std::string, rpc::TaskStatus, bool>>;
 using PutInLocalPlasmaCallback =
-    std::function<void(const RayObject &object, const ObjectID &object_id)>;
+    std::function<Status(const RayObject &object, const ObjectID &object_id)>;
 using RetryTaskCallback =
     std::function<void(TaskSpecification &spec, bool object_recovery, uint32_t delay_ms)>;
 using ReconstructObjectCallback = std::function<void(const ObjectID &object_id)>;
@@ -608,12 +610,12 @@ class TaskManager : public TaskManagerInterface {
       ABSL_LOCKS_EXCLUDED(mu_);
 
   /// Update nested ref count info and store the in-memory value for a task's
-  /// return object. Returns true if the task's return object was returned
-  /// directly by value.
-  bool HandleTaskReturn(const ObjectID &object_id,
-                        const rpc::ReturnObject &return_object,
-                        const NodeID &worker_node_id,
-                        bool store_in_plasma) ABSL_LOCKS_EXCLUDED(mu_);
+  /// return object. On success, sets direct_return_out to true if the object's value
+  /// was returned directly by value (not stored in plasma).
+  StatusOr<bool> HandleTaskReturn(const ObjectID &object_id,
+                                  const rpc::ReturnObject &return_object,
+                                  const NodeID &worker_node_id,
+                                  bool store_in_plasma) ABSL_LOCKS_EXCLUDED(mu_);
 
   /// Remove a lineage reference to this object ID. This should be called
   /// whenever a task that depended on this object ID can no longer be retried.
```
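The new `StatusOr<bool>` signature folds two results into one value: whether the return could be recorded at all (the status), and whether the object's value was returned directly rather than stored in plasma (the bool). Below is a hedged sketch of how a caller consumes such a value, using `absl::StatusOr<bool>` as a stand-in for Ray's own `StatusOr`; `HandleReturnForDemo` is a made-up helper, not the real `TaskManager` API.

```cpp
// Sketch of consuming a StatusOr<bool> "direct return" result, mirroring the
// call sites in task_manager.cc. absl::StatusOr<bool> is a stand-in for Ray's
// StatusOr; HandleReturnForDemo is a made-up helper.
#include <iostream>

#include "absl/status/status.h"
#include "absl/status/statusor.h"

absl::StatusOr<bool> HandleReturnForDemo(bool store_in_plasma, bool plasma_up) {
  if (store_in_plasma) {
    if (!plasma_up) {
      return absl::UnavailableError("plasma store connection is broken");
    }
    return false;  // stored in plasma, not a direct return
  }
  return true;  // small value, returned directly in memory
}

int main() {
  absl::StatusOr<bool> direct_or = HandleReturnForDemo(/*store_in_plasma=*/true,
                                                       /*plasma_up=*/false);
  if (!direct_or.ok()) {
    // Mirrors the diff: log a warning and fail this task attempt instead of
    // crashing the whole worker.
    std::cerr << "WARNING: failed to handle task return: " << direct_or.status()
              << "\n";
    return 0;
  }
  if (*direct_or) {
    std::cout << "object returned directly by value\n";
  } else {
    std::cout << "object stored in the local plasma store\n";
  }
  return 0;
}
```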
