
Conversation


@can-anyscale can-anyscale commented Jun 25, 2025

A crash occurs inside TaskManager when the ObjectRecoveryManager attempts to reconstruct an object for a task that has already been canceled. At a high level, we considered two options:

  1. ensure that ObjectRecoveryManager never attempts to reconstruct a canceled task — the guiding principle here is to rely on global state to keep TaskManager on its "happy path"; or
  2. make TaskManager resilient to non-happy-path scenarios, handling them gracefully.

The original solution followed the second approach, and we ultimately adopted it — with one key refinement: we introduced an explicit cancellation state within TaskManager, instead of relying on the number of retries left as an indirect indicator of cancellation.

We didn’t choose the first approach because calls from ObjectRecoveryManager and from task cancellation (which eventually reaches TaskManager) can be interleaved in arbitrary ways. This makes it impractical to place a reliable synchronization check early; the only effective place to validate the state is right at the crash site.

Concretely, the sequencing is:

  1. A task is first marked as completed—i.e., no longer pending—but remains in the queue because its return object might need to be reconstructed later.
  2. The task is canceled.
  3. A retry of the task is triggered to reconstruct its lost return object.

Note that:

  1. This affects both normal tasks and actor tasks.
  2. It is a broken sequencing of logic rather than a thread race, so it can be reproduced reliably via a unit test.

This fix still prevents the object from being reconstructed (as defined by the API contract here). Previously, without my fix, Ray would crash. With the fix, object reconstruction still fails, but the failure is now properly propagated as a TaskCanceled exception instead of causing a crash.
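For concreteness, a minimal sketch of the shape of the guard (simplified; the mutex, the is_cancelled field, and the bool return type are illustrative rather than the exact final code):

// Sketch: ResubmitTask refuses to resubmit a task whose entry has been
// explicitly marked as canceled, instead of tripping the retry-count CHECK.
bool TaskManager::ResubmitTask(const TaskID &task_id,
                               std::vector<ObjectID> *task_deps) {
  absl::MutexLock lock(&mu_);  // mutex name assumed
  auto it = submissible_tasks_.find(task_id);
  if (it == submissible_tasks_.end()) {
    return false;
  }
  if (it->second.is_cancelled) {
    // Honor the cancellation: refuse to resubmit so the caller can surface
    // TASK_CANCELLED instead of hitting the num_retries_left CHECK.
    return false;
  }
  // ... existing resubmission logic ...
  return true;
}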

I added:

  1. a C++ unit test
  2. an end-to-end Python test

Both failed with the check failure before the fix and pass with it.

Stack trace:

[2025-06-08 23:40:33,057 C 1553 1681] task_manager.cc:341:  Check failed: it->second.num_retries_left == -1 
*** StackTrace Information ***
/home/ray/anaconda3/lib/python3.12/site-packages/ray/_raylet.so(+0x1484c2a) [0x79d215843c2a] ray::operator<<()
/home/ray/anaconda3/lib/python3.12/site-packages/ray/_raylet.so(_ZN3ray6RayLogD1Ev+0x479) [0x79d2158466a9] ray::RayLog::~RayLog()
/home/ray/anaconda3/lib/python3.12/site-packages/ray/_raylet.so(_ZN3ray4core11TaskManager12ResubmitTaskERKNS_6TaskIDEPSt6vectorINS_8ObjectIDESaIS6_EE+0x271) [0x79d214dc32b1] ray::core::TaskManager::ResubmitTask()
/home/ray/anaconda3/lib/python3.12/site-packages/ray/_raylet.so(_ZN3ray4core21ObjectRecoveryManager17ReconstructObjectERKNS_8ObjectIDE+0x1aa) [0x79d214db492a] ray::core::ObjectRecoveryManager::ReconstructObject()
...

Test:

  • CI

@can-anyscale can-anyscale marked this pull request as ready for review June 26, 2025 00:16
@can-anyscale can-anyscale requested review from a team and Copilot June 26, 2025 00:16

Copilot AI left a comment


Pull Request Overview

This PR addresses a crash caused by resubmitting a task after it’s been canceled by adding a guard in ResubmitTask and a corresponding test.

  • Return false early in TaskManager::ResubmitTask when a task is canceled.
  • Add TestResubmitCanceledTask to verify that resubmitting a canceled task fails gracefully.

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File                  Description
task_manager.cc       Added a check to bail out when num_retries_left is zero
task_manager_test.cc  Added TestResubmitCanceledTask to cover the new cancellation path

@can-anyscale
Contributor Author

cc @dayshah

Contributor


I think we should do more than just return false out of this, because we set the object error, which is visible to the user, based on this. If we just directly return false, the error will become OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED, but the status should be whatever we set it to when the cancel happens, not this.

Contributor Author


Makes sense, I'll add another error type.

Contributor


why do we need to do this in the test?

Contributor Author


this test suite requires every test case to clean up after itself (line 153)

Contributor


😨

@israbbani israbbani added the go add ONLY when ready to merge, run all tests label Jun 26, 2025
@can-anyscale can-anyscale requested a review from dayshah June 26, 2025 02:53
@can-anyscale
Contributor Author

Addressed @dayshah's comments.

@israbbani
Contributor

This check fails when the returned object of a canceled task is reconstructed.

I'm missing something really obvious here, but what's the context for this? How does the object_ref become available if the task was canceled? The possibilities I can think of are:

  • num_returns > 1 from a task
  • the task was a generator task
  • the task finished before it was canceled

In all of those cases, should objects be reconstructed if the user intended for the task to be canceled?

@can-anyscale can-anyscale force-pushed the can-p01 branch 2 times, most recently from 89dcc22 to 26d6a1d Compare June 26, 2025 05:00
@can-anyscale can-anyscale requested a review from a team as a code owner June 26, 2025 05:00
return ResubmitTaskResult::FAILED_MAX_ATTEMPT_EXCEEDED;
}

if (it->second.num_retries_left == 0) {
Contributor Author


The main change is this condition; the rest is boilerplate due to the addition of a new field in common.proto


if (it->second.num_retries_left == 0) {
  // This can happen when the task has been marked for cancellation.
  return ResubmitTaskResult::FAILED_TASK_CANCELED;
Contributor


i was thinking we could just use the existing cancel error type; we don't need a new custom one just for this situation.

// Indicates that an object has been cancelled.
TASK_CANCELLED = 5;

The user is trying to cancel; we just need to honor that cancel and not resubmit.

Contributor Author


lol that's much better


dayshah commented Jun 26, 2025

This check fails when the returned object of a canceled task is reconstructed.

I'm missing something really obvious here, but what's the context for this? How does the object_ref become available if the task was canceled? The possibilities I can think of are:

  • num_returns > 1 from a task
  • the task was a generator task
  • the task finished before it was canceled

In all of those cases, should objects be reconstructed if the user intended for the task to be canceled?

I think the description needs to be rewritten a bit. The reason for this is that one thread, the one the user actually calls ray.cancel on, can start the cancel, while another thread (the io_service of the reconstruction periodical runner) decides to resubmit the task. They both use the task manager, but cancelling doesn't go through the task manager atomically: you can do the first part of the cancel and set num_retries_left to 0, then the resubmit happens, and then the cancel uses the task manager to try to fail the task and set the object to error.

if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end() ||
    !task_finisher_.MarkTaskCanceled(task_spec.TaskId()) ||
    !task_finisher_.IsTaskPending(task_spec.TaskId())) {
  return Status::OK();
}
auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key];
auto &scheduling_tasks = scheduling_key_entry.task_queue;
// This cancels tasks that have completed dependencies and are awaiting
// a worker lease.
if (!scheduling_tasks.empty()) {
  for (auto spec = scheduling_tasks.begin(); spec != scheduling_tasks.end(); spec++) {
    if (spec->TaskId() == task_spec.TaskId()) {
      scheduling_tasks.erase(spec);
      CancelWorkerLeaseIfNeeded(scheduling_key);
      task_finisher_.FailPendingTask(task_spec.TaskId(),
                                     rpc::ErrorType::TASK_CANCELLED);

@can-anyscale should probably leave a comment in the test too on how these things are called and how this interleaving can happen. Also might be worth thinking about any higher-level fixes, that could fix the submissible_tasks_ check failure issue too. I haven't put too much thought into what the implications of an atomic cancel might be.
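To make that interleaving concrete, a rough trace-style sketch (call sites simplified):

// Thread A: user thread calling ray.cancel
//   task_finisher_.MarkTaskCanceled(task_id);     // sets num_retries_left to 0
//   ... task manager mutex released ...
// Thread B: io_service of the reconstruction periodical runner
//   ObjectRecoveryManager::ReconstructObject(object_id)
//     -> TaskManager::ResubmitTask(task_id, &deps)
//        // RAY_CHECK(it->second.num_retries_left == -1) fails -> crash
// Thread A (continues):
//   task_finisher_.FailPendingTask(task_id, rpc::ErrorType::TASK_CANCELLED);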

@can-anyscale
Contributor Author

Updated to return TASK_CANCELED as the error type.

@can-anyscale
Contributor Author

@israbbani - answering yours first, since I think it's simpler to explain. The crash was caused by the third case you mentioned—when "the task finished before it was canceled." My fix still prevents the object from being reconstructed (as defined by the API contract here). Previously, without my fix, Ray would crash. With the fix, object reconstruction still fails, but the failure is now properly propagated as a TaskCanceled exception instead of causing a crash.

@can-anyscale
Contributor Author

@dayshah - after closer investigation, I don't think this is caused by the race in the cancelling logic (basically not the race between https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/normal_task_submitter.cc#L711 and https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/normal_task_submitter.cc#L740).

This crash only happens when the task is NOT pending, so the call to FailPendingTask is effectively unreachable. If you look at my test case, it's a simple case of a task being resubmitted after being canceled and no longer pending.
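For reference, a rough gtest-style sketch of that sequencing (the helpers BuildTaskSpec and AddAndCompleteTask are simplified stand-ins, not the exact fixtures in task_manager_test.cc):

TEST_F(TaskManagerTest, TestResubmitCanceledTask) {
  auto spec = BuildTaskSpec(/*num_retries=*/1);  // stand-in for the fixture's setup
  AddAndCompleteTask(spec);                      // task finishes: no longer pending,
                                                 // but kept for lineage reconstruction
  manager_.MarkTaskCanceled(spec.TaskId());      // cancel after completion

  std::vector<ObjectID> resubmitted_deps;
  // Before the fix this tripped `Check failed: it->second.num_retries_left == -1`;
  // after the fix the resubmission fails gracefully with TASK_CANCELLED.
  ASSERT_FALSE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_deps));
}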

Another fix I considered was having MarkTaskCanceled remove the task from the submissible_map if it's not pending—but that would introduce another function that mutates shared state, which seems to increase the potential for race conditions. So I ended up going with a "hey ResubmitTask, protect yourself from external state, don't assume anything" kind of approach.

But open for other suggestions.

@can-anyscale can-anyscale requested a review from dayshah June 26, 2025 16:31

israbbani commented Jun 26, 2025

@can-anyscale thanks for the explanation! I think the PR description can be updated to state these points more clearly:

  1. Which threads is the race between?
  2. Which part of the task life cycle does this happen in?
  3. Which execution models does this happen in? (e.g. actor tasks, normal tasks, async actors, etc.)

The crash was caused by the third case you mentioned—when "the task finished before it was canceled."

What happens in the other cases?

@israbbani
Contributor

Also might be worth thinking about any higher-level fixes, that could fix the submissible_tasks_ check failure issue too. I haven't put too much thought into what the implications of an atomic cancel might be.

@dayshah what's the submissible_tasks_ check failure issue?

@israbbani israbbani self-assigned this Jun 26, 2025
@can-anyscale
Contributor Author

found some relevant PRs lol #48661; I'll sync with @dayshah first to understand the different scenarios of task cancellation, then I'll update the description.

@can-anyscale
Contributor Author

@dayshah, @israbbani: I added a Python e2e test that reproduces the issue without the fix and passes with the fix; hopefully that makes the situation clearer.

Without the fix, test_ray_cancel crashes:

[Screenshot 2025-06-26 at 10:39:01 AM: test_ray_cancel check failure]

@can-anyscale can-anyscale force-pushed the can-p01 branch 3 times, most recently from 329797c to 7474054 Compare July 1, 2025 19:08
Contributor Author


fix this while i'm here

@can-anyscale
Contributor Author

Discussed offline with @dayshah and @israbbani — we’ll add a boolean is_cancelled field to the TaskEntry object instead of relying on num_retries_left to represent cancellation state, as the latter is error-prone. The rest of the logic is similar to before.
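Roughly, the shape we agreed on (simplified sketch; the mutex, the return type, and the surrounding members are elided or illustrative):

struct TaskEntry {
  // ... existing fields such as num_retries_left ...
  // Whether the task has been marked for cancellation.
  // Cancelled tasks will never be retried.
  bool is_cancelled = false;
};

bool TaskManager::MarkTaskCanceled(const TaskID &task_id) {
  absl::MutexLock lock(&mu_);  // mutex name assumed
  auto it = submissible_tasks_.find(task_id);
  if (it == submissible_tasks_.end()) {
    return false;
  }
  // Record cancellation as explicit state instead of overloading num_retries_left.
  it->second.is_cancelled = true;
  return true;
}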

@can-anyscale can-anyscale requested a review from israbbani July 1, 2025 19:09

@dayshah dayshah left a comment


Can you also write up some context on why we're not doing this check earlier in the call stack?

And also, what happens if the cancel comes after the task manager mutex is released and before the resubmit? The retry will run to completion.

Contributor


needed?

Contributor


nit but would prefer calling it producer/consumer

Contributor


+1. It's easier to understand and it matches what Dhyey did with the other tests.

@can-anyscale
Contributor Author

Addressed @dayshah's comments.

Contributor Author


@dayshah: answered your questions via a comment here


@edoakes edoakes left a comment


LGTM. Please update the description to reflect the commentary you provided offline.

Comment on lines 663 to 665
Collaborator


Suggested change
- // Whether the task has been marked for cancellation.
- bool is_cancelled;
+ // Whether the task has been marked for cancellation.
+ // Cancelled tasks will never be retried.
+ bool is_cancelled;

Collaborator


please also specify this in the MarkTaskCanceled comment

@can-anyscale
Contributor Author

Addressed @edoakes's comments.


@israbbani israbbani left a comment


LGTM. 🚢 A few easy nits.

Comment on lines 41 to 45
enum class ResubmitTaskResult {
  SUCCESS,
  FAILED_MAX_ATTEMPT_EXCEEDED,
  FAILED_TASK_CANCELED
};
Contributor


I like the idea of the enum and it's in the right direction. Should this be a Status instead?

Contributor Author


I ended up not using this; Dhyey's PR reuses the rpc status code instead, which is good.

Contributor


Suggested change
- 2. Submit a task to the worker node to generate a big object.
+ 2. Submit a task to the worker node to generate an object big enough to store in plasma.

Contributor


Suggested change
- 6. Force a retry task to be scheduled on the new worker node to regenerate the big object.
+ 6. Force a retry task to be scheduled on the new worker node to reconstruct the object.

Contributor


+1. It's easier to understand and it matches what Dhyey did with the other tests.

Contributor


why num_cpus=2?

Contributor Author


just so that it doesn't run on the head node

Comment on lines 364 to 369
Contributor


TODOs should have linked issues that provide additional context. We can make this a little more concise e.g.

Suggested change
- // TODO(can-anyscale): There is an existing race condition (even before this comment
- // was added) where a task can still be retried after its retry count has reached
- // zero—for example, in the case of task cancellation—because this code is still
- // reachable. The root cause is that we cannot guard the callback invocation with
- // the TaskManager mutex, since the callback itself acquires that mutex. As a result,
- // we cannot guarantee the task's state before the callback runs.
+ // TODO(can-anyscale): There is a race condition where a task can still be retried
+ // after its retry count has reached zero. Additional information in <ISSUE>

Contributor


Nit: cancelled is the British spelling. We should stick with Canceled (e.g. TaskManager::MarkTaskCanceled)

@can-anyscale
Contributor Author

Addressed @israbbani's comments.

@can-anyscale can-anyscale enabled auto-merge (squash) July 2, 2025 00:11
@can-anyscale can-anyscale merged commit ba6eed1 into master Jul 2, 2025
6 checks passed
@can-anyscale can-anyscale deleted the can-p01 branch July 2, 2025 01:34

Labels

go add ONLY when ready to merge, run all tests