[3/n] [Serve] Defer rank assignment after replica is allocated #58477
Conversation
Signed-off-by: abrar <[email protected]>
…58473)

### Summary

This PR refactors the replica rank system to support multi-dimensional ranking (global, node-level, and local ranks) in preparation for node-local rank tracking. The `ReplicaRank` object now contains three fields instead of being a simple integer, enabling better coordination of replicas across nodes.

### Motivation

Currently, Ray Serve only tracks a single global rank per replica. For advanced use cases like tensor parallelism, model sharding across nodes, and node-aware coordination, we need to track:

- **Global rank**: Replica's rank across all nodes (0 to N-1)
- **Node rank**: Which node the replica is on (0 to M-1)
- **Local rank**: Replica's rank on its specific node (0 to K-1)

This PR lays the groundwork by introducing the expanded `ReplicaRank` schema while keeping existing behavior backward compatible.

### Changes

#### Core Implementation

- **`schema.py`**: Extended `ReplicaRank` to include `node_rank` and `local_rank` fields (currently set to -1 as placeholders)
- **`replica.py`**: Updated replica actors to handle `ReplicaRank` objects
- **`context.py`**: Changed `ReplicaContext.rank` type from `Optional[int]` to `ReplicaRank`

### Current Behavior

- `node_rank` and `local_rank` are set to `-1` (placeholder values); this will change in a future PR
- Global rank assignment and management work as before
- All existing functionality is preserved

### Breaking Changes

Rank is changing from `int` to `ReplicaRank`.

Next PR #58477

---------

Signed-off-by: abrar <[email protected]>
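To make the schema change concrete, here is a minimal sketch of what the expanded rank object could look like. It is an illustrative dataclass based on the field names in the description above, not the actual `schema.py` definition; the defaults mirror the stated `-1` placeholders.

```python
from dataclasses import dataclass


@dataclass
class ReplicaRank:
    # Replica's rank across all nodes (0 to N-1).
    rank: int
    # Index of the node the replica runs on (0 to M-1);
    # -1 is a placeholder until node-rank tracking lands.
    node_rank: int = -1
    # Replica's rank on its specific node (0 to K-1); also -1 for now.
    local_rank: int = -1


r = ReplicaRank(rank=3)
print(r)  # ReplicaRank(rank=3, node_rank=-1, local_rank=-1)
```

Callers that previously compared the rank as an `int` would now read `r.rank`, which is the breaking change the description calls out.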
Bug: Recovery Breaks Deferred Rank Assignment
During replica recovery after controller restart, initialize_and_get_metadata.remote() is called without passing a rank parameter (line 671), but the method signature now requires it. The _assign_rank_callback is never set during recovery since start() isn't called, and the rank assignment logic in check_ready() (lines 743-746) is skipped because _ready_obj_ref is already set by recover(). This causes recovered replicas to initialize without a rank, breaking the deferred rank assignment feature.
python/ray/serve/_private/deployment_state.py#L627-L673
ray/python/ray/serve/_private/deployment_state.py
Lines 627 to 673 in d7131f0
```python
def recover(self) -> bool:
    """Recover replica version from a live replica actor.

    When the controller dies, the deployment state loses the info on the
    version that's running on each individual replica actor, so as part of
    the recovery process, we need to recover the version that is running on
    the replica actor.

    Also confirm that the actor is allocated and initialized before marking
    it as running.

    Returns: False if the replica actor is no longer alive; the actor could
        have been killed in the time between the controller fetching all
        Serve actors in the cluster and the controller trying to recover it.
        Otherwise, return True.
    """
    logger.info(f"Recovering {self.replica_id}.")
    try:
        self._actor_handle = ray.get_actor(
            self._actor_name, namespace=SERVE_NAMESPACE
        )
    except ValueError:
        logger.warning(
            f"Failed to get handle to replica {self._actor_name} "
            "during controller recovery. Marking as dead."
        )
        return False

    try:
        self._placement_group = ray.util.get_placement_group(
            self._actor_name,
        )
    except ValueError:
        # ValueError is raised if the placement group does not exist.
        self._placement_group = None

    # Re-fetch initialization proof
    self._allocated_obj_ref = self._actor_handle.is_allocated.remote()

    # Running actor handle already has all info needed, thus successful
    # starting simply means retrieving replica version hash from actor
    if self._is_cross_language:
        self._ready_obj_ref = self._actor_handle.check_health.remote()
    else:
        self._ready_obj_ref = (
            self._actor_handle.initialize_and_get_metadata.remote()
        )
```
Not true, rank is not required.
This is expected, because we want to fetch the rank from the already-running replica instead of assigning it a new rank during recovery.
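The distinction the reply describes — assign a rank on a fresh start, but reuse the replica's existing rank on recovery — can be sketched as below. All names here are simplified stand-ins for illustration, not Ray Serve's actual actor API.

```python
class FakeReplicaActor:
    """Toy stand-in for a replica actor, to illustrate the two code paths."""

    def __init__(self):
        self.rank = None

    def initialize_and_get_metadata(self, rank=None):
        # Fresh start: the controller passes a rank and the replica adopts it.
        # Recovery: no rank is passed, so the replica keeps the rank it
        # already holds, and the controller reads it back from the metadata.
        if rank is not None:
            self.rank = rank
        return {"rank": self.rank}


actor = FakeReplicaActor()

# Fresh start: the controller assigns rank 0.
meta = actor.initialize_and_get_metadata(rank=0)
assert meta["rank"] == 0

# Recovery after a controller restart: no rank is passed;
# the replica's existing rank is fetched instead of reassigned.
meta = actor.initialize_and_get_metadata()
assert meta["rank"] == 0
```

This is why the `recover()` path above intentionally omits the `rank` argument: the source of truth during recovery is the running replica, not the controller.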
Bug: Broken Rank Assignment in Replica Recovery
During replica recovery after controller restart, initialize_and_get_metadata.remote() is called without passing a rank parameter, but the method signature now requires it. The _assign_rank_callback is never set during recovery since start() isn't called, and the rank assignment logic in check_ready() is skipped because _ready_obj_ref is already set by recover(). This causes recovered replicas to initialize without a rank, breaking the deferred rank assignment system.
python/ray/serve/_private/deployment_state.py#L669-L672
ray/python/ray/serve/_private/deployment_state.py
Lines 669 to 672 in 29ce266
```python
else:
    self._ready_obj_ref = (
        self._actor_handle.initialize_and_get_metadata.remote()
    )
```
zcin
left a comment
What is the impact of this change on replica startup time?

I think this adds

Can we test / measure e2e?
Test application / profile code:

```diff
diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py
index 4b96833aff..829e0a84f0 100644
--- a/python/ray/serve/_private/deployment_state.py
+++ b/python/ray/serve/_private/deployment_state.py
@@ -283,6 +283,7 @@ class ActorReplicaWrapper:
         # Outbound deployments polling state
         self._outbound_deployments: Optional[List[DeploymentID]] = None
+        self._replica_start_ts: float = 0.0

     @property
     def replica_id(self) -> str:
@@ -460,7 +461,7 @@ class ActorReplicaWrapper:
         self._deployment_is_cross_language = (
             deployment_info.deployment_config.is_cross_language
         )
-
+        self._replica_start_ts = time.time()
         logger.info(
             f"Starting {self.replica_id}.",
             extra={"log_to_stderr": False},
@@ -790,6 +791,9 @@ class ActorReplicaWrapper:
             )
             return ReplicaStartupStatus.FAILED, repr(e)
+        time_taken = time.time() - self._replica_start_ts
+        logger.info(f"Replica {self._replica_id} started in {time_taken:.2f}s.")
+
         return ReplicaStartupStatus.SUCCEEDED, None
```

From this PR:

```
❯ RAY_SERVE_CONTROL_LOOP_INTERVAL_S=2 serve run raw_config.yaml
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,908 controller 3251490 -- Replica Replica(id='ykzh0lp3', deployment='d_1', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,908 controller 3251490 -- Replica Replica(id='wj1jwbst', deployment='d_1', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,909 controller 3251490 -- Replica Replica(id='cob2l0ts', deployment='d_1', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,909 controller 3251490 -- Replica Replica(id='luh8ykjt', deployment='d_1', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,909 controller 3251490 -- Replica Replica(id='8vsm1wg7', deployment='d_2', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,909 controller 3251490 -- Replica Replica(id='ax8lcckm', deployment='d_2', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,910 controller 3251490 -- Replica Replica(id='rdhkqk0q', deployment='d_2', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,910 controller 3251490 -- Replica Replica(id='888udqd7', deployment='d_2', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,910 controller 3251490 -- Replica Replica(id='15e088r3', deployment='DITest', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,911 controller 3251490 -- Replica Replica(id='4r4b7on4', deployment='DITest', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,911 controller 3251490 -- Replica Replica(id='qq5kzseb', deployment='DITest', app='default') started in 4.08s.
(ServeController pid=3251490) INFO 2025-11-19 00:02:14,911 controller 3251490 -- Replica Replica(id='iffoha2e', deployment='DITest', app='default') started in 4.08s.
```

From master:
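For reference, the timing logic the diff adds can be reproduced standalone. This is a minimal sketch of the same pattern (record a timestamp on start, log the delta on success), not Ray Serve code; `StartupTimer` is a hypothetical helper name.

```python
import time


class StartupTimer:
    """Record a start timestamp and report the elapsed wall-clock time,
    mirroring the _replica_start_ts pattern in the diff above."""

    def __init__(self):
        self._start_ts = 0.0

    def mark_start(self):
        # Equivalent of setting self._replica_start_ts in start().
        self._start_ts = time.time()

    def elapsed(self):
        # Equivalent of time.time() - self._replica_start_ts in check_ready().
        return time.time() - self._start_ts


timer = StartupTimer()
timer.mark_start()
time.sleep(0.05)  # stand-in for replica allocation + initialization
print(f"started in {timer.elapsed():.2f}s")
```

Note that this measures controller-observed wall-clock time, so it includes any control-loop scheduling delay, which is exactly why the interval setting shows up in the numbers discussed below.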
Hmm, that's a pretty significant increase. Is there a way to avoid it?

The +2 additional seconds to start the replica is because I set

The other option I can think of to start the replica in the same controller iteration is to use

without the constant
…roject#58477)

**Summary**

Modified replica rank assignment to defer rank allocation until the replica is actually allocated, rather than assigning it during the startup call. This is needed for adding node-local ranks in the future: to support node rank and node-local rank we need to know the `node_id`, which is only known after the replica is allocated.

**Changes**

- Changed the `start()` method signature to accept `assign_rank_callback` instead of a pre-assigned `rank` parameter
- Rank is now assigned after `_allocated_obj_ref` is resolved, ensuring the replica is allocated before rank assignment
- Pass the rank to the `initialize_and_get_metadata()` method on the replica actor, allowing the rank to be set during initialization
- Updated `ReplicaBase.initialize()` to accept the rank as a parameter and set it along with the internal replica context
- Added a `PENDING_INITIALIZATION` status check to handle cases where `_ready_obj_ref` is not yet set

Next PR ray-project#58479

---------

Signed-off-by: abrar <[email protected]>
Signed-off-by: Aydin Abiar <[email protected]>
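The deferred-assignment flow described above can be sketched as follows. The class and method names here are simplified stand-ins chosen for illustration, not Ray Serve's actual implementation: `start()` only stores the callback, and the rank is assigned when allocation resolves and the `node_id` is known.

```python
class Controller:
    """Toy rank allocator standing in for the deployment state manager."""

    def __init__(self):
        self._next_rank = 0

    def assign_rank(self, node_id):
        # node_id is only known post-allocation, which is why the
        # assignment must be deferred; node/local ranks would derive
        # from it in the follow-up PRs.
        rank = self._next_rank
        self._next_rank += 1
        return rank


class ReplicaWrapper:
    """Toy stand-in for ActorReplicaWrapper."""

    def __init__(self):
        self._assign_rank_callback = None
        self.rank = None

    def start(self, assign_rank_callback):
        # No rank yet: just remember how to obtain one later.
        self._assign_rank_callback = assign_rank_callback

    def on_allocated(self, node_id):
        # Allocation resolved (the analog of _allocated_obj_ref being
        # ready): now it is safe to assign the rank and pass it into
        # replica initialization.
        self.rank = self._assign_rank_callback(node_id)


controller = Controller()
replica = ReplicaWrapper()
replica.start(controller.assign_rank)
assert replica.rank is None        # deferred: no rank at start time
replica.on_allocated(node_id="node-A")
assert replica.rank == 0           # assigned only after allocation
```

Separating "how to get a rank" (the callback) from "when to get it" (allocation resolution) is what lets recovery skip assignment entirely while fresh starts still receive a rank exactly once.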
Summary

Modified replica rank assignment to defer rank allocation until the replica is actually allocated, rather than assigning it during the startup call. This is needed for adding node-local ranks in the future: to support node rank and node-local rank we need to know the `node_id`, which is only known after the replica is allocated.

Changes

- Changed the `start()` method signature to accept `assign_rank_callback` instead of a pre-assigned `rank` parameter
- Rank is now assigned after `_allocated_obj_ref` is resolved, ensuring the replica is allocated before rank assignment
- Pass the rank to the `initialize_and_get_metadata()` method on the replica actor, allowing the rank to be set during initialization
- Updated `ReplicaBase.initialize()` to accept the rank as a parameter and set it along with the internal replica context
- Added a `PENDING_INITIALIZATION` status check to handle cases where `_ready_obj_ref` is not yet set

Next PR #58479