[core] Fix RAY_CHECK failure during shutdown due to plasma store race condition #55367
Changes from 6 commits
@@ -246,6 +246,12 @@ class CoreWorker {
   ///
   void Shutdown();

+  /// Check if the core worker is currently shutting down.
+  /// This can be used to avoid operations that might fail during shutdown.
+  ///
+  /// \return true if shutdown has been initiated, false otherwise.
+  bool IsShuttingDown() const;
+
   /// Start receiving and executing tasks.
   void RunTaskExecutionLoop();
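As a usage note, here is a minimal sketch of how a caller might consult the new accessor. The surrounding function is hypothetical; only `IsShuttingDown()` comes from this PR:

```cpp
// Hypothetical call site: bail out of best-effort work once shutdown has
// started, instead of racing against the plasma store connection teardown.
void MaybeFlushObjectMetrics(CoreWorker &worker) {
  if (worker.IsShuttingDown()) {
    return;  // Connections to the raylet/plasma store may already be closing.
  }
  // ... normal path that talks to the object store ...
}
```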
@@ -1493,6 +1499,19 @@ class CoreWorker {
       std::string *application_error);

+  /// Put an object in the local plasma store.
+  ///
+  /// Return status semantics:
+  /// - Status::OK(): The object was created (or already existed) and bookkeeping was
+  ///   updated. Note: an internal ObjectExists from the plasma provider is treated
+  ///   as OK and does not surface here.
+  /// - Status::ObjectStoreFull(): The local plasma store is out of memory (or out of
+  ///   disk when spilling). The error message contains context and a short memory
+  ///   report.
Contributor:
What is the standard behavior here if we're out of spill (do we just crash everywhere)? How do we recover? This could result in all kinds of problems. Also, this is guaranteed to try to spill, right, because the usage is always that we're storing a primary copy through this code path?

Contributor (Author):
Spilling is attempted automatically on puts through this path:
+  /// - Status::IOError(): IPC/connection failures while talking to the plasma store
+  ///   (e.g., broken pipe/connection reset during shutdown, store not reachable).
Collaborator (on lines +1498 to +1506):
nice!
+  ///
+  /// Call sites that run during shutdown may choose to tolerate IOError specifically,
+  /// but should treat all other statuses as real failures.
+  Status PutInLocalPlasmaStore(const RayObject &object,
+                               const ObjectID &object_id,
+                               bool pin_object);
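To make that contract concrete, here is a sketch of a shutdown-tolerant call site, assuming the documented semantics above. The member function name is made up; `PutInLocalPlasmaStore` and `IsShuttingDown` are from this PR:

```cpp
// Hypothetical CoreWorker member illustrating the documented contract:
// tolerate IOError only while shutting down; everything else is a real error.
Status CoreWorker::PutReturnObjectBestEffort(const RayObject &object,
                                             const ObjectID &object_id) {
  Status s = PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true);
  if (s.IsIOError() && IsShuttingDown()) {
    // The plasma connection is being torn down; treat this put as best-effort.
    return Status::OK();
  }
  return s;  // OK, ObjectStoreFull, OutOfDisk, etc. pass through unchanged.
}
```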
@@ -533,10 +533,10 @@ size_t TaskManager::NumPendingTasks() const {
   return num_pending_tasks_;
 }

-bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
-                                   const rpc::ReturnObject &return_object,
-                                   const NodeID &worker_node_id,
-                                   bool store_in_plasma) {
+StatusOr<bool> TaskManager::HandleTaskReturn(const ObjectID &object_id,
+                                             const rpc::ReturnObject &return_object,
+                                             const NodeID &worker_node_id,
+                                             bool store_in_plasma) {
   bool direct_return = false;
   reference_counter_.UpdateObjectSize(object_id, return_object.size());
   RAY_LOG(DEBUG) << "Task return object " << object_id << " has size "
@@ -579,7 +579,15 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
       /*copy_data=*/false,
       tensor_transport.value_or(rpc::TensorTransport::OBJECT_STORE));
   if (store_in_plasma) {
-    put_in_local_plasma_callback_(object, object_id);
+    Status s = put_in_local_plasma_callback_(object, object_id);
+    int retry_count = 0;
+    while (!s.ok() && s.IsTransientObjectStoreFull() && retry_count < 3) {
+      retry_count++;
+      s = put_in_local_plasma_callback_(object, object_id);
+    }
+    if (!s.ok()) {
+      return s;
+    }
   } else {
     in_memory_store_.Put(object, object_id);
     direct_return = true;
@@ -813,10 +821,15 @@ bool TaskManager::HandleReportGeneratorItemReturns(
   }
   // When an object is reported, the object is ready to be fetched.
   reference_counter_.UpdateObjectPendingCreation(object_id, false);
-  HandleTaskReturn(object_id,
-                   return_object,
-                   NodeID::FromBinary(request.worker_addr().node_id()),
-                   /*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
+  StatusOr<bool> put_res =
+      HandleTaskReturn(object_id,
+                       return_object,
+                       NodeID::FromBinary(request.worker_addr().node_id()),
+                       /*store_in_plasma=*/store_in_plasma_ids.contains(object_id));
+  if (!put_res.ok()) {
+    RAY_LOG(WARNING).WithField(object_id)
+        << "Failed to handle streaming dynamic return: " << put_res.status();
+  }
 }

 // Handle backpressure if needed.
@@ -900,23 +913,49 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
         reference_counter_.AddDynamicReturn(object_id, generator_id);
         dynamic_return_ids.push_back(object_id);
       }
-      if (!HandleTaskReturn(object_id,
-                            return_object,
-                            NodeID::FromBinary(worker_addr.node_id()),
-                            store_in_plasma_ids.contains(object_id))) {
-        if (first_execution) {
-          dynamic_returns_in_plasma.push_back(object_id);
-        }
+      StatusOr<bool> direct_or =
+          HandleTaskReturn(object_id,
+                           return_object,
+                           NodeID::FromBinary(worker_addr.node_id()),
+                           store_in_plasma_ids.contains(object_id));
+      if (!direct_or.ok()) {
+        RAY_LOG(WARNING).WithField(object_id)
+            << "Failed to handle dynamic task return: " << direct_or.status();
+        // Treat as system failure for this attempt and fail immediately to avoid hangs.
+        Status st = direct_or.status();
+        FailOrRetryPendingTask(task_id,
+                               rpc::ErrorType::WORKER_DIED,
+                               &st,
+                               /*ray_error_info=*/nullptr,
+                               /*mark_task_object_failed=*/true,
+                               /*fail_immediately=*/true);
+        return;
+      } else if (!direct_or.value() && first_execution) {
+        dynamic_returns_in_plasma.push_back(object_id);
       }
     }
   }
   for (const auto &return_object : reply.return_objects()) {
     const auto object_id = ObjectID::FromBinary(return_object.object_id());
-    if (HandleTaskReturn(object_id,
-                         return_object,
-                         NodeID::FromBinary(worker_addr.node_id()),
-                         store_in_plasma_ids.contains(object_id))) {
+    StatusOr<bool> direct_or =
+        HandleTaskReturn(object_id,
+                         return_object,
+                         NodeID::FromBinary(worker_addr.node_id()),
+                         store_in_plasma_ids.contains(object_id));
+    if (!direct_or.ok()) {
+      RAY_LOG(WARNING).WithField(object_id)
+          << "Failed to handle task return: " << direct_or.status();
+      // If storing return in plasma failed, treat as system failure for this attempt.
+      // Do not proceed with normal completion. Mark task failed immediately.
+      Status st = direct_or.status();
+      FailOrRetryPendingTask(task_id,
+                             rpc::ErrorType::WORKER_DIED,
Contributor:
WORKER_DIED isn't the right error type here. "Worker died" normally refers to the executor worker dying, but here something is going wrong on the owner; the error type should be different.

Contributor (Author):
Fixed. We now consistently map Status → ErrorType via MapStatusToErrorType (IOError -> LOCAL_RAYLET_DIED, ObjectStoreFull/Transient -> OUT_OF_MEMORY, OutOfDisk -> OUT_OF_DISK_ERROR; default -> WORKER_DIED).
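The helper itself is not shown in this snapshot of the diff, so the body below is a sketch reconstructed purely from the mapping the author describes:

```cpp
// Sketch of the described Status -> ErrorType mapping; reconstructed from the
// review comment above, not copied from the PR.
rpc::ErrorType MapStatusToErrorType(const Status &status) {
  if (status.IsIOError()) {
    // Plasma/raylet connection failures: attribute to the local raylet.
    return rpc::ErrorType::LOCAL_RAYLET_DIED;
  }
  if (status.IsObjectStoreFull() || status.IsTransientObjectStoreFull()) {
    return rpc::ErrorType::OUT_OF_MEMORY;
  }
  if (status.IsOutOfDisk()) {
    return rpc::ErrorType::OUT_OF_DISK_ERROR;
  }
  return rpc::ErrorType::WORKER_DIED;
}
```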
+                             &st,
+                             /*ray_error_info=*/nullptr,
+                             /*mark_task_object_failed=*/true,
+                             /*fail_immediately=*/true);
+      return;
+    } else if (direct_or.value()) {
+      direct_return_ids.push_back(object_id);
     }
   }
@@ -1040,10 +1079,16 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
       const auto generator_return_id = spec.StreamingGeneratorReturnId(i);
       RAY_CHECK_EQ(reply.return_objects_size(), 1);
       const auto &return_object = reply.return_objects(0);
-      HandleTaskReturn(generator_return_id,
-                       return_object,
-                       NodeID::FromBinary(worker_addr.node_id()),
-                       store_in_plasma_ids.contains(generator_return_id));
+      StatusOr<bool> res =
+          HandleTaskReturn(generator_return_id,
+                           return_object,
+                           NodeID::FromBinary(worker_addr.node_id()),
+                           store_in_plasma_ids.contains(generator_return_id));
+      if (!res.ok()) {
+        RAY_LOG(WARNING).WithField(generator_return_id)
+            << "Failed to handle generator return during app error propagation: "
+            << res.status();
Contributor:
This is a warning-log-and-forget. If this path is reachable, that seems pretty bad: a cluster could hang waiting on this return value. Or am I missing that something downstream handles this?

Contributor:
Maybe it makes sense for HandleTaskReturn to just handle all error types, instead of only ObjectStoreFull, and not propagate them up, so we don't have to handle them at every upstream location.

Contributor (Author):
Addressed. No log-and-forget remains: on any HandleTaskReturn error we immediately FailOrRetryPendingTask with the mapped error and stop, so waiters are unblocked. I recommend keeping HandleTaskReturn side-effect free (it already returns StatusOr) and centralizing failure policy at the TaskManager call sites. To reduce duplication, I added a small helper (MapStatusToErrorType) and use it in the few places we call FailOrRetryPendingTask.
+      }
     }
   }
 }
@@ -1454,18 +1499,26 @@ void TaskManager::MarkTaskReturnObjectsFailed(
   int64_t num_returns = spec.NumReturns();
   for (int i = 0; i < num_returns; i++) {
     const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1);
+    // Always place an error marker in local memory to unblock waiters quickly.
+    in_memory_store_.Put(error, object_id);
+    // Best-effort plasma put if the object was meant to be in plasma.
     if (store_in_plasma_ids.contains(object_id)) {
-      put_in_local_plasma_callback_(error, object_id);
-    } else {
-      in_memory_store_.Put(error, object_id);
+      Status s = put_in_local_plasma_callback_(error, object_id);
+      if (!s.ok()) {
+        RAY_LOG(WARNING).WithField(object_id)
+            << "Failed to put error object in plasma: " << s;
+      }
     }
   }
   if (spec.ReturnsDynamic()) {
     for (const auto &dynamic_return_id : spec.DynamicReturnIds()) {
+      in_memory_store_.Put(error, dynamic_return_id);
       if (store_in_plasma_ids.contains(dynamic_return_id)) {
-        put_in_local_plasma_callback_(error, dynamic_return_id);
-      } else {
-        in_memory_store_.Put(error, dynamic_return_id);
+        Status s = put_in_local_plasma_callback_(error, dynamic_return_id);
+        if (!s.ok()) {
+          RAY_LOG(WARNING).WithField(dynamic_return_id)
+              << "Failed to put error object in plasma: " << s;
+        }
       }
     }
   }
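The memory-first, plasma-best-effort ordering above repeats in several places; it could be factored into a small helper. A sketch under the assumption that the two stores keep their current interfaces (the helper name and the callback alias are made up):

```cpp
#include <functional>

// Hypothetical alias matching the shape of put_in_local_plasma_callback_.
using PutInPlasmaCallback =
    std::function<Status(const RayObject &, const ObjectID &)>;

// Hypothetical helper capturing the pattern above: the in-memory error marker
// always lands first (so waiters unblock even if plasma is gone), then the
// plasma put is attempted best-effort.
void PutErrorMarker(CoreWorkerMemoryStore &in_memory_store,
                    const PutInPlasmaCallback &put_in_plasma,
                    const RayObject &error,
                    const ObjectID &object_id,
                    bool object_lives_in_plasma) {
  in_memory_store.Put(error, object_id);
  if (object_lives_in_plasma) {
    Status s = put_in_plasma(error, object_id);
    if (!s.ok()) {
      RAY_LOG(WARNING).WithField(object_id)
          << "Failed to put error object in plasma: " << s;
    }
  }
}
```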
@@ -1488,7 +1541,11 @@ void TaskManager::MarkTaskReturnObjectsFailed(
   for (size_t i = 0; i < num_streaming_generator_returns; i++) {
     const auto generator_return_id = spec.StreamingGeneratorReturnId(i);
     if (store_in_plasma_ids.contains(generator_return_id)) {
-      put_in_local_plasma_callback_(error, generator_return_id);
+      Status s = put_in_local_plasma_callback_(error, generator_return_id);
+      if (!s.ok()) {
+        RAY_LOG(WARNING).WithField(generator_return_id)
+            << "Failed to put error object in plasma: " << s;
Contributor:
This is a log-and-forget with no memory store put; it could lead to a hanging cluster.

Contributor (Author):
Fixed. We always write the error to the in-memory store first (which unblocks waiters), then do a best-effort plasma put with a warning on failure.
+      }
     } else {
       in_memory_store_.Put(error, generator_return_id);
     }