
Conversation

@Sparks0219 (Contributor) commented Oct 22, 2025

Description

> Briefly describe what this PR accomplishes and why it's needed.

Makes the CancelTask RPC fault tolerant. This creates an intermediary RPC similar to what was done in #57648: when the force_exit flag is enabled for cancel, the executor worker is shut down gracefully. However, the owner core worker has no way of determining whether that shutdown succeeded, so we also send the cancel request to the raylet via a new CancelLocalTask RPC that guarantees the worker is killed. Added a Python test to verify retry behavior; after talking to @dayshah, the C++ test was left out because it would have to account for all orderings of the owner/executor states in the cancellation process, which gets complicated.

In the current task submission path, we don't keep track of the worker's raylet address when we receive the PushTask RPC. Tracking it there is complicated because the GCS manages actor lifecycles, including requesting leases, so instead of touching the hot path (task submission) we decided to complicate only the cancellation path. Upon receiving a CancelTask RPC, we query the GCS node cache for the node info, and only if it's not in the cache do we query the GCS. Unfortunately, the GCS node cache is currently not thread safe and should only be accessed on the main io service, so we refactored Normal/ActorTaskSubmitter to post the portion of code that accesses the cache onto the main io service.
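
A minimal sketch of that lookup order, for illustration only (the NodeAddress, NodeCache, and GcsClient types below are stand-ins, not the real Ray classes): the non-thread-safe cache is touched only after hopping onto the main io_context, and the GCS is queried only on a cache miss.

#include <boost/asio.hpp>
#include <functional>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-ins for the GCS node cache and GCS client; the real
// Ray types and signatures differ.
struct NodeAddress {
  std::string ip;
  int port = 0;
};

struct NodeCache {
  // NOT thread safe: must only be touched from the main io_context.
  std::unordered_map<std::string, NodeAddress> nodes;
};

struct GcsClient {
  // Stub for an async GCS lookup; a real client would issue an RPC here.
  void AsyncGetNode(const std::string &node_id,
                    std::function<void(std::optional<NodeAddress>)> cb) {
    cb(std::nullopt);
  }
};

// Resolve the raylet address for node_id: consult the cache first and fall
// back to the GCS only on a miss, mirroring the cancellation path above.
void ResolveRayletAddress(boost::asio::io_context &main_io,
                          NodeCache &cache,
                          GcsClient &gcs,
                          const std::string &node_id,
                          std::function<void(std::optional<NodeAddress>)> done) {
  // The cancel path may run on an arbitrary thread, so hop onto the main
  // io_context before reading the non-thread-safe cache.
  boost::asio::post(main_io, [&cache, &gcs, node_id, done = std::move(done)]() mutable {
    auto it = cache.nodes.find(node_id);
    if (it != cache.nodes.end()) {
      done(it->second);  // Cache hit: no GCS round trip needed.
      return;
    }
    gcs.AsyncGetNode(node_id, std::move(done));  // Cache miss: ask the GCS.
  });
}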

There was also a race condition in CancelLocalTask/KillLocalActor where send_reply_callback could be triggered twice: if we receive the response from CancelTask/KillActor but the worker is not evicted from the raylet worker pool immediately, the callback scheduled via execute_after could still fire. A replied boolean flag was added to guard against this.
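
A hedged sketch of that guard (not the actual raylet code; CancelWithReplyGuard and the callback shapes are hypothetical): whichever of the two paths runs first sends the reply and flips the shared replied flag, so the other path becomes a no-op. It assumes both callbacks run on the same io_context, so a plain bool is enough.

#include <boost/asio.hpp>
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>

void CancelWithReplyGuard(boost::asio::io_context &io,
                          std::function<void(bool success)> send_reply_callback) {
  auto replied = std::make_shared<bool>(false);
  auto timer =
      std::make_shared<boost::asio::steady_timer>(io, std::chrono::seconds(2));

  // Path 1: the worker was not confirmed gone before the timeout fired.
  timer->async_wait([replied, send_reply_callback](const boost::system::error_code &ec) {
    if (ec == boost::asio::error::operation_aborted) return;  // Timer was cancelled.
    if (*replied) return;  // The RPC path already replied.
    *replied = true;
    send_reply_callback(false);
  });

  // Path 2: the CancelTask/KillActor response arrives (simulated with a post
  // here; in the raylet this would be the RPC reply callback).
  boost::asio::post(io, [replied, timer, send_reply_callback]() {
    if (*replied) return;  // The timeout path already replied.
    *replied = true;
    send_reply_callback(true);
    timer->cancel();  // Best effort; the flag is what prevents a double reply.
  });
}

int main() {
  boost::asio::io_context io;
  CancelWithReplyGuard(io, [](bool ok) { std::cout << "replied, success=" << ok << "\n"; });
  io.run();  // The reply is sent exactly once regardless of callback ordering.
}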

The behavior of CancelTask with force_kill=true has been modified to trigger a SIGKILL after a set amount of time if the graceful shutdown of the worker hangs.
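
Roughly, the escalation looks like the following sketch (illustrative and POSIX-only; kill_worker_timeout_ms mirrors the kill_worker_timeout_milliseconds config discussed below, and request_graceful_exit / is_alive are hypothetical hooks, not Ray APIs).

#include <boost/asio.hpp>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <signal.h>
#include <sys/types.h>

// Ask the worker to exit gracefully, then escalate to SIGKILL if it is
// still alive once the timeout expires.
void ForceCancelWorker(boost::asio::io_context &io,
                       pid_t worker_pid,
                       int64_t kill_worker_timeout_ms,
                       const std::function<void(pid_t)> &request_graceful_exit,
                       std::function<bool(pid_t)> is_alive) {
  request_graceful_exit(worker_pid);  // e.g. an Exit RPC or SIGTERM.

  auto timer = std::make_shared<boost::asio::steady_timer>(
      io, std::chrono::milliseconds(kill_worker_timeout_ms));
  timer->async_wait([timer, worker_pid, is_alive = std::move(is_alive)](
                        const boost::system::error_code &ec) {
    if (ec == boost::asio::error::operation_aborted) return;  // Cancelled: worker already exited.
    if (!is_alive(worker_pid)) return;  // Graceful shutdown finished in time.
    ::kill(worker_pid, SIGKILL);        // Graceful shutdown hung: force kill.
  });
}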

Lastly, the actor task cancel retry did not use the same config as the normal task retry timer; this has been updated.

@Sparks0219 Sparks0219 requested a review from a team as a code owner October 22, 2025 21:38
@Sparks0219 Sparks0219 added the "go" (add ONLY when ready to merge, run all tests) label Oct 22, 2025
@gemini-code-assist (bot) left a comment:

Code Review

This pull request makes the CancelTask RPC fault-tolerant by introducing an intermediary CancelLocalTask RPC to the raylet. This ensures that when a task is cancelled with force=True, the worker process is guaranteed to be killed, even if the graceful shutdown fails. The changes touch both normal task and actor task submission paths, and include a new Python test to verify the fault tolerance and idempotency of the cancellation logic.

My review identifies a critical bug in the new HandleCancelLocalTask implementation where a reply callback could be invoked twice, potentially crashing the raylet. I've also pointed out a minor issue with a misleading log message. Overall, the approach is sound, but the race condition needs to be fixed.


@ray-gardener ray-gardener bot added the "core" (Issues that should be addressed in Ray Core) label Oct 23, 2025
@Sparks0219 Sparks0219 requested a review from dayshah October 24, 2025 20:50

@edoakes (Collaborator) commented Nov 10, 2025:

@dayshah PTAL

std::chrono::high_resolution_clock::now().time_since_epoch()) {
cancel_retry_timer_.expires_after(boost::asio::chrono::milliseconds(
RayConfig::instance().cancellation_retry_ms()));
auto do_cancel_local_task =
@Sparks0219 (Contributor, Author) commented:

I don't like this formatting change by the pre commit hooks, yuck.....

@Sparks0219 Sparks0219 requested a review from dayshah November 13, 2025 00:58

// Keep retrying every 2 seconds until a task is officially
// finished.
if (!task_manager_.GetTaskSpec(task_id)) {
A reviewer (Contributor) commented:

There's a different task manager API to check whether a task is finished/failed: IsTaskPending() -> this is what the normal task submitter uses, and this check doesn't do that. Ideally the raylet should be able to tell you whether to retry or not, so you shouldn't need this anyways?

Also, what does the raylet respond with if the actor successfully tried to cancel the task? It doesn't retry in that case, right?

@Sparks0219 (Contributor, Author) replied:

Talked offline: this check isn't necessary, since RetryCancelTask already checks via IsTaskPending whether the task spec is still present or not.

reply->set_attempt_succeeded(cancel_task_reply.attempt_succeeded());
reply->set_requested_task_running(cancel_task_reply.requested_task_running());
send_reply_callback(Status::OK(), nullptr, nullptr);
timer->cancel();
A reviewer (Contributor) commented:

I think there's a race here: if the timer kicks off its callback after you send the reply callback but before you cancel the timer, you'll access reply after send_reply_callback has already run and send the reply twice.

@Sparks0219 (Contributor, Author) replied:

Ya, good point. I guarded against the case where both callbacks are queued, but only when the time-exceeded callback is queued first. For the case where the RPC callback is queued first, I assumed the if (current_worker) check would guard against the time-exceeded callback, but I don't think that's always true: the IPC worker-death callback might not have been queued yet. Extended the replied flag to take into account whether the time-exceeded callback or the main RPC callback executes first.

@edoakes (Collaborator) left a comment:

Looks good to me overall, some minor comments inline, only bigger thing is I think we should just make the node cache thread safe.

If you think that's too big a change to do here, we can do it separately and then rebase this PR.

Or if you think it's a bad idea, let me know :)

<< " did not exit after "
<< RayConfig::instance().kill_worker_timeout_milliseconds()
<< "ms, force killing with SIGKILL.";
DestroyWorker(current_worker,
A reviewer (Collaborator) commented:

(pontificating)

A saner implementation back when this was written would have been to implement non-force cancellation as SIGTERM; then the graceful shutdown path and the escalation path would be fully unified. We probably could still do that, but it's a big refactoring for no real benefit.

@Sparks0219 (Contributor, Author) replied:

🙏 agreed, that would be nice to have

@Sparks0219 (Contributor, Author) commented:

Pending merge of #58947; once that's in, this PR should be rebased and the io context posts removed.

edoakes pushed a commit that referenced this pull request Nov 26, 2025
…58947)

> Briefly describe what this PR accomplishes and why it's needed.

This PR was motivated by #58018, where methods of the GCS node info
accessor are potentially called from the user's Python cancel thread,
potentially causing thread safety issues. I went with the trivial solution
of adding a mutex on the node_cache_address_and_liveness_ cache. The one
downside is that instead of returning pointers to the
GcsNodeAddressAndLiveness objects in the cache, I now return them by
value. I didn't want to expose the mutex that guards the cache outside of
the accessor, since I think that's a bad precedent and will create a mess.

---------

Signed-off-by: joshlee <[email protected]>
KaisennHu pushed a commit to KaisennHu/ray that referenced this pull request Nov 26, 2025
@edoakes (Collaborator) left a comment:

LGTM. Test failure but doesn't look related (in Serve)

if (timer) {
RAY_LOG(WARNING) << "Escalating graceful shutdown to SIGKILL instead.";
return;
}
cursor[bot] commented:

Bug: Missing reply when CancelTask RPC fails with force_kill=false

When a CancelTask RPC fails and force_kill is false, the callback returns without calling send_reply_callback, causing the client to hang waiting for a response that never comes. The replied flag is also never set, and the reply fields are never populated. The code should send a reply in both the force_kill true and false cases when the RPC fails.


SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
@Sparks0219 Sparks0219 requested a review from a team as a code owner December 3, 2025 07:18
except asyncio.CancelledError:
print("Cancelled!")
signal_actor.send.remote()
yield "hi"
@Sparks0219 (Contributor, Author) commented Dec 3, 2025:

@edoakes It looks like this test got flakier from my changes.
What I observed before my changes was:
1. The Proxy actor sends a CancelTask RPC to the ServeReplica.
2. The ServeReplica processes the CancelTask RPC.
3. SignalActor.send.remote() gets sent.
4. CancelChildren doesn't find any pending child tasks to cancel.

With my changes, steps 3 and 4 are flipped, and CancelChildren cancels the queued send.remote() task before it fires, so the test times out. It looks like you ran into the same issue here: https://github.com/ray-project/ray/pull/43320/files#diff-463bbcf17174b07dd1780cae9d6b719b248a0245fa029f8d8f280bf092d4db45R336 and fixed it for the other Serve cancellation tests, so I moved this one to also use send_signal_on_cancellation.

@Sparks0219 (Contributor, Author) commented Dec 3, 2025:

Still trying to figure out why it got more flaky. I reverted back to the last commit where this PR passed CI, but it's still flaky locally for me. I'd expect the timing to change a bit due to my cancellation path changes, but I would've thought the node status cache access in the actor/normal task submitter would slow the cancellation path down, so steps 3/4 should've been less flaky 🤔

@Sparks0219 Sparks0219 requested a review from edoakes December 3, 2025 07:44
@abrarsheikh (Contributor) left a comment:

looks good from serve

@dayshah (Contributor) left a comment:

2 super minor nits

inline void SendCancelLocalTask(std::shared_ptr<gcs::GcsClient> gcs_client,
const NodeID &node_id,
std::function<void(const rpc::Address &)> cancel_callback,
std::function<void()> failure_callback) {
A reviewer (Contributor) commented:

How do you feel about the cancel callback taking an optional address instead of having a separate failure callback here? Up to personal choice.

@Sparks0219 (Contributor, Author) replied:

Mmm, I think I'll leave it as is: the cleanup for cancelled_tasks that happens in cancel_callback lives inside the callback of CancelLocalTask, so I'd have to have it in two places now. I think it's clearer this way?
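
For reference, the two shapes being discussed look roughly like this (illustrative declarations only, with a stand-in Address type; the actual code uses rpc::Address, NodeID, and a GCS client parameter as shown in the diff above):

#include <functional>
#include <optional>
#include <string>

struct Address { std::string ip; int port = 0; };  // Stand-in for rpc::Address.

// Shape kept in the PR: separate success and failure callbacks.
void SendCancelLocalTask(const std::string &node_id,
                         std::function<void(const Address &)> cancel_callback,
                         std::function<void()> failure_callback);

// Suggested alternative: a single callback taking an optional address.
void SendCancelLocalTask(const std::string &node_id,
                         std::function<void(std::optional<Address>)> cancel_callback);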

<< "with Worker ID: " << executor_worker_id;
if (timer) {
RAY_LOG(WARNING) << "Escalating graceful shutdown to SIGKILL instead.";
return;
A reviewer (Contributor) commented:

I'm unsure if both of these should be warnings. For the force_kill case (if timer case), this is expected behavior so shouldn't be a warning imo.

@Sparks0219 (Contributor, Author) replied:

Yea, that's a good point. I'll make the timer log an info log and keep the one above as a warning.

@Sparks0219 Sparks0219 requested a review from dayshah December 4, 2025 23:18