
Conversation

@ZacAttack ZacAttack commented Nov 24, 2025

Description

This PR fixes scenarios where the internal counter for the ClusterSizeBasedLeaseRequestRateLimiter does not keep a correct count of the number of available nodes in the cluster.

How does this happen? It comes down to the behavior of NodeAccessor::AsyncSubscribeToNodeAddressAndLivenessChange and ClusterSizeBasedLeaseRequestRateLimiter::OnNodeChange. On initial subscription, AsyncSubscribeToNodeAddressAndLivenessChange queries the GCS twice: once to set up the subscription, and once to bootstrap the initial state. When bootstrapping the initial state, a callback is triggered for every node in the response, which contains all nodes currently alive in the cluster plus a limited number of nodes that have died. Consumers of the API need to be defensive against this: for a given subscription, it is not guaranteed that every dead-node notification was preceded by an 'alive' notification for that node. An example scenario where failing to account for this causes a bug (a minimal sketch of the vulnerable handler shape follows the list):

  1. Cluster has 3 nodes: node1 (alive), node2 (dead), node3 (alive)
  2. Client subscribes to node changes via AsyncSubscribeToNodeAddressAndLivenessChange
  3. gcs_client sets up the subscription to node changes
  4. GetAllNodeAddressAndLiveness returns all 3 nodes (including node2 which is dead)
  5. Subscription triggers OnNodeChange for all 3 nodes
  6. OnNodeChange(node2=DEAD) decrements the counter even though we never saw node2 as ALIVE
  7. The counter now says there is only 1 node in the cluster instead of the correct number, which is 2.
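
For illustration, here is a minimal, self-contained sketch of the vulnerable handler shape. The types and names are stand-ins for this write-up, not Ray's actual classes:

```cpp
#include <iostream>

// Hypothetical stand-in for the liveness state carried by a node notification.
enum class NodeState { kAlive, kDead };

struct NaiveCounter {
  int num_alive = 0;

  // Mirrors the buggy behavior: every DEAD notification decrements, even for
  // nodes this subscriber never counted as ALIVE.
  void OnNodeChange(NodeState state) {
    if (state == NodeState::kAlive) {
      ++num_alive;
    } else {
      --num_alive;
    }
  }
};

int main() {
  NaiveCounter counter;
  // Bootstrap replay for the 3-node cluster above: node1 alive, node2 dead, node3 alive.
  counter.OnNodeChange(NodeState::kAlive);  // node1
  counter.OnNodeChange(NodeState::kDead);   // node2 was never seen alive, yet we decrement
  counter.OnNodeChange(NodeState::kAlive);  // node3
  std::cout << counter.num_alive << std::endl;  // prints 1; the correct count is 2
}
```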

There are a few ways to fix this problem, ranging from tactical to structural.

What's in this PR?

This PR fixes the issue by introducing a flag to the HandleNotification interface and callback functions. A boolean is passed back to the subscriber that specifies whether the consumed event comes from initialization or from a live update. A node-death notification with the flag set to true means the node was already dead before we subscribed and, at least for the ClusterSizeBasedLeaseRequestRateLimiter, can be ignored. This should also give other code paths a way to be defensive against this sort of thing. A rough sketch of the handler shape is below.
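
Reusing the NodeState stand-in from the earlier sketch, the flag-based handler looks roughly like this; the real HandleNotification/OnNodeChange signatures in Ray differ, this only illustrates the idea:

```cpp
struct FlagAwareCounter {
  int num_alive = 0;

  // is_initializing is true for notifications replayed from the bootstrap
  // GetAll call, false for notifications streamed in afterwards.
  void OnNodeChange(NodeState state, bool is_initializing) {
    if (state == NodeState::kAlive) {
      ++num_alive;
    } else if (!is_initializing) {
      // Only count deaths observed after bootstrap; historically dead nodes
      // replayed during initialization never contributed to num_alive.
      --num_alive;
    }
  }
};
```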

Are there other ways we could fix this?

Yeah.

Delete the ClusterSizeBasedLeaseRequestRateLimiter

This was introduced in a change that throttled task scheduling down to a maximum of 10 in-flight lease requests at a time. However, it's not very clear from that change where 10 came from, or why it was important to limit lease requests. The cases where throttling lease requests would be useful seem to be:

  1. To encourage lease reuse: This makes some sense and is the most obvious use case from the code. Essentially it's based on the notion that lease acquisition is expensive, and if we allow some queue linger time we can reuse a given lease on a worker. That isn't bad, but it assumes lease acquisition is expensive relative to task execution. Lease acquisition in a lab setup seems to take only a few hundred milliseconds per worker. Maybe this gets worse under some load scenarios, but we have to weigh it against the likely length of task execution. The only scenarios where it makes sense to use Ray task submission are when you have:

    • So much work that you need to parallelize this work through a cluster to be able to do it in reasonable time
    • You have work which needs specialized hardware like a GPU, or some gang scheduled resources
    • Some work which requires some state/data (think actor scheduling, or a tree of dependent task inputs/outputs).

    All of the above imply either a large amount of work or a kind of work that needs to be queued (against a specific actor, for example). In any of these scenarios it seems unlikely that task execution is materially impacted by the latency of acquiring a lease, such that the latency needs to be amortized across multiple tasks.

  2. To prevent submitter lease starvation: This is also compelling. However, most Ray workloads use a single submitter, so this concern is perhaps remote.

  3. To prevent overloading the Raylet: This one makes the most sense to me. Today the Raylet doesn't have much ability to defend itself. If the Raylet refused lease requests when it was too overloaded, or when it reached a certain queue depth, the submitter could spill the task back to another Raylet. Without that, it makes sense to throttle the submitter to make sure the job is able to complete.

I think we could pursue this as an option, but we would need to lay more groundwork, and probably put the above assumptions under a microscope.

Fix Ordering in the pubsub

If pubsub subscription worked in properly ordered stages, we could force users of the API to think in this paradigm and hopefully prevent this class of bug in the future. For example, if pubsub were split into 'bootstrapping' and 'streaming' steps, and callbacks had to adhere to that paradigm, this would be easier to reason about: initialization callbacks would fire strictly first, and streaming callbacks would come after. This is how one might expect to consume a compacted Kafka topic (compaction keeps the latest record per key, so the head of the topic is effectively a keyed snapshot) or how one would use a stateful Flink job (Flink checkpoints state to local storage, and that state can be consumed row by row on initialization). A sketch of this staged shape is below.
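
As a rough sketch of what that staged interface could look like (a hypothetical design, not the current Ray pubsub API): streamed updates are buffered until the bootstrap snapshot has been fully delivered.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical staged subscription: streamed callbacks are held back until the
// bootstrap snapshot has been delivered in full.
template <typename Update>
class StagedSubscription {
 public:
  explicit StagedSubscription(std::function<void(const Update &)> callback)
      : callback_(std::move(callback)) {}

  // Phase 1: deliver the bootstrap snapshot, then flush anything that streamed
  // in while bootstrapping was still in progress.
  void FinishBootstrap(const std::vector<Update> &snapshot) {
    for (const auto &item : snapshot) callback_(item);
    bootstrapped_ = true;
    for (const auto &buffered : pending_) callback_(buffered);
    pending_.clear();
  }

  // Phase 2: streamed updates are buffered until bootstrap completes.
  void OnStreamedUpdate(const Update &update) {
    if (!bootstrapped_) {
      pending_.push_back(update);
    } else {
      callback_(update);
    }
  }

 private:
  std::function<void(const Update &)> callback_;
  bool bootstrapped_ = false;
  std::vector<Update> pending_;
};
```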

Get rid of pubsub wholesale (at least for this use case)

One lingering concern about pubsub is that resubscription is pretty weak today. Initial subscription bootstraps the initial state and then streams in callbacks, but on GCS restart, resubscription just links back into the event stream and doesn't go through any reinitialization protocol. I don't have concrete examples at this time, but this seems terribly fraught. For example, what if some nodes cycled out during the restart? How would that information get sent along?

A potentially better paradigm would be for clients, instead of following an incremental event-by-event protocol, to deal in reinitialization: at regular intervals, or on certain events, nodes would reconstruct their view of the world. There would be no notion of building up incremental state, only successive versions of the complete state. This is the pattern in a number of systems. It's not bulletproof, and it can have scaling implications we would need to pay attention to, but it would hopefully give us better correctness guarantees in more situations.

I think this is another good idea, but it requires larger surgery.

@ZacAttack ZacAttack added the go (add ONLY when ready to merge, run all tests) and core (Issues that should be addressed in Ray Core) labels Nov 24, 2025
@ZacAttack ZacAttack marked this pull request as ready for review November 25, 2025 00:56
@ZacAttack ZacAttack requested a review from a team as a code owner November 25, 2025 00:56

edoakes commented Nov 25, 2025

Thanks for the writeup. A few questions/comments:

  1. Is it strictly guaranteed that we didn't get an alive notification prior to the initialization call? This seems likely to be the case, but just want to be sure that there isn't any corner case that might break the assumption. If we serialize the initialization call with subsequent polling calls on the client, which I think we do, then that should be sufficient.
  2. We should lightly audit other paths that depend on this pubsub channel to check that we don't have other similar lingering bugs.
  3. On lease reuse -- another reason to prefer re-using leases is to reduce overhead on the submitter. Even if each task duration is fairly long, if you are submitting a high volume of them (e.g., a single submitter trying to saturate a 10k node cluster), the constant stream of scheduling RPCs will be substantial.

For example, what if some nodes cycled out during the restart? How would that information get sent along?

Not invalidating your point in general about resubscription, but this specific case is not a concern because the GCS is the source of truth for nodes getting added to/removed from the cluster (logically). If a node is turned off while the GCS is down, the GCS will restart, attempt to health check the node, mark it dead, and broadcast that information.

Change internal_num_spilled_tasks to a counter from a Gauge: this number never goes down, and it's more useful to be able to render it as a rate in Grafana. So changing the type of this metric from a Gauge to a Counter makes more sense.

Let's split this out from this PR because it is a functional change.

@ZacAttack ZacAttack (Contributor, Author) commented

  1. Is it strictly guaranteed that we didn't get an alive notification prior to the initialization call? This seems likely to be the case, but just want to be sure that there isn't any corner case that might break the assumption. If we serialize the initialization call with subsequent polling calls on the client, which I think we do, then that should be sufficient.

Hrm... this is a good one actually, and now that you mention it I think we are still vulnerable. The pubsub has an internal cache that only triggers callbacks for what it considers "new" information. If, for example, the initialization call reported a node as dead, you won't get a callback for an 'alive' notification that later arrives on the poll (because nodes cannot become alive again after being marked dead). But coming at it from the reverse direction (an alive node on the poll and a dead node on the initialization) breaks: the dead-node update from initialization updates the cache and carries is_initializing=true, so our counter filters it. Then, when the polled dead-node notification arrives later, the pubsub cache filters it out because it's not a new update (we've already cached the dead state). The sketch below walks through that interleaving.
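
To make the interleaving concrete, here is a standalone sketch with stand-in names (not Ray's types) of a subscriber that combines a de-duplicating cache with the is_initializing filter from this PR:

```cpp
#include <iostream>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the liveness state carried by a node notification.
enum class NodeState { kAlive, kDead };

struct Subscriber {
  int num_alive = 0;
  // Models the pubsub cache that only passes through "new" information.
  std::unordered_map<std::string, NodeState> cache;

  void Deliver(const std::string &node_id, NodeState state, bool is_initializing) {
    auto it = cache.find(node_id);
    if (it != cache.end() && it->second == state) {
      return;  // not new information; the cache swallows it
    }
    cache[node_id] = state;
    if (state == NodeState::kAlive) {
      ++num_alive;
    } else if (!is_initializing) {
      --num_alive;  // the flag-based handler ignores bootstrap-time deaths
    }
  }
};

int main() {
  Subscriber sub;
  sub.Deliver("node1", NodeState::kAlive, /*is_initializing=*/false);  // poll: counter -> 1
  sub.Deliver("node1", NodeState::kDead, /*is_initializing=*/true);    // bootstrap: filtered by flag, cache now DEAD
  sub.Deliver("node1", NodeState::kDead, /*is_initializing=*/false);   // poll: dropped by the cache
  std::cout << sub.num_alive << std::endl;  // prints 1 even though node1 is dead
}
```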

So I guess we still have a bug... A few 'hacky' fixes come to mind, but I think they muddle the semantics. Maybe we should rely on the raylet client pool size instead of a counter...

  2. We should lightly audit other paths that depend on this pubsub channel to check that we don't have other similar lingering bugs.

I took a look through. I didn't see anything too obvious...

  3. On lease reuse -- another reason to prefer re-using leases is to reduce overhead on the submitter. Even if each task duration is fairly long, if you are submitting a high volume of them (e.g., a single submitter trying to saturate a 10k node cluster), the constant stream of scheduling RPCs will be substantial.

Yeah I guess that makes sense. But if submitter resource consumption is (or becomes) our primary concern with lease acquisition, then I think I'd rather implement a throttler around that resource constraint (as opposed to the constraint being 1 lease request per raylet).

Change internal_num_spilled_tasks to a counter from a Gauge: this number never goes down, and it's more useful to be able to render it as a rate in Grafana. So changing the type of this metric from a Gauge to a Counter makes more sense.

Let's split this out from this PR because it is a functional change.

Will do.

edoakes commented Nov 25, 2025

One possibility is to add a pubsub channel just for cluster size... feels like way overkill for this use case though.

jjyao commented Dec 3, 2025

Fix Ordering in the pubsub

I think we should do this regardless: all notifications from pubsub should be delayed until bootstrap is done.

edoakes commented Dec 6, 2025

@dayshah PTAL

@dayshah dayshah left a comment

Sorry for the super late review, didn't see the ping.

I think the most readable solution here is just to have separate alive and dead node maps in the accessor (like there are in gcs_node_manager). Then the accessor exposes a GetNumAliveNodes function that the lease limiter calls every time it gets an update, with a comment there explaining why.

Having a separate alive map would also be helpful for this call site:

const auto &node_info_map = gcs_client_.Nodes().GetAllNodeAddressAndLiveness();

because right now it copies out all alive and dead node info, but we only want to copy out alive node info.
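
A rough sketch of the shape being suggested, with illustrative names (Ray's actual accessor and node-info types differ):

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>

// Hypothetical node metadata; a stand-in for the real GcsNodeInfo.
struct NodeInfo {
  std::string address;
};

class NodeAccessorSketch {
 public:
  // Moves a node between the alive and dead maps as notifications arrive.
  void OnNodeChange(const std::string &node_id, const NodeInfo &info, bool alive) {
    if (alive) {
      dead_nodes_.erase(node_id);
      alive_nodes_[node_id] = info;
    } else {
      alive_nodes_.erase(node_id);
      dead_nodes_[node_id] = info;
    }
  }

  // The lease limiter would call this on every update instead of maintaining
  // its own counter.
  size_t GetNumAliveNodes() const { return alive_nodes_.size(); }

  // Callers that only need alive nodes avoid copying out dead-node info.
  const std::unordered_map<std::string, NodeInfo> &GetAliveNodes() const {
    return alive_nodes_;
  }

 private:
  std::unordered_map<std::string, NodeInfo> alive_nodes_;
  std::unordered_map<std::string, NodeInfo> dead_nodes_;
};
```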
