
[Core] Ray Data job hanging with flooded Cancelling stale RPC with seqno 125 < 127 error #50814

Open
lee1258561 opened this issue Feb 21, 2025 · 1 comment
Labels: bug, core, P1

@lee1258561

What happened + What you expected to happen

We have jobs that use Ray Data to stream data for training.

However, from time to time a Ray Data job suddenly hangs, with Ray Core flooding the logs with Cancelling stale RPC with seqno errors.

(MapWorker(ReadParquetBulk->MapBatches(create_and_cast_to_null_safe_schema)->MapBatches(CollateBatchGenerator)) pid=1855751, ip=10.12.5.202) [2024-12-29 17:40:18,679 E 1855751 1855751] actor_scheduling_queue.cc:135: Cancelling stale RPC with seqno 125 < 127
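
For context, here is a minimal sketch of the kind of Ray Data pipeline that produces an operator chain like the one in the log above. The transform bodies, the parquet path, and the concurrency value are hypothetical placeholders; only the operator names come from the log.

```python
import ray


def create_and_cast_to_null_safe_schema(batch):
    # Placeholder: in the real job this would cast columns to a null-safe schema.
    return batch


class CollateBatchGenerator:
    # Placeholder callable class: in the real job this would collate batches
    # for the training loop. Running it as a class makes Ray Data execute it
    # on a pool of MapWorker actors, as seen in the log.
    def __call__(self, batch):
        return batch


ds = (
    ray.data.read_parquet("s3://bucket/path")            # ReadParquetBulk
    .map_batches(create_and_cast_to_null_safe_schema)    # MapBatches(...)
    .map_batches(CollateBatchGenerator, concurrency=4)   # MapBatches(...) on actors
)

# Streaming the dataset into training; the hang manifests here when the
# underlying actor task retries never complete.
for batch in ds.iter_batches(batch_size=1024):
    pass
```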

Observation 1: Ray Data infinitely retries the failed actor tasks

[Screenshot: Scheduled Task State view of the Ray dashboard data]

Based on the Scheduled Task State view in the Ray dashboard (the screenshot shows the same dashboard data visualized in a different UI), we found a flood of failed actor tasks starting at the moment the hang and the flood of error messages occurred:

  • The worker id 3ecf62cff60992c8ccb48ecfca612604c42a4ee31e1ed5501ddfdfca points to a SplitCoordinator that submits Ray Data tasks
  • The flooded failed task is readparquetbulk-_mapbatches_create_and_cast_to_null_safe_schema_-_mapbatches_collatebatchgenerator_, which is a Ray Data task

Looking further at the flood of failed tasks, it starts with 2 normal executions of the tasks with isretry=0, followed by an endless stream of attempts with isretry=1.

Ray Data overrides the task retry and actor restart limits to be infinite by default [doc for Ray Core retry setting]; a hedged illustration of what that looks like in Ray Core terms follows this observation.

Conclusion: This suggests that Ray is infinitely retrying the 2 failed tasks but is never able to complete them successfully.
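
As a hedged illustration (assuming Ray Data's effective defaults amount to unlimited actor restarts and task retries, as the dashboard behavior suggests), this is what "retry forever" looks like when expressed directly with Ray Core's retry settings:

```python
import ray

# Assumption: these settings mirror Ray Data's effective defaults for its map
# worker actors. max_restarts=-1 and max_task_retries=-1 mean unlimited actor
# restarts and unlimited task retries.
@ray.remote(max_restarts=-1, max_task_retries=-1)
class MapWorker:
    def process(self, batch):
        return batch


worker = MapWorker.remote()

# If `process` keeps failing in a way that can never succeed, the unlimited
# retries turn the failure into an apparent hang instead of a fast error.
ref = worker.process.remote({"x": [1, 2, 3]})
print(ray.get(ref))
```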

Observation 2: The first normal task submissions failed with network flakiness and a Connection reset by peer error, followed by an endless stream of Cancelling stale RPC with seqno errors

From Observation 1 we know the worker id of the task submitter is 3ecf62cff60992c8ccb48ecfca612604c42a4ee31e1ed5501ddfdfca, so we can inspect its log for more detail:

  • The first two failed task attempts show a status of GrpcUnavailable: RPC Error message: recvmsg:Connection reset by peer;
  • All following task attempts failed with Invalid: client cancelled stale rpc instead; this is Ray-specific logic
  • We also found that at the hardware level, at the same time the first Connection reset by peer error happened, an AWS instance ENA driver reset occurred, causing a 5~10 second temporary network outage. We have managed to reproduce the workload in an environment where this network outage does not happen and do not see this issue there.

Hardware dmesg
[Screenshot: dmesg output showing the AWS ENA driver reset]

Conclusion: This suggests that the first task attempts failed due to network instability. Once this happens, all following attempts enter a bad state and always fail on the same Ray logic.
Note that in both cases the error_type is ACTOR_DIED; this is important to combine with the following observation.

[Screenshots: the first two failed task attempts (GrpcUnavailable) and the rest of the failed attempts (client cancelled stale rpc)]

Observation 3: The actor receiving the task attempts successfully receives and processes the tasks, but fails to return the result object refs to the caller

From the failed task attempts we also know that:

  • It is always the same 2 tasks being retried, with ids:
    • cca400b25197651bb30f279ea44e42eb66a9734202000000
    • 011a883bd654bfd4b30f279ea44e42eb66a9734202000000
  • They are always submitted to the same actor:
    • Id b30f279ea44e42eb66a9734202000000, with pid 1855751

Based on the above information we can identify the log file of the task receiver, and we found that the receiver first logs two lines of Failed to report streaming generator return, then constantly rejects the retry attempts with Cancelling stale RPC with seqno.

[Screenshot: receiver log showing Failed to report streaming generator return followed by repeated Cancelling stale RPC with seqno]

Conclusion: This indicates that:

  • The first two task attempts to create the streaming generator on the actor actually succeeded on the receiver side, but the receiver failed to return the refs to the submitter due to network instability
  • The actor keeps receiving task attempts from the submitter, even though the submitter should have already treated this actor as dead
  • The actor receiving the tasks has entered an unrecoverable state and keeps cancelling the requests

Observation 4: Code digging into why the subsequent task attempts keep getting cancelled

Actor task receiver logic

Code pointer: https://github.com/ray-project/ray/blob/3db8062692df51ee455d5941480ff94655a06439/src/ray/core_worker/transport/actor_scheduling_queue.cc

The receiver gets each task request along with a sequence number, and the actor executes requests based on this sequence number to ensure that tasks submitted from the same caller are executed in the caller's submission order [ray doc].

  • The receiver maintains next_seq_no_, indicating the next expected task seq no to execute, and cancels all tasks before it; this is the logic that keeps cancelling the task requests in our case (see the sketch after this list)
  • next_seq_no_ is updated in several scenarios:
    • When a task is executed, the value is incremented regardless of success or failure
    • When a request reports the caller has already been processed up to a certain seq no (client_processed_up_to_), next_seq_no_ is updated to that seq no + 1
      • Based on the code comment, this is mostly for the case where the receiver actor died and restarted: the receiver can fast-forward to the request the submitter has not yet received a value for and skip all previous requests, instead of waiting for them to time out
    • On a timeout waiting for the next_seq_no_ task, all queued task requests before it are cancelled
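
Below is a simplified Python model of the receiver-side bookkeeping described above. The real implementation is the C++ actor_scheduling_queue.cc linked above; this sketch omits the out-of-order queueing and the timeout path and only mirrors the seqno handling relevant to this issue.

```python
class ReceiverSchedulingQueueModel:
    """Toy model of the receiver-side seqno handling (not Ray's actual code)."""

    def __init__(self):
        self.next_seq_no = 0

    def add(self, seq_no, client_processed_up_to, execute_fn, cancel_fn):
        # Fast-forward: the caller says it already has results up to
        # client_processed_up_to, so skip ahead. This only helps when the
        # actor restarted and next_seq_no was reset to 0.
        if client_processed_up_to >= self.next_seq_no:
            self.next_seq_no = client_processed_up_to + 1

        # Anything older than next_seq_no is treated as a stale retry and
        # cancelled -- the "Cancelling stale RPC with seqno X < Y" path.
        if seq_no < self.next_seq_no:
            cancel_fn(f"Cancelling stale RPC with seqno {seq_no} < {self.next_seq_no}")
            return

        # Otherwise execute in order and advance, regardless of task outcome.
        execute_fn(seq_no)
        self.next_seq_no = seq_no + 1
```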

Actor task submitter logic

The submitter side maintains two values for the tasks submitted to each actor:

  • A task sequence_number that increases sequentially in submission order
  • A client_processed_up_to_ indicating that the client has already received the results of tasks up to a certain sequence number

The submitter side is responsible for incrementing the sequence number every time it submits a new task, including retried tasks, but when the error_type is ACTOR_DIED or ACTOR_UNAVAILABLE the sequence number is not incremented. This is the key cause of our hang: because our requests fail with ACTOR_DIED, the sequence number stays the same on every retry (see the sketch below).
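
And a similarly simplified model of the submitter-side bookkeeping described above (again only a sketch of the behavior inferred from this analysis, not Ray's actual code):

```python
class SubmitterStateModel:
    """Toy model of the submitter-side seqno handling (not Ray's actual code)."""

    def __init__(self):
        self.next_sequence_number = 0
        self.client_processed_up_to = -1

    def submit_new_task(self):
        # New submissions always consume a fresh sequence number.
        seq_no = self.next_sequence_number
        self.next_sequence_number += 1
        return seq_no, self.client_processed_up_to

    def retry_task(self, seq_no, error_type):
        # Key detail from the analysis: when the failure is classified as
        # ACTOR_DIED or ACTOR_UNAVAILABLE, the retry reuses the same seq_no,
        # because the submitter assumes the actor restarted with next_seq_no == 0.
        if error_type in ("ACTOR_DIED", "ACTOR_UNAVAILABLE"):
            return seq_no, self.client_processed_up_to
        new_seq_no = self.next_sequence_number
        self.next_sequence_number += 1
        return new_seq_no, self.client_processed_up_to
```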

Final root cause analysis:

  1. The actor task submitter has received results up to seq no. 124 (client_processed_up_to_ = 124) and submits seq no. 125 and 126 to the receiver
  2. The actor task receiver successfully executes 125 and 126 and updates next_seq_no_ to 127
  3. The receiver is not able to send the results back to the submitter due to a gRPC connection error, leading to an ACTOR_DIED status on the submitter side
  4. Because of ACTOR_DIED, the submitter retries without incrementing the sequence_number
  5. 125 and 126 are sent to the receiver again with client_processed_up_to_ = 124
  6. The submitter expects the receiver to have been restarted by fault tolerance, with next_seq_no_ reset to 0, so that on receiving the retry request next_seq_no_ would be set to 124 + 1 = 125 and the task could execute normally
  7. Instead, the actor is still alive with next_seq_no_ = 127, and after the transient network error the retried task requests are sent to the same actor again
  8. Since 125 and 126 are both < 127, the receiver side cancels the task requests
  9. The cancellation of the task requests by the receiver is also treated as an ACTOR_DIED status by the submitter. We go back to step 4 and form an infinite loop, since Ray Data retries infinitely by default (a self-contained simulation of this loop follows)
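
To make the loop concrete, here is a self-contained simulation of steps 1-9 using the numbers from this incident. It is an illustration of the analysis above, not Ray's actual code.

```python
def simulate(rounds=3):
    # State from the analysis above.
    receiver_next_seq_no = 127      # the actor already executed seq nos 125 and 126
    client_processed_up_to = 124    # the submitter never received those results
    stuck_retries = [125, 126]      # the two task attempts retried forever

    for attempt in range(rounds):
        for seq_no in stuck_retries:
            # Fast-forward only helps when the actor restarted (next_seq_no == 0);
            # here the same actor is still alive with next_seq_no == 127.
            if client_processed_up_to >= receiver_next_seq_no:
                receiver_next_seq_no = client_processed_up_to + 1
            if seq_no < receiver_next_seq_no:
                print(f"attempt {attempt}: Cancelling stale RPC with seqno "
                      f"{seq_no} < {receiver_next_seq_no}")
        # The cancellation is reported back as ACTOR_DIED, so the submitter
        # retries with the same seq nos and the loop never makes progress.
    print("no attempt ever succeeds; with infinite retries this becomes a hang")


simulate()
```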

Asks from users:

Many things combine to cause this tricky issue:

  • Ray Data defaults to infinite retries, so instead of failing fast we hang
  • We acknowledge that our hardware has network instability and we are actively looking into it. However, at Pinterest we have seen 2+ similar temporary network outages cause Ray Core component instability, and it surfaces not only in the actor task submission component but in many other components such as the Ray object store
  • So we are looking for ways to let Ray survive similar temporary outages in the future, by:
    • Fixing specific issues like this one, component by component; this will take time to debug and triage each issue one by one
    • Adding more fault tolerance to the Ray Core gRPC layer in general, to survive network outages

Versions / Dependencies

ray 2.10.0
python 3.8
ubuntu 20.04

Reproduction script

N/A

Issue Severity

None

lee1258561 added the bug and triage labels Feb 21, 2025
jcotant1 added the core label Feb 23, 2025
@jjyao
Collaborator

jjyao commented Feb 24, 2025

@lee1258561 are you able to reproduce this issue with the latest Ray? 2.10.0 is pretty old and we have done things to improve network error handling.

jjyao added the P1 label and removed the triage label Feb 24, 2025