Skip to content

Commit c91a56e

Browse files
committed
[core] Fix RAY_CHECK failure during shutdown due to plasma store race condition
Signed-off-by: Sagar Sumit <[email protected]>
1 parent 96036f7 commit c91a56e

File tree

3 files changed

+39
-2
lines changed

3 files changed

+39
-2
lines changed

src/ray/core_worker/core_worker.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,10 @@ void CoreWorker::Shutdown() {
538538
RAY_LOG(INFO) << "Core worker ready to be deallocated.";
539539
}
540540

541+
bool CoreWorker::IsShuttingDown() const {
542+
return is_shutdown_.load();
543+
}
544+
541545
void CoreWorker::ConnectToRayletInternal() {
542546
// Tell the raylet the port that we are listening on.
543547
// NOTE: This also marks the worker as available in Raylet. We do this at the

src/ray/core_worker/core_worker.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,12 @@ class CoreWorker {
246246
///
247247
void Shutdown();
248248

249+
/// Check if the core worker is currently shutting down.
250+
/// This can be used to avoid operations that might fail during shutdown.
251+
///
252+
/// \return true if shutdown has been initiated, false otherwise.
253+
bool IsShuttingDown() const;
254+
249255
/// Start receiving and executing tasks.
250256
void RunTaskExecutionLoop();
251257

src/ray/core_worker/core_worker_process.cc

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,8 +423,35 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
423423
/*put_in_local_plasma_callback=*/
424424
[this](const RayObject &object, const ObjectID &object_id) {
425425
auto core_worker = GetCoreWorker();
426-
RAY_CHECK_OK(
427-
core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
426+
427+
// Check if the core worker is shutting down before attempting plasma operations.
428+
// During shutdown, the plasma store connection may already be broken, so we
429+
// should avoid plasma operations entirely.
430+
if (core_worker->IsShuttingDown()) {
431+
RAY_LOG(INFO) << "Skipping plasma store operation for error object " << object_id
432+
<< " because core worker is shutting down.";
433+
return;
434+
}
435+
436+
auto status = core_worker->PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
437+
if (!status.ok()) {
438+
if (status.IsIOError() &&
439+
(status.message().find("Broken pipe") != std::string::npos ||
440+
status.message().find("Connection reset") != std::string::npos ||
441+
status.message().find("Bad file descriptor") != std::string::npos)) {
442+
// This is likely a shutdown race where the plasma store
443+
// connection was closed before we could complete the operation.
444+
// Log as warning since this is expected during shutdown scenarios.
445+
RAY_LOG(WARNING) << "Failed to put error object " << object_id
446+
<< " in plasma store due to connection error (likely shutdown): "
447+
<< status.ToString();
448+
} else {
449+
// For other types of errors, maintain the original
450+
// behavior with RAY_CHECK_OK to catch real issues.
451+
RAY_CHECK_OK(status) << "Failed to put error object " << object_id
452+
<< " in plasma store: " << status.ToString();
453+
}
454+
}
428455
},
429456
/* retry_task_callback= */
430457
[this](TaskSpecification &spec, bool object_recovery, uint32_t delay_ms) {

0 commit comments

Comments
 (0)