From 8e8f393618eb83fe9fdbdf389221465d4c32c7ef Mon Sep 17 00:00:00 2001 From: abrar Date: Sat, 8 Nov 2025 02:18:48 +0000 Subject: [PATCH 01/11] Refactor replica rank to prepare for node local ranks Signed-off-by: abrar --- python/ray/serve/_private/constants.py | 4 - python/ray/serve/_private/deployment_state.py | 416 ++++++++++-------- python/ray/serve/schema.py | 9 + python/ray/serve/tests/BUILD.bazel | 3 - python/ray/serve/tests/unit/BUILD.bazel | 4 - .../unit/test_deployment_rank_manager.py | 221 ++++------ 6 files changed, 347 insertions(+), 310 deletions(-) diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index 4035e04d36fc..59928e6bee8c 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -490,10 +490,6 @@ "RAY_SERVE_REQUEST_PATH_LOG_BUFFER_SIZE", 1 ) -# Feature flag to fail the deployment if the rank is not set. -# TODO (abrar): Remove this flag after the feature is stable. -RAY_SERVE_FAIL_ON_RANK_ERROR = get_env_bool("RAY_SERVE_FAIL_ON_RANK_ERROR", "0") - # The message to return when the replica is healthy. HEALTHY_MESSAGE = "success" diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 7c5e27e4e164..aa902230832f 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -38,7 +38,6 @@ MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT, MAX_PER_REPLICA_RETRY_COUNT, RAY_SERVE_ENABLE_TASK_EVENTS, - RAY_SERVE_FAIL_ON_RANK_ERROR, RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS, RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY, REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD, @@ -70,6 +69,7 @@ from ray.serve.schema import ( DeploymentDetails, ReplicaDetails, + ReplicaRank, _deployment_info_to_schema, ) from ray.util.placement_group import PlacementGroup @@ -1458,248 +1458,182 @@ def __repr__(self): return repr(self._replicas) -class DeploymentRankManager: - """Manages replica ranks for a deployment. - This class handles rank assignment, release, consistency checking, and reassignment. - It maintains the rank system invariants and provides a clean interface for rank operations. - """ +class RankManager: + """Manages ranks for a single node.""" - def __init__(self, _fail_on_error: Optional[bool] = None): - # Maps replica_id to assigned rank - self._replica_ranks: Dict[str, int] = {} - # Set of available ranks (initially empty, grows as target replicas change) + def __init__(self): + self._ranks: Dict[str, int] = {} self._released_ranks: Set[int] = set() - # Next rank to assign (increments as new replicas are created) self._next_rank: int = 0 - # Whether to fail on rank errors (for testing control) - self._fail_on_error = ( - _fail_on_error - if _fail_on_error is not None - else RAY_SERVE_FAIL_ON_RANK_ERROR - ) - def assign_rank(self, replica_id: str) -> int: - """Assign a rank to a new replica. 
- Args: - replica_id: The unique ID of the replica - Returns: - The assigned rank - Raises: - RuntimeError: If the replica already has a rank assigned - """ - if replica_id in self._replica_ranks: - raise RuntimeError( - f"Replica {replica_id} already has a rank assigned: {self._replica_ranks[replica_id]}" - ) + def assign_rank(self, key: str) -> int: + if key in self._ranks: + raise RuntimeError(f"Rank for {key} already assigned: {self._ranks[key]}") - # First try to reuse an available rank if self._released_ranks: rank = min(self._released_ranks) self._released_ranks.remove(rank) else: - # Otherwise use the next available rank rank = self._next_rank self._next_rank += 1 - self._replica_ranks[replica_id] = rank + self._ranks[key] = rank return rank - def release_rank(self, replica_id: str) -> None: - """Release a rank when a replica is stopped. - Args: - replica_id: The unique ID of the replica whose rank should be released - """ - if replica_id not in self._replica_ranks: - raise RuntimeError(f"Replica {replica_id} has no rank assigned") - - rank = self._replica_ranks.pop(replica_id) + def release_rank(self, key: str) -> None: + if key not in self._ranks: + raise RuntimeError(f"Rank for {key} not assigned") + rank = self._ranks.pop(key) self._released_ranks.add(rank) - def recover_rank(self, replica_id: str, rank: int) -> None: - """Recover a rank from a live replica during controller restart. - Args: - replica_id: The unique ID of the replica - rank: The rank to recover - Raises: - RuntimeError: If the replica already has a rank or the rank is invalid - ValueError: If the rank is invalid (negative) - """ - if replica_id in self._replica_ranks: - raise RuntimeError(f"Replica {replica_id} already has a rank assigned") - - self._replica_ranks[replica_id] = rank - - # Update available ranks tracking - if rank in self._released_ranks: - self._released_ranks.remove(rank) - - # Update next_rank to ensure we don't assign duplicates + def recover_rank(self, key: str, rank: int) -> None: + if key in self._ranks: + raise RuntimeError(f"Rank for {key} already assigned: {self._ranks[key]}") + self._ranks[key] = rank + self._released_ranks.discard(rank) if rank >= self._next_rank: self._next_rank = rank + 1 - def get_replica_rank(self, replica_id: str) -> Optional[int]: - """Get the rank assigned to a replica. - Args: - replica_id: The unique ID of the replica - Returns: - The assigned rank, or None if no rank is assigned - """ - if replica_id not in self._replica_ranks: - raise RuntimeError(f"Replica {replica_id} has no rank assigned") - return self._replica_ranks.get(replica_id) + def get_rank(self, key: str) -> int: + if key not in self._ranks: + raise RuntimeError(f"Rank for {key} not assigned") + return self._ranks[key] - def get_replica_ranks_mapping(self) -> Dict[str, int]: - """Get a copy of the current replica ranks mapping. - Returns: - A copy of the replica_id to rank mapping - """ - return self._replica_ranks.copy() + def has_rank(self, key: str) -> bool: + return key in self._ranks + + def get_ranks_mapping(self) -> Dict[str, int]: + return self._ranks.copy() + + def clear(self) -> None: + self._ranks.clear() + self._released_ranks.clear() + self._next_rank = 0 def check_rank_consistency_and_reassign_minimally( self, - active_replicas: List["DeploymentReplica"], - ) -> List["DeploymentReplica"]: + active_keys: List[str], + ) -> List[str]: """Verify rank system invariants and reassign ranks when needed. + This method ensures: - 1. All active replicas have ranks + 1. 
All active keys have ranks 2. No duplicate ranks exist - 3. Ranks are contiguous when at target replica count + 3. Ranks are contiguous when at target count + Args: - active_replicas: List of currently active replicas + active_keys: List of currently active keys + Returns: - List of replicas that need to be reconfigured with new ranks + List of keys that need to be reconfigured with new ranks + Raises: - RuntimeError: If rank system invariants are violated + RuntimeError: If rank system invariants are violated and fail_on_error=True """ - if not active_replicas: + if not active_keys: return [] - active_replica_ids = { - replica.replica_id.unique_id for replica in active_replicas - } - replica_ids_needs_reconfiguration = set() + active_keys_set = set(active_keys) + keys_needs_reconfiguration = set() # Check for stale ranks - this should never happen - stale_replica_ids = set(self._replica_ranks.keys()) - active_replica_ids - if stale_replica_ids: + stale_keys = set(self._ranks.keys()) - active_keys_set + if stale_keys: logger.error( - f"Found stale ranks for replicas: {stale_replica_ids}. " + f"Found stale ranks for keys: {stale_keys}. " "This should never happen. Please report this as a bug." ) - if self._fail_on_error: - raise RuntimeError("Controller rank system is in an invalid state.") - # TODO (abrar): handle this case by removing the stale ranks, but remove this when - # RAY_SERVE_FAIL_ON_RANK_ERROR is set to 1 in the future - for replica_id in stale_replica_ids: - self.release_rank(replica_id) - replica_ids_needs_reconfiguration.add(replica_id) - - # Verify system invariants - all active replicas must have ranks - unranked_replica_ids = active_replica_ids - set(self._replica_ranks.keys()) - if unranked_replica_ids: + raise RuntimeError("Rank system is in an invalid state.") + + # Verify system invariants - all active keys must have ranks + unranked_keys = active_keys_set - set(self._ranks.keys()) + if unranked_keys: logger.error( - f"Found active replicas without ranks: {unranked_replica_ids}. " + f"Found active keys without ranks: {unranked_keys}. " "This should never happen. Please report this as a bug." ) - if self._fail_on_error: - raise RuntimeError("Controller rank system is in an invalid state.") - # TODO (abrar): handle this case by assigning new ranks to the unranked replicas - # but remove this when RAY_SERVE_FAIL_ON_RANK_ERROR is set to 1 in the future - for replica_id in unranked_replica_ids: - self.assign_rank(replica_id) - replica_ids_needs_reconfiguration.add(replica_id) + raise RuntimeError("Rank system is in an invalid state.") # Check for duplicate ranks - this should never happen rank_counts = {} - for replica_id, rank in self._replica_ranks.copy().items(): - if replica_id in active_replica_ids: # Only check active replicas + for key, rank in self._ranks.copy().items(): + if key in active_keys_set: # Only check active keys rank_counts[rank] = rank_counts.get(rank, 0) + 1 if rank_counts[rank] > 1: logger.error( - f"Found duplicate rank {rank} assigned to multiple replicas. " + f"Found duplicate rank {rank} assigned to multiple keys. " "This should never happen. Please report this as a bug." ) - if self._fail_on_error: - raise RuntimeError( - "Controller rank system is in an invalid state." 
- ) - # TODO (abrar): handle this case by releasing the rank of the replica with the duplicate rank - # and assigning a new rank to the replica with the duplicate rank - # but remove this when RAY_SERVE_FAIL_ON_RANK_ERROR is set to 1 in the future - self._replica_ranks.pop(replica_id) - self.assign_rank(replica_id) - replica_ids_needs_reconfiguration.add(replica_id) + raise RuntimeError("Rank system is in an invalid state.") # Check if we need to reassign ranks for contiguity - # Only force contiguity when at target replica count (e.g., after autoscaling down) - current_ranks = sorted(self._replica_ranks.values()) - expected_ranks = list(range(len(active_replicas))) + # Only force contiguity when at target count (e.g., after autoscaling down) + current_ranks = sorted(self._ranks.values()) + expected_ranks = list(range(len(active_keys))) - replicas_needing_reconfiguration = [] + keys_needing_reconfiguration_from_reassignment = [] if current_ranks != expected_ranks: logger.debug( - f"Deployment at target replica count but ranks are not contiguous. " + f"At target count but ranks are not contiguous. " f"Current: {current_ranks}, Expected: {expected_ranks}. " "Performing minimal reassignment." ) - replicas_needing_reconfiguration.extend( - self._perform_minimal_rank_reassignment(active_replicas) + keys_needing_reconfiguration_from_reassignment = ( + self._perform_minimal_rank_reassignment(active_keys) ) - # TODO (abrar): remove this when RAY_SERVE_FAIL_ON_RANK_ERROR is set to 1 in the future - for replica in active_replicas: - if replica.replica_id.unique_id in replica_ids_needs_reconfiguration: - replicas_needing_reconfiguration.append(replica) + # Combine all keys that need reconfiguration + all_keys_needing_reconfiguration = list(keys_needs_reconfiguration) + all_keys_needing_reconfiguration.extend( + keys_needing_reconfiguration_from_reassignment + ) - return replicas_needing_reconfiguration + return all_keys_needing_reconfiguration - def _perform_minimal_rank_reassignment( - self, active_replicas: List["DeploymentReplica"] - ) -> List["DeploymentReplica"]: + def _perform_minimal_rank_reassignment(self, active_keys: List[str]) -> List[str]: """Perform minimal rank reassignment to achieve contiguity. - This method reassigns ranks while minimizing the number of replicas that need + + This method reassigns ranks while minimizing the number of keys that need to be reconfigured. It prioritizes keeping existing ranks when possible. 
+ Args: - active_replicas: List of currently active replicas + active_keys: List of currently active keys + Returns: - List of replicas that need to be reconfigured with new ranks + List of keys that need to be reconfigured with new ranks """ - target_ranks_set = set(range(len(active_replicas))) + target_ranks_set = set(range(len(active_keys))) - # Find which replicas need new ranks - replicas_needing_ranks = [] - replicas_keeping_ranks = [] + # Find which keys need new ranks + keys_needing_ranks = [] + keys_keeping_ranks = [] - for replica in active_replicas: - replica_id = replica.replica_id.unique_id - current_rank = self.get_replica_rank(replica_id) + for key in active_keys: + current_rank = self.get_rank(key) if current_rank in target_ranks_set: - # This replica can keep its rank + # This key can keep its rank target_ranks_set.remove(current_rank) # O(1) operation - replicas_keeping_ranks.append(replica) + keys_keeping_ranks.append(key) else: - # This replica needs a new rank - replicas_needing_ranks.append(replica) + # This key needs a new rank + keys_needing_ranks.append(key) # Convert remaining target ranks to sorted list for deterministic assignment available_ranks = sorted(target_ranks_set) - # Assign new ranks to replicas that need them - for i, replica in enumerate(replicas_needing_ranks): - replica_id = replica.replica_id.unique_id + # Assign new ranks to keys that need them + for i, key in enumerate(keys_needing_ranks): new_rank = available_ranks[i] # O(1) operation # Store the old rank before updating - old_rank = self._replica_ranks[replica_id] + old_rank = self._ranks[key] - logger.debug( - f"Reassigning replica {replica_id}: rank {old_rank} -> {new_rank}" - ) + logger.debug(f"Reassigning key {key}: rank {old_rank} -> {new_rank}") # Update the rank mapping - self._replica_ranks[replica_id] = new_rank + self._ranks[key] = new_rank # Remove the newly assigned rank from available ranks self._released_ranks.discard(new_rank) # Add the old rank back to available ranks for reuse @@ -1707,17 +1641,151 @@ def _perform_minimal_rank_reassignment( # Log the reassignment summary logger.debug( - f"Minimal reassignment complete: {len(replicas_keeping_ranks)} replicas kept ranks, " - f"{len(replicas_needing_ranks)} replicas reassigned" + f"Minimal reassignment complete: {len(keys_keeping_ranks)} keys kept ranks, " + f"{len(keys_needing_ranks)} keys reassigned" ) - return replicas_needing_ranks + return keys_needing_ranks + + +class DeploymentRankManager: + """Manages replica ranks for a deployment. + This class handles rank assignment, release, consistency checking, and reassignment. + It maintains the rank system invariants and provides a clean interface for rank operations. + + Maintains one level of rank tracking: + - Global rank: Replica-level rank across all nodes (0, 1, 2, ...) + """ + + def __init__(self): + # Global rank manager (existing replica-level rank) + self._replica_rank_manager = RankManager() + + def assign_rank(self, replica_id: str) -> ReplicaRank: + """Assign rank to a replica.""" + if self.has_replica_rank(replica_id): + raise RuntimeError( + f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}" + ) + + # Assign global rank + rank = self._replica_rank_manager.assign_rank(replica_id) + + return ReplicaRank(rank=rank) + + def release_rank(self, replica_id: str) -> None: + """Release rank for a replica. 
+ + Args: + replica_id: ID of the replica + + Raises: + RuntimeError: If replica doesn't have ranks + """ + if not self.has_replica_rank(replica_id): + raise RuntimeError(f"Rank for {replica_id} not assigned") + + # Release global rank + self._replica_rank_manager.release_rank(replica_id) + + def recover_rank( + self, + replica_id: str, + rank: ReplicaRank, + ) -> None: + """Recover rank for a replica (e.g., after controller restart). + + Args: + replica_id: ID of the replica + rank: The rank to recover + + Raises: + RuntimeError: If replica already has ranks assigned + """ + if self.has_replica_rank(replica_id): + raise RuntimeError( + f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}" + ) + + # Recover global rank + self._replica_rank_manager.recover_rank(replica_id, rank.rank) + + def has_replica_rank(self, replica_id: str) -> bool: + """Check if replica has a rank assigned.""" + return self._replica_rank_manager.has_rank(replica_id) + + def get_replica_rank(self, replica_id: str) -> ReplicaRank: + """Get the rank for a replica. + + Args: + replica_id: ID of the replica + + Returns: + ReplicaRank object + + Raises: + RuntimeError: If replica doesn't have ranks assigned + """ + if not self.has_replica_rank(replica_id): + raise RuntimeError(f"Rank for {replica_id} not assigned") + + global_rank = self._replica_rank_manager.get_rank(replica_id) + return ReplicaRank(rank=global_rank) + + def check_rank_consistency_and_reassign_minimally( + self, + active_replicas: List["DeploymentReplica"], + ) -> List["DeploymentReplica"]: + """Verify rank system invariants and reassign ranks when needed across all three levels. + + This method ensures: + 1. Global ranks are contiguous [0, N-1] for N replicas + + Args: + active_replicas: List of currently active replicas + + Returns: + List of replicas that need to be reconfigured with new ranks + """ + if not active_replicas: + return [] + + # Extract replica IDs from replicas + active_replica_ids = [ + replica.replica_id.unique_id for replica in active_replicas + ] + + # Create a mapping from replica ID to replica object for quick lookup + replica_id_to_replica = { + replica.replica_id.unique_id: replica for replica in active_replicas + } + + # Track all replicas needing reconfiguration from any rank system + all_replica_ids_needing_reconfiguration = set() + + # STEP 1: Check global rank consistency + replica_ids_from_global = ( + self._replica_rank_manager.check_rank_consistency_and_reassign_minimally( + active_replica_ids + ) + ) + all_replica_ids_needing_reconfiguration.update(replica_ids_from_global) + + # Convert replica IDs back to replica objects + # Filter out stale replicas that are not in the active set + replicas_needing_reconfiguration = [ + replica_id_to_replica[replica_id] + for replica_id in all_replica_ids_needing_reconfiguration + if replica_id in replica_id_to_replica + ] + + return replicas_needing_reconfiguration def clear(self) -> None: - """Clear all rank data. 
Used for testing and reset.""" - self._replica_ranks.clear() - self._released_ranks.clear() - self._next_rank = 0 + self._replica_rank_manager.clear() + + def get_replica_ranks_mapping(self) -> Dict[str, int]: + return self._replica_rank_manager.get_ranks_mapping() class DeploymentState: @@ -2297,7 +2365,7 @@ def _stop_or_update_outdated_version_replicas(self, max_to_stop=math.inf) -> boo replica.replica_id.unique_id ) actor_updating = replica.reconfigure( - self._target_state.version, rank=current_rank + self._target_state.version, rank=current_rank.rank ) if actor_updating: self._replicas.add(ReplicaState.UPDATING, replica) @@ -2418,14 +2486,14 @@ def scale_deployment_replicas( assigned_rank = self._rank_manager.assign_rank(replica_id.unique_id) logger.debug( - f"Assigned rank {assigned_rank} to new replica {replica_id.unique_id} during startup" + f"Assigned rank {assigned_rank.rank} to new replica {replica_id.unique_id} during startup" ) new_deployment_replica = DeploymentReplica( replica_id, self._target_state.version, ) scheduling_request = new_deployment_replica.start( - self._target_state.info, rank=assigned_rank + self._target_state.info, rank=assigned_rank.rank ) upscale.append(scheduling_request) @@ -2544,7 +2612,9 @@ def _check_startup_replicas( # Recover rank from the replica actor during controller restart replica_id = replica.replica_id.unique_id recovered_rank = replica.rank - self._rank_manager.recover_rank(replica_id, recovered_rank) + self._rank_manager.recover_rank( + replica_id, ReplicaRank(rank=recovered_rank) + ) # This replica should be now be added to handle's replica # set. self._replicas.add(ReplicaState.RUNNING, replica) @@ -2827,7 +2897,7 @@ def _reconfigure_replicas_with_new_ranks( # World size is calculated automatically from deployment config _ = replica.reconfigure( self._target_state.version, - rank=new_rank, + rank=new_rank.rank, ) updated_count += 1 diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index 9d5d9176259d..f7638815443b 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -1591,3 +1591,12 @@ class ScaleDeploymentRequest(BaseModel): target_num_replicas: NonNegativeInt = Field( description="The target number of replicas for the deployment." ) + + +@PublicAPI(stability="alpha") +class ReplicaRank(BaseModel): + """Replica rank model.""" + + rank: int = Field( + description="Global rank of the replica across all nodes scoped to the deployment." + ) diff --git a/python/ray/serve/tests/BUILD.bazel b/python/ray/serve/tests/BUILD.bazel index 06b4005ad628..408995d6a6cf 100644 --- a/python/ray/serve/tests/BUILD.bazel +++ b/python/ray/serve/tests/BUILD.bazel @@ -146,9 +146,6 @@ py_test_module_list( # Medium tests, don't run on windows. 
py_test_module_list( size = "medium", - env = { - "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", - }, files = [ "test_fastapi.py", "test_gcs_failure.py", diff --git a/python/ray/serve/tests/unit/BUILD.bazel b/python/ray/serve/tests/unit/BUILD.bazel index 945a45cf0022..5ab4bb752cc0 100644 --- a/python/ray/serve/tests/unit/BUILD.bazel +++ b/python/ray/serve/tests/unit/BUILD.bazel @@ -24,7 +24,6 @@ py_test_module_list( timeout = "short", env = { "RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY": "1", - "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", }, files = [ "test_deployment_scheduler.py", @@ -48,7 +47,6 @@ py_test_module_list_with_env_variants( "metr_disab": { "env": { "RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE": "0", - "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", }, "name_suffix": "_metr_disab", }, @@ -56,7 +54,6 @@ py_test_module_list_with_env_variants( "env": { "RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER": "1", "RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE": "1", - "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", }, "name_suffix": "_metr_agg_at_controller", }, @@ -64,7 +61,6 @@ py_test_module_list_with_env_variants( "env": { "RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER": "1", "RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE": "0", - "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", }, "name_suffix": "_metr_agg_at_controller_and_replicas", }, diff --git a/python/ray/serve/tests/unit/test_deployment_rank_manager.py b/python/ray/serve/tests/unit/test_deployment_rank_manager.py index 211ce31b9471..4270d3e857ec 100644 --- a/python/ray/serve/tests/unit/test_deployment_rank_manager.py +++ b/python/ray/serve/tests/unit/test_deployment_rank_manager.py @@ -2,10 +2,11 @@ from ray.serve._private.common import DeploymentID, ReplicaID from ray.serve._private.deployment_state import DeploymentRankManager +from ray.serve.schema import ReplicaRank @pytest.fixture -def rank_manager(): +def rank_manager() -> DeploymentRankManager: """Fixture providing a fresh DeploymentRankManager instance for each test.""" return DeploymentRankManager() @@ -33,17 +34,14 @@ class TestDeploymentRankManager: def test_init(self, rank_manager): """Test initialization creates empty state.""" - assert rank_manager._replica_ranks == {} - assert rank_manager._released_ranks == set() - assert rank_manager._next_rank == 0 + assert rank_manager.get_replica_ranks_mapping() == {} def test_assign_rank_first_replica(self, rank_manager): """Test assigning rank to first replica.""" rank = rank_manager.assign_rank("replica_1") - assert rank == 0 - assert rank_manager._replica_ranks["replica_1"] == 0 - assert rank_manager._next_rank == 1 - assert rank_manager._released_ranks == set() + assert rank.rank == 0 + assert rank_manager.has_replica_rank("replica_1") + assert rank_manager.get_replica_rank("replica_1").rank == 0 def test_assign_rank_multiple_replicas(self, rank_manager): """Test assigning ranks to multiple replicas.""" @@ -51,11 +49,13 @@ def test_assign_rank_multiple_replicas(self, rank_manager): rank2 = rank_manager.assign_rank("replica_2") rank3 = rank_manager.assign_rank("replica_3") - assert rank1 == 0 - assert rank2 == 1 - assert rank3 == 2 - assert rank_manager._next_rank == 3 - assert len(rank_manager._replica_ranks) == 3 + assert rank1.rank == 0 + assert rank2.rank == 1 + assert rank3.rank == 2 + + mapping = rank_manager.get_replica_ranks_mapping() + assert len(mapping) == 3 + assert mapping == {"replica_1": 0, "replica_2": 1, "replica_3": 2} def test_assign_rank_reuses_released_ranks(self, rank_manager): """Test that released ranks are reused before assigning new ones.""" @@ 
-66,19 +66,19 @@ def test_assign_rank_reuses_released_ranks(self, rank_manager): # Release middle rank rank_manager.release_rank("replica_2") - assert 1 in rank_manager._released_ranks + assert not rank_manager.has_replica_rank("replica_2") - # New replica should get the released rank + # New replica should get the released rank (1) rank = rank_manager.assign_rank("replica_4") - assert rank == 1 - assert 1 not in rank_manager._released_ranks + assert rank.rank == 1 + assert rank_manager.get_replica_rank("replica_4").rank == 1 def test_assign_rank_duplicate_fails(self): - """Test assigning rank to replica that already has one fails when flag is enabled.""" + """Test assigning rank to replica that already has one fails.""" rank_manager = DeploymentRankManager() rank_manager.assign_rank("replica_1") - with pytest.raises(RuntimeError, match="already has a rank assigned"): + with pytest.raises(RuntimeError, match="already assigned"): rank_manager.assign_rank("replica_1") def test_release_rank(self, rank_manager): @@ -88,33 +88,34 @@ def test_release_rank(self, rank_manager): rank_manager.release_rank("replica_1") - assert "replica_1" not in rank_manager._replica_ranks - assert 0 in rank_manager._released_ranks - assert "replica_2" in rank_manager._replica_ranks + assert not rank_manager.has_replica_rank("replica_1") + assert rank_manager.has_replica_rank("replica_2") + assert rank_manager.get_replica_rank("replica_2").rank == 1 def test_release_rank_nonexistent_replica(self): - """Test releasing rank for non-existent replica is safe.""" + """Test releasing rank for non-existent replica fails.""" rank_manager = DeploymentRankManager() - with pytest.raises(RuntimeError, match="has no rank assigned"): + with pytest.raises(RuntimeError, match="not assigned"): rank_manager.release_rank("nonexistent") def test_recover_rank_basic(self, rank_manager): """Test basic rank recovery.""" - rank_manager.recover_rank("replica_1", 5) + rank_manager.recover_rank("replica_1", ReplicaRank(rank=5)) - assert rank_manager._replica_ranks["replica_1"] == 5 - assert rank_manager._next_rank == 6 + assert rank_manager.has_replica_rank("replica_1") + assert rank_manager.get_replica_rank("replica_1").rank == 5 def test_recover_rank_updates_next_rank(self, rank_manager): """Test that recovering a high rank updates next_rank appropriately.""" rank_manager.assign_rank("replica_1") # Gets rank 0 - rank_manager.recover_rank("replica_2", 10) + rank_manager.recover_rank("replica_2", ReplicaRank(rank=10)) - assert rank_manager._next_rank == 11 - - # New replica should get rank 11 + # New replica should get rank 11 (next available after 10) rank = rank_manager.assign_rank("replica_3") - assert rank == 11 + assert rank.rank == 11 + + mapping = rank_manager.get_replica_ranks_mapping() + assert mapping == {"replica_1": 0, "replica_2": 10, "replica_3": 11} def test_recover_rank_removes_from_available(self, rank_manager): """Test that recovering a rank removes it from available ranks.""" @@ -122,32 +123,35 @@ def test_recover_rank_removes_from_available(self, rank_manager): rank_manager.assign_rank("replica_2") rank_manager.release_rank("replica_1") # Rank 0 becomes available - assert 0 in rank_manager._released_ranks + # Recover rank 0 for a new replica + rank_manager.recover_rank("replica_3", ReplicaRank(rank=0)) - # Recover rank 0 - rank_manager.recover_rank("replica_3", 0) + # Verify replica_3 has rank 0 + assert rank_manager.has_replica_rank("replica_3") + assert rank_manager.get_replica_rank("replica_3").rank == 0 - assert 0 not 
in rank_manager._released_ranks - assert rank_manager._replica_ranks["replica_3"] == 0 + # Next assigned replica should get rank 2 (not 0, which is now taken) + rank = rank_manager.assign_rank("replica_4") + assert rank.rank == 2 def test_recover_rank_duplicate_fails(self): - """Test recovering rank for replica that already has one fails when flag is enabled.""" + """Test recovering rank for replica that already has one fails.""" rank_manager = DeploymentRankManager() rank_manager.assign_rank("replica_1") - with pytest.raises(RuntimeError, match="already has a rank assigned"): - rank_manager.recover_rank("replica_1", 5) + with pytest.raises(RuntimeError, match="already assigned"): + rank_manager.recover_rank("replica_1", ReplicaRank(rank=5)) def test_get_replica_rank_existing(self, rank_manager): """Test getting rank for existing replica.""" rank_manager.assign_rank("replica_1") rank = rank_manager.get_replica_rank("replica_1") - assert rank == 0 + assert rank.rank == 0 def test_get_replica_rank_nonexistent_fails(self): - """Test getting rank for non-existent replica fails when flag is enabled.""" + """Test getting rank for non-existent replica fails.""" rank_manager = DeploymentRankManager() - with pytest.raises(RuntimeError, match="has no rank assigned"): + with pytest.raises(RuntimeError, match="not assigned"): rank_manager.get_replica_rank("nonexistent") def test_get_replica_ranks_mapping(self, rank_manager): @@ -160,9 +164,12 @@ def test_get_replica_ranks_mapping(self, rank_manager): assert mapping == expected - # Verify it's a copy + # Verify it's a copy by modifying it mapping["replica_3"] = 2 - assert "replica_3" not in rank_manager._replica_ranks + # Get a fresh mapping to verify the original wasn't changed + fresh_mapping = rank_manager.get_replica_ranks_mapping() + assert "replica_3" not in fresh_mapping + assert fresh_mapping == expected def test_clear(self, rank_manager): """Test clearing all rank data.""" @@ -172,9 +179,14 @@ def test_clear(self, rank_manager): rank_manager.clear() - assert rank_manager._replica_ranks == {} - assert rank_manager._released_ranks == set() - assert rank_manager._next_rank == 0 + # After clearing, should have no ranks + assert rank_manager.get_replica_ranks_mapping() == {} + assert not rank_manager.has_replica_rank("replica_1") + assert not rank_manager.has_replica_rank("replica_2") + + # Should be able to assign from 0 again + rank = rank_manager.assign_rank("replica_3") + assert rank.rank == 0 def test_check_rank_consistency_empty_replicas(self, rank_manager): """Test consistency check with no active replicas.""" @@ -205,12 +217,10 @@ def test_check_rank_consistency_non_contiguous_ranks(self, rank_manager): replica2 = MockDeploymentReplica("replica_2") replica3 = MockDeploymentReplica("replica_3") - # Manually set up non-contiguous ranks - rank_manager._replica_ranks = { - "replica_1": 0, - "replica_2": 2, # Gap at rank 1 - "replica_3": 3, - } + # Manually assign non-contiguous ranks using recover_rank + rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) + rank_manager.recover_rank("replica_2", ReplicaRank(rank=2)) # Gap at rank 1 + rank_manager.recover_rank("replica_3", ReplicaRank(rank=3)) result = rank_manager.check_rank_consistency_and_reassign_minimally( [replica1, replica2, replica3] @@ -219,8 +229,9 @@ def test_check_rank_consistency_non_contiguous_ranks(self, rank_manager): # Should reassign some replicas to make ranks contiguous assert len(result) > 0 - # After reassignment, ranks should be contiguous - final_ranks = 
sorted(rank_manager._replica_ranks.values()) + # After reassignment, ranks should be contiguous [0, 1, 2] + mapping = rank_manager.get_replica_ranks_mapping() + final_ranks = sorted(mapping.values()) expected_ranks = [0, 1, 2] assert final_ranks == expected_ranks @@ -231,13 +242,15 @@ def test_minimal_reassignment_keeps_existing_when_possible(self, rank_manager): replica3 = MockDeploymentReplica("replica_3") replica4 = MockDeploymentReplica("replica_4") - # Set up ranks: 0, 2, 5, 7 (non-contiguous) - rank_manager._replica_ranks = { - "replica_1": 0, # Should keep this - "replica_2": 2, # Should keep this - "replica_3": 5, # Should be reassigned to 1 - "replica_4": 7, # Should be reassigned to 3 - } + # Set up ranks: 0, 2, 5, 7 (non-contiguous) using recover_rank + rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) # Should keep this + rank_manager.recover_rank("replica_2", ReplicaRank(rank=2)) # Should keep this + rank_manager.recover_rank( + "replica_3", ReplicaRank(rank=5) + ) # Should be reassigned to 1 + rank_manager.recover_rank( + "replica_4", ReplicaRank(rank=7) + ) # Should be reassigned to 3 result = rank_manager.check_rank_consistency_and_reassign_minimally( [replica1, replica2, replica3, replica4] @@ -249,93 +262,49 @@ def test_minimal_reassignment_keeps_existing_when_possible(self, rank_manager): assert reassigned_ids == {"replica_3", "replica_4"} # Verify final ranks are contiguous - final_ranks = sorted(rank_manager._replica_ranks.values()) + mapping = rank_manager.get_replica_ranks_mapping() + final_ranks = sorted(mapping.values()) assert final_ranks == [0, 1, 2, 3] # Verify that replica_1 and replica_2 kept their original ranks - assert rank_manager._replica_ranks["replica_1"] == 0 - assert rank_manager._replica_ranks["replica_2"] == 2 + assert rank_manager.get_replica_rank("replica_1").rank == 0 + assert rank_manager.get_replica_rank("replica_2").rank == 2 - def test_check_rank_consistency_unranked_replicas_fails_when_flag_enabled(self): - """Test consistency check fails when active replicas have no ranks and flag is enabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=True) + def test_check_rank_consistency_unranked_replicas_fails(self): + """Test consistency check fails when active replicas have no ranks.""" + rank_manager = DeploymentRankManager() replica1 = MockDeploymentReplica("replica_1") - with pytest.raises( - RuntimeError, match="Controller rank system is in an invalid state" - ): + with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) - def test_check_rank_consistency_unranked_replicas_logs_when_flag_disabled(self): - """Test consistency check only logs when active replicas have no ranks and flag is disabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=False) - replica1 = MockDeploymentReplica("replica_1") - - # When flag is disabled, it logs error but still tries to proceed with reassignment - # However, the reassignment will fail when trying to access ranks that don't exist - result = rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) - assert result == [replica1] - - def test_check_rank_consistency_stale_ranks_fails_when_flag_enabled(self): - """Test consistency check fails when there are stale ranks and flag is enabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=True) + def test_check_rank_consistency_stale_ranks_fails(self): + """Test consistency check fails when there are stale ranks.""" + 
rank_manager = DeploymentRankManager() replica1 = MockDeploymentReplica("replica_1") # Set up stale rank (replica not in active list) rank_manager.assign_rank("replica_1") rank_manager.assign_rank("stale_replica") - with pytest.raises( - RuntimeError, match="Controller rank system is in an invalid state" - ): + with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) - def test_check_rank_consistency_stale_ranks_logs_when_flag_disabled(self): - """Test consistency check only logs when there are stale ranks and flag is disabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=False) - replica1 = MockDeploymentReplica("replica_1") - - # Set up stale rank (replica not in active list) - rank_manager.assign_rank("replica_1") - rank_manager.assign_rank("stale_replica") - - # When flag is disabled, it logs error but continues with reassignment - # Since only replica_1 is active and has rank 0, no reassignment needed - result = rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) - assert result == [] - - def test_check_rank_consistency_duplicate_ranks_fails_when_flag_enabled(self): - """Test consistency check fails when there are duplicate ranks and flag is enabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=True) + def test_check_rank_consistency_duplicate_ranks_fails(self): + """Test consistency check fails when there are duplicate ranks.""" + rank_manager = DeploymentRankManager() replica1 = MockDeploymentReplica("replica_1") replica2 = MockDeploymentReplica("replica_2") - # Manually create duplicate ranks (this should never happen in normal operation) - rank_manager._replica_ranks = {"replica_1": 0, "replica_2": 0} # Duplicate! + # Manually create duplicate ranks using recover_rank (this should never happen in normal operation) + rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) + rank_manager.recover_rank("replica_2", ReplicaRank(rank=0)) # Duplicate! - with pytest.raises( - RuntimeError, match="Controller rank system is in an invalid state" - ): + with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): rank_manager.check_rank_consistency_and_reassign_minimally( [replica1, replica2] ) - def test_check_rank_consistency_duplicate_ranks_logs_when_flag_disabled(self): - """Test consistency check only logs when there are duplicate ranks and flag is disabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=False) - replica1 = MockDeploymentReplica("replica_1") - replica2 = MockDeploymentReplica("replica_2") - - # Manually create duplicate ranks (this should never happen in normal operation) - rank_manager._replica_ranks = {"replica_1": 0, "replica_2": 0} # Duplicate! 
- rank_manager._next_rank = 1 - - # When flag is disabled, it logs error but still performs reassignment to fix the issue - result = rank_manager.check_rank_consistency_and_reassign_minimally( - [replica1, replica2] - ) - assert result == [replica2] or result == [replica1] - if __name__ == "__main__": import sys From 5f118c2f705c89c8edcea37a11a183a2a3e159ad Mon Sep 17 00:00:00 2001 From: abrar Date: Sat, 8 Nov 2025 02:29:44 +0000 Subject: [PATCH 02/11] referance schema Signed-off-by: abrar --- doc/source/serve/api/index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/source/serve/api/index.md b/doc/source/serve/api/index.md index af6540bd5af5..2e30d20d4b57 100644 --- a/doc/source/serve/api/index.md +++ b/doc/source/serve/api/index.md @@ -108,6 +108,7 @@ See the [model composition guide](serve-model-composition) for how to update cod serve.schema.AutoscalingStatus serve.schema.ScalingDecision serve.schema.DeploymentAutoscalingDetail + serve.schema.ReplicaRank ``` ### Request Router From 0f7ae3c9405657a8d4e7c44f5505fc8ea9aafa75 Mon Sep 17 00:00:00 2001 From: abrar Date: Sat, 8 Nov 2025 05:56:42 +0000 Subject: [PATCH 03/11] [Serve] Refactor replica rank to prepare for node local ranks Signed-off-by: abrar --- .../serve/advanced-guides/replica-ranks.md | 30 ++++++++++-- doc/source/serve/doc_code/replica_rank.py | 12 +++-- python/ray/serve/_private/deployment_state.py | 35 ++++++-------- python/ray/serve/_private/replica.py | 16 +++---- python/ray/serve/context.py | 5 +- python/ray/serve/schema.py | 6 +++ python/ray/serve/tests/test_replica_ranks.py | 18 +++---- .../unit/test_deployment_rank_manager.py | 48 ++++++++++++++----- .../serve/tests/unit/test_deployment_state.py | 25 ++++++---- 9 files changed, 125 insertions(+), 70 deletions(-) diff --git a/doc/source/serve/advanced-guides/replica-ranks.md b/doc/source/serve/advanced-guides/replica-ranks.md index f8f7269e2baa..fa0abb8688b8 100644 --- a/doc/source/serve/advanced-guides/replica-ranks.md +++ b/doc/source/serve/advanced-guides/replica-ranks.md @@ -6,7 +6,7 @@ This API is experimental and may change between Ray minor versions. ::: -Replica ranks provide a unique identifier for **each replica within a deployment**. Each replica receives a **rank (an integer from 0 to N-1)** and **a world size (the total number of replicas)**. +Replica ranks provide a unique identifier for **each replica within a deployment**. Each replica receives a **`ReplicaRank` object** containing rank information and **a world size (the total number of replicas)**. The rank object includes a global rank (an integer from 0 to N-1), a node rank, and a local rank on the node. ## Access replica ranks @@ -28,9 +28,29 @@ The following example shows how to access replica rank information: The [`ReplicaContext`](../api/doc/ray.serve.context.ReplicaContext.rst) provides two key fields: -- `rank`: An integer from 0 to N-1 representing this replica's unique identifier. +- `rank`: A [`ReplicaRank`](../api/doc/ray.serve.schema.ReplicaRank.rst) object containing rank information for this replica. Access the integer rank value with `.rank`. - `world_size`: The target number of replicas for the deployment. +The `ReplicaRank` object contains three fields: +- `rank`: The global rank (an integer from 0 to N-1) representing this replica's unique identifier across all nodes. +- `node_rank`: The rank of the node this replica runs on (an integer from 0 to M-1 where M is the number of nodes). 
+- `local_rank`: The rank of this replica on its node (an integer from 0 to K-1 where K is the number of replicas on this node). + +:::{note} +**Accessing rank values:** + +To use the rank in your code, access the `.rank` attribute to get the integer value: + +```python +context = serve.get_replica_context() +my_rank = context.rank.rank # Get the integer rank value +my_node_rank = context.rank.node_rank # Get the node rank +my_local_rank = context.rank.local_rank # Get the local rank on this node +``` + +Most use cases only need the global `rank` value. The `node_rank` and `local_rank` are useful for advanced scenarios such as coordinating replicas on the same node. +::: + ## Handle rank changes with reconfigure When a replica's rank changes (such as during downscaling), Ray Serve can automatically call the `reconfigure` method on your deployment class to notify it of the new rank. This allows you to update replica-specific state when ranks change. @@ -54,15 +74,15 @@ The following example shows how to implement `reconfigure` to handle rank change Ray Serve automatically calls your `reconfigure` method in the following situations: 1. **At replica startup:** When a replica starts, if your deployment has both a `reconfigure` method and a `user_config`, Ray Serve calls `reconfigure` after running `__init__`. This lets you initialize rank-aware state without duplicating code between `__init__` and `reconfigure`. -2. **When you update user_config:** When you redeploy with a new `user_config`, Ray Serve calls `reconfigure` on all running replicas. If your `reconfigure` method includes `rank` as a parameter, Ray Serve passes both the new `user_config` and the current rank. -3. **When a replica's rank changes:** During downscaling, ranks may be reassigned to maintain contiguity (0 to N-1). If your `reconfigure` method includes `rank` as a parameter and your deployment has a `user_config`, Ray Serve calls `reconfigure` with the existing `user_config` and the new rank. +2. **When you update user_config:** When you redeploy with a new `user_config`, Ray Serve calls `reconfigure` on all running replicas. If your `reconfigure` method includes `rank` as a parameter, Ray Serve passes both the new `user_config` and the current rank as a `ReplicaRank` object. +3. **When a replica's rank changes:** During downscaling, ranks may be reassigned to maintain contiguity (0 to N-1). If your `reconfigure` method includes `rank` as a parameter and your deployment has a `user_config`, Ray Serve calls `reconfigure` with the existing `user_config` and the new rank as a `ReplicaRank` object. :::{note} **Requirements to receive rank updates:** To get rank changes through `reconfigure`, your deployment needs: - A class-based deployment (function deployments don't support `reconfigure`) -- A `reconfigure` method with `rank` as a parameter: `def reconfigure(self, user_config, rank: int)` +- A `reconfigure` method with `rank` as a parameter: `def reconfigure(self, user_config, rank: ReplicaRank)` - A `user_config` in your deployment (even if it's just an empty dict: `user_config={}`) Without a `user_config`, Ray Serve won't call `reconfigure` for rank changes. 
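The guide above mentions coordinating replicas on the same node as the main use for `node_rank` and `local_rank`, but doesn't show it. Below is a minimal, hypothetical sketch of that pattern; it is not part of the patch, and it assumes the controller actually populates the node-level fields — in this patch of the series `DeploymentRankManager` still fills them with `-1` placeholders:

```python
# Hypothetical sketch (not part of this patch): same-node coordination via local_rank.
# Assumes node_rank/local_rank are populated by the controller; in this patch they
# are still -1 placeholders, so treat -1 as "not yet assigned".
from ray import serve


@serve.deployment(num_replicas=4)
class NodeCoordinator:
    def __init__(self):
        context = serve.get_replica_context()
        # context.rank is a ReplicaRank(rank=..., node_rank=..., local_rank=...) object.
        self.rank = context.rank
        if self.rank.local_rank == 0:
            # Only the first replica on each node performs node-scoped setup,
            # e.g. warming a node-local cache or downloading weights to local disk.
            print(
                f"Replica {self.rank.rank} is the leader on node {self.rank.node_rank}"
            )

    def __call__(self) -> dict:
        return {
            "rank": self.rank.rank,
            "node_rank": self.rank.node_rank,
            "local_rank": self.rank.local_rank,
        }


app = NodeCoordinator.bind()
```

Because only the global `rank` is wired up at this point in the series, code written against `node_rank` and `local_rank` should treat `-1` as "not yet assigned" until node-local rank support lands.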
diff --git a/doc/source/serve/doc_code/replica_rank.py b/doc/source/serve/doc_code/replica_rank.py index f9a43ae9414f..3ff444c420e8 100644 --- a/doc/source/serve/doc_code/replica_rank.py +++ b/doc/source/serve/doc_code/replica_rank.py @@ -5,9 +5,10 @@ @serve.deployment(num_replicas=4) class ModelShard: def __call__(self): + context = serve.get_replica_context() return { - "rank": serve.get_replica_context().rank, - "world_size": serve.get_replica_context().world_size, + "rank": context.rank.rank, # Access the integer rank value + "world_size": context.world_size, } @@ -17,20 +18,21 @@ def __call__(self): # __reconfigure_rank_start__ from typing import Any from ray import serve +from ray.serve.schema import ReplicaRank @serve.deployment(num_replicas=4, user_config={"name": "model_v1"}) class RankAwareModel: def __init__(self): context = serve.get_replica_context() - self.rank = context.rank + self.rank = context.rank.rank # Extract integer rank value self.world_size = context.world_size self.model_name = None print(f"Replica rank: {self.rank}/{self.world_size}") - async def reconfigure(self, user_config: Any, rank: int): + async def reconfigure(self, user_config: Any, rank: ReplicaRank): """Called when user_config or rank changes.""" - self.rank = rank + self.rank = rank.rank # Extract integer rank value from ReplicaRank object self.world_size = serve.get_replica_context().world_size self.model_name = user_config.get("name") print(f"Reconfigured: rank={self.rank}, model={self.model_name}") diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index aa902230832f..fd1450e372f7 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -253,7 +253,7 @@ def __init__( self._docs_path: Optional[str] = None self._route_patterns: Optional[List[str]] = None # Rank assigned to the replica. - self._rank: Optional[int] = None + self._rank: Optional[ReplicaRank] = None # Populated in `on_scheduled` or `recover`. self._actor_handle: ActorHandle = None self._placement_group: PlacementGroup = None @@ -289,7 +289,7 @@ def deployment_name(self) -> str: return self._deployment_id.name @property - def rank(self) -> Optional[int]: + def rank(self) -> Optional[ReplicaRank]: return self._rank @property @@ -441,7 +441,7 @@ def initialization_latency_s(self) -> Optional[float]: return self._initialization_latency_s def start( - self, deployment_info: DeploymentInfo, rank: int + self, deployment_info: DeploymentInfo, rank: ReplicaRank ) -> ReplicaSchedulingRequest: """Start the current DeploymentReplica instance. @@ -608,11 +608,7 @@ def _format_user_config(self, user_config: Any): temp = msgpack_deserialize(temp) return temp - def reconfigure( - self, - version: DeploymentVersion, - rank: int, - ) -> bool: + def reconfigure(self, version: DeploymentVersion, rank: ReplicaRank) -> bool: """ Update replica version. Also, updates the deployment config on the actor behind this DeploymentReplica instance if necessary. @@ -1169,7 +1165,7 @@ def initialization_latency_s(self) -> Optional[float]: return self._actor.initialization_latency_s def start( - self, deployment_info: DeploymentInfo, rank: int + self, deployment_info: DeploymentInfo, rank: ReplicaRank ) -> ReplicaSchedulingRequest: """ Start a new actor for current DeploymentReplica instance. @@ -1183,7 +1179,7 @@ def start( def reconfigure( self, version: DeploymentVersion, - rank: int, + rank: ReplicaRank, ) -> bool: """ Update replica version. 
Also, updates the deployment config on the actor @@ -1210,7 +1206,7 @@ def recover(self) -> bool: return True @property - def rank(self) -> Optional[int]: + def rank(self) -> Optional[ReplicaRank]: """Get the rank assigned to the replica.""" return self._actor.rank @@ -1671,7 +1667,7 @@ def assign_rank(self, replica_id: str) -> ReplicaRank: # Assign global rank rank = self._replica_rank_manager.assign_rank(replica_id) - return ReplicaRank(rank=rank) + return ReplicaRank(rank=rank, node_rank=-1, local_rank=-1) def release_rank(self, replica_id: str) -> None: """Release rank for a replica. @@ -1730,7 +1726,7 @@ def get_replica_rank(self, replica_id: str) -> ReplicaRank: raise RuntimeError(f"Rank for {replica_id} not assigned") global_rank = self._replica_rank_manager.get_rank(replica_id) - return ReplicaRank(rank=global_rank) + return ReplicaRank(rank=global_rank, node_rank=-1, local_rank=-1) def check_rank_consistency_and_reassign_minimally( self, @@ -2365,7 +2361,7 @@ def _stop_or_update_outdated_version_replicas(self, max_to_stop=math.inf) -> boo replica.replica_id.unique_id ) actor_updating = replica.reconfigure( - self._target_state.version, rank=current_rank.rank + self._target_state.version, rank=current_rank ) if actor_updating: self._replicas.add(ReplicaState.UPDATING, replica) @@ -2486,14 +2482,14 @@ def scale_deployment_replicas( assigned_rank = self._rank_manager.assign_rank(replica_id.unique_id) logger.debug( - f"Assigned rank {assigned_rank.rank} to new replica {replica_id.unique_id} during startup" + f"Assigned rank {assigned_rank} to new replica {replica_id.unique_id} during startup" ) new_deployment_replica = DeploymentReplica( replica_id, self._target_state.version, ) scheduling_request = new_deployment_replica.start( - self._target_state.info, rank=assigned_rank.rank + self._target_state.info, rank=assigned_rank ) upscale.append(scheduling_request) @@ -2611,10 +2607,7 @@ def _check_startup_replicas( # data structure with RUNNING state. # Recover rank from the replica actor during controller restart replica_id = replica.replica_id.unique_id - recovered_rank = replica.rank - self._rank_manager.recover_rank( - replica_id, ReplicaRank(rank=recovered_rank) - ) + self._rank_manager.recover_rank(replica_id, replica.rank) # This replica should be now be added to handle's replica # set. 
self._replicas.add(ReplicaState.RUNNING, replica) @@ -2897,7 +2890,7 @@ def _reconfigure_replicas_with_new_ranks( # World size is calculated automatically from deployment config _ = replica.reconfigure( self._target_state.version, - rank=new_rank.rank, + rank=new_rank, ) updated_count += 1 diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index fcb882fdb19d..748bac3b9b2b 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -113,7 +113,7 @@ DeploymentUnavailableError, RayServeException, ) -from ray.serve.schema import EncodingType, LoggingConfig +from ray.serve.schema import EncodingType, LoggingConfig, ReplicaRank logger = logging.getLogger(SERVE_LOGGER_NAME) @@ -126,7 +126,7 @@ Optional[str], int, int, - int, # rank + ReplicaRank, # rank Optional[List[str]], # route_patterns ] @@ -507,7 +507,7 @@ def __init__( version: DeploymentVersion, ingress: bool, route_prefix: str, - rank: int, + rank: ReplicaRank, ): self._version = version self._replica_id = replica_id @@ -601,7 +601,7 @@ def get_metadata(self) -> ReplicaMetadata: ) def _set_internal_replica_context( - self, *, servable_object: Callable = None, rank: int = None + self, *, servable_object: Callable = None, rank: ReplicaRank = None ): # Calculate world_size from deployment config instead of storing it world_size = self._deployment_config.num_replicas @@ -946,7 +946,7 @@ async def initialize(self, deployment_config: DeploymentConfig): async def reconfigure( self, deployment_config: DeploymentConfig, - rank: int, + rank: ReplicaRank, route_prefix: Optional[str] = None, ): try: @@ -1171,7 +1171,7 @@ async def __init__( version: DeploymentVersion, ingress: bool, route_prefix: str, - rank: int, + rank: ReplicaRank, ): deployment_config = DeploymentConfig.from_proto_bytes( deployment_config_proto_bytes @@ -1251,7 +1251,7 @@ async def record_routing_stats(self) -> Dict[str, Any]: return await self._replica_impl.record_routing_stats() async def reconfigure( - self, deployment_config, rank: int, route_prefix: Optional[str] = None + self, deployment_config, rank: ReplicaRank, route_prefix: Optional[str] = None ) -> ReplicaMetadata: await self._replica_impl.reconfigure(deployment_config, rank, route_prefix) return self._replica_impl.get_metadata() @@ -1748,7 +1748,7 @@ async def _call_user_autoscaling_stats(self) -> Dict[str, Union[int, float]]: return result @_run_user_code - async def call_reconfigure(self, user_config: Optional[Any], rank: int): + async def call_reconfigure(self, user_config: Optional[Any], rank: ReplicaRank): self._raise_if_not_initialized("call_reconfigure") # NOTE(edoakes): there is the possibility of a race condition in user code if diff --git a/python/ray/serve/context.py b/python/ray/serve/context.py index b6ad7bb2a685..d983d455ebf3 100644 --- a/python/ray/serve/context.py +++ b/python/ray/serve/context.py @@ -23,6 +23,7 @@ from ray.serve._private.replica_result import ReplicaResult from ray.serve.exceptions import RayServeException from ray.serve.grpc_util import RayServegRPCContext +from ray.serve.schema import ReplicaRank from ray.util.annotations import DeveloperAPI logger = logging.getLogger(SERVE_LOGGER_NAME) @@ -48,7 +49,7 @@ class ReplicaContext: replica_id: ReplicaID servable_object: Callable _deployment_config: DeploymentConfig - rank: int + rank: ReplicaRank world_size: int @property @@ -112,7 +113,7 @@ def _set_internal_replica_context( replica_id: ReplicaID, servable_object: Callable, _deployment_config: DeploymentConfig, - 
rank: int, + rank: ReplicaRank, world_size: int, ): global _INTERNAL_REPLICA_CONTEXT diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index f7638815443b..6318207303cb 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -1600,3 +1600,9 @@ class ReplicaRank(BaseModel): rank: int = Field( description="Global rank of the replica across all nodes scoped to the deployment." ) + + node_rank: int = Field(description="Rank of the node in the deployment.") + + local_rank: int = Field( + description="Rank of the replica on the node scoped to the deployment." + ) diff --git a/python/ray/serve/tests/test_replica_ranks.py b/python/ray/serve/tests/test_replica_ranks.py index 74e8ec20124c..ce74f643936f 100644 --- a/python/ray/serve/tests/test_replica_ranks.py +++ b/python/ray/serve/tests/test_replica_ranks.py @@ -22,6 +22,7 @@ check_deployment_status, check_num_replicas_eq, ) +from ray.serve.schema import ReplicaRank def get_controller() -> ServeController: @@ -97,7 +98,7 @@ def __init__(self): def __call__(self): context = serve.get_replica_context() - self.replica_rank = context.rank + self.replica_rank = context.rank.rank if context.rank else None self.world_size = context.world_size return { "rank": self.replica_rank, @@ -156,7 +157,7 @@ async def __call__(self): await signal_actor.wait.remote() context = serve.get_replica_context() return { - "rank": context.rank, + "rank": context.rank.rank if context.rank else None, "world_size": context.world_size, } @@ -214,7 +215,7 @@ class PersistentRankTracker: def __call__(self): context = serve.get_replica_context() return { - "rank": context.rank, + "rank": context.rank.rank if context.rank else None, "world_size": context.world_size, } @@ -265,7 +266,7 @@ class SingleReplicaTracker: def __call__(self): context = serve.get_replica_context() return { - "rank": context.rank, + "rank": context.rank.rank if context.rank else None, "world_size": context.world_size, } @@ -296,7 +297,7 @@ def __call__(self): context = serve.get_replica_context() return { "deployment": "deployment1", - "rank": context.rank, + "rank": context.rank.rank if context.rank else None, "world_size": context.world_size, } @@ -309,7 +310,7 @@ def __call__(self): context = serve.get_replica_context() return { "deployment": "deployment2", - "rank": context.rank, + "rank": context.rank.rank if context.rank else None, "world_size": context.world_size, } @@ -410,8 +411,9 @@ async def __call__(self): await signal_actor.wait.remote() return self.my_rank - async def reconfigure(self, user_config: Any, rank: int): - self.my_rank = rank + async def reconfigure(self, user_config: Any, rank: ReplicaRank): + # rank parameter is actually a ReplicaRank object, extract the integer value + self.my_rank = rank.rank handle = serve.run(ReconfigureRankTracker.bind()) wait_for_condition( diff --git a/python/ray/serve/tests/unit/test_deployment_rank_manager.py b/python/ray/serve/tests/unit/test_deployment_rank_manager.py index 4270d3e857ec..7df12cca9308 100644 --- a/python/ray/serve/tests/unit/test_deployment_rank_manager.py +++ b/python/ray/serve/tests/unit/test_deployment_rank_manager.py @@ -100,7 +100,9 @@ def test_release_rank_nonexistent_replica(self): def test_recover_rank_basic(self, rank_manager): """Test basic rank recovery.""" - rank_manager.recover_rank("replica_1", ReplicaRank(rank=5)) + rank_manager.recover_rank( + "replica_1", ReplicaRank(rank=5, node_rank=0, local_rank=0) + ) assert rank_manager.has_replica_rank("replica_1") assert 
rank_manager.get_replica_rank("replica_1").rank == 5 @@ -108,7 +110,9 @@ def test_recover_rank_basic(self, rank_manager): def test_recover_rank_updates_next_rank(self, rank_manager): """Test that recovering a high rank updates next_rank appropriately.""" rank_manager.assign_rank("replica_1") # Gets rank 0 - rank_manager.recover_rank("replica_2", ReplicaRank(rank=10)) + rank_manager.recover_rank( + "replica_2", ReplicaRank(rank=10, node_rank=0, local_rank=0) + ) # New replica should get rank 11 (next available after 10) rank = rank_manager.assign_rank("replica_3") @@ -124,7 +128,9 @@ def test_recover_rank_removes_from_available(self, rank_manager): rank_manager.release_rank("replica_1") # Rank 0 becomes available # Recover rank 0 for a new replica - rank_manager.recover_rank("replica_3", ReplicaRank(rank=0)) + rank_manager.recover_rank( + "replica_3", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) # Verify replica_3 has rank 0 assert rank_manager.has_replica_rank("replica_3") @@ -140,7 +146,9 @@ def test_recover_rank_duplicate_fails(self): rank_manager.assign_rank("replica_1") with pytest.raises(RuntimeError, match="already assigned"): - rank_manager.recover_rank("replica_1", ReplicaRank(rank=5)) + rank_manager.recover_rank( + "replica_1", ReplicaRank(rank=5, node_rank=0, local_rank=0) + ) def test_get_replica_rank_existing(self, rank_manager): """Test getting rank for existing replica.""" @@ -218,9 +226,15 @@ def test_check_rank_consistency_non_contiguous_ranks(self, rank_manager): replica3 = MockDeploymentReplica("replica_3") # Manually assign non-contiguous ranks using recover_rank - rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) - rank_manager.recover_rank("replica_2", ReplicaRank(rank=2)) # Gap at rank 1 - rank_manager.recover_rank("replica_3", ReplicaRank(rank=3)) + rank_manager.recover_rank( + "replica_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "replica_2", ReplicaRank(rank=2, node_rank=0, local_rank=0) + ) # Gap at rank 1 + rank_manager.recover_rank( + "replica_3", ReplicaRank(rank=3, node_rank=0, local_rank=0) + ) result = rank_manager.check_rank_consistency_and_reassign_minimally( [replica1, replica2, replica3] @@ -243,13 +257,17 @@ def test_minimal_reassignment_keeps_existing_when_possible(self, rank_manager): replica4 = MockDeploymentReplica("replica_4") # Set up ranks: 0, 2, 5, 7 (non-contiguous) using recover_rank - rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) # Should keep this - rank_manager.recover_rank("replica_2", ReplicaRank(rank=2)) # Should keep this rank_manager.recover_rank( - "replica_3", ReplicaRank(rank=5) + "replica_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) # Should keep this + rank_manager.recover_rank( + "replica_2", ReplicaRank(rank=2, node_rank=0, local_rank=0) + ) # Should keep this + rank_manager.recover_rank( + "replica_3", ReplicaRank(rank=5, node_rank=0, local_rank=0) ) # Should be reassigned to 1 rank_manager.recover_rank( - "replica_4", ReplicaRank(rank=7) + "replica_4", ReplicaRank(rank=7, node_rank=0, local_rank=0) ) # Should be reassigned to 3 result = rank_manager.check_rank_consistency_and_reassign_minimally( @@ -297,8 +315,12 @@ def test_check_rank_consistency_duplicate_ranks_fails(self): replica2 = MockDeploymentReplica("replica_2") # Manually create duplicate ranks using recover_rank (this should never happen in normal operation) - rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) - rank_manager.recover_rank("replica_2", ReplicaRank(rank=0)) # 
Duplicate! + rank_manager.recover_rank( + "replica_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "replica_2", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) # Duplicate! with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): rank_manager.check_rank_consistency_and_reassign_minimally( diff --git a/python/ray/serve/tests/unit/test_deployment_state.py b/python/ray/serve/tests/unit/test_deployment_state.py index df145bf0de31..c6891626570e 100644 --- a/python/ray/serve/tests/unit/test_deployment_state.py +++ b/python/ray/serve/tests/unit/test_deployment_state.py @@ -54,6 +54,7 @@ get_capacity_adjusted_num_replicas, get_random_string, ) +from ray.serve.schema import ReplicaRank from ray.util.placement_group import validate_placement_group # Global variable that is fetched during controller recovery that @@ -64,7 +65,7 @@ # loop, so we can't "mark" a replica dead through a method. This global # state is cleared after each test that uses the fixtures in this file. dead_replicas_context = set() -replica_rank_context = {} +replica_rank_context: Dict[str, ReplicaRank] = {} TEST_DEPLOYMENT_ID = DeploymentID(name="test_deployment", app_name="test_app") TEST_DEPLOYMENT_ID_2 = DeploymentID(name="test_deployment_2", app_name="test_app") @@ -225,7 +226,7 @@ def set_node_id(self, node_id: str): def set_actor_id(self, actor_id: str): self._actor_id = actor_id - def start(self, deployment_info: DeploymentInfo, rank: int): + def start(self, deployment_info: DeploymentInfo, rank: ReplicaRank): self.started = True self._rank = rank replica_rank_context[self._replica_id.unique_id] = rank @@ -246,13 +247,13 @@ def _on_scheduled_stub(*args, **kwargs): ) @property - def rank(self) -> Optional[int]: + def rank(self) -> Optional[ReplicaRank]: return self._rank def reconfigure( self, version: DeploymentVersion, - rank: int = None, + rank: ReplicaRank = None, ): self.started = True updating = self.version.requires_actor_reconfigure(version) @@ -5434,10 +5435,18 @@ def test_complex_reassignment_scenario(self, mock_deployment_state_manager): # Simulate very scattered ranks in global context: 0, 3, 7, 10 global replica_rank_context replica_rank_context.clear() - replica_rank_context[replica_ids[0].unique_id] = 0 - replica_rank_context[replica_ids[1].unique_id] = 3 - replica_rank_context[replica_ids[2].unique_id] = 7 - replica_rank_context[replica_ids[3].unique_id] = 10 + replica_rank_context[replica_ids[0].unique_id] = ReplicaRank( + rank=0, node_rank=-1, local_rank=-1 + ) + replica_rank_context[replica_ids[1].unique_id] = ReplicaRank( + rank=3, node_rank=-1, local_rank=-1 + ) + replica_rank_context[replica_ids[2].unique_id] = ReplicaRank( + rank=7, node_rank=-1, local_rank=-1 + ) + replica_rank_context[replica_ids[3].unique_id] = ReplicaRank( + rank=10, node_rank=-1, local_rank=-1 + ) # Simulate controller crashed! 
Create a new deployment state manager # with the existing replica IDs to trigger recovery From 8d4fbbcc0142e296ea7deea287aabbdfb45bc1c9 Mon Sep 17 00:00:00 2001 From: abrar Date: Sat, 8 Nov 2025 20:30:18 +0000 Subject: [PATCH 04/11] pass rank into replicas initialize method Signed-off-by: abrar --- python/ray/serve/_private/deployment_state.py | 52 +++++++++++-------- python/ray/serve/_private/replica.py | 18 ++++--- .../serve/tests/unit/test_deployment_state.py | 18 +++++-- 3 files changed, 52 insertions(+), 36 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index fd1450e372f7..cbd58787040a 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -253,6 +253,7 @@ def __init__( self._docs_path: Optional[str] = None self._route_patterns: Optional[List[str]] = None # Rank assigned to the replica. + self._assign_rank_callback: Optional[Callable[[ReplicaID], ReplicaRank]] = None self._rank: Optional[ReplicaRank] = None # Populated in `on_scheduled` or `recover`. self._actor_handle: ActorHandle = None @@ -441,14 +442,16 @@ def initialization_latency_s(self) -> Optional[float]: return self._initialization_latency_s def start( - self, deployment_info: DeploymentInfo, rank: ReplicaRank + self, + deployment_info: DeploymentInfo, + assign_rank_callback: Callable[[ReplicaID], ReplicaRank], ) -> ReplicaSchedulingRequest: """Start the current DeploymentReplica instance. The replica will be in the STARTING and PENDING_ALLOCATION states until the deployment scheduler schedules the underlying actor. """ - self._rank = rank # Store the rank assigned to this replica + self._assign_rank_callback = assign_rank_callback self._actor_resources = deployment_info.replica_config.resource_dict self._ingress = deployment_info.ingress # it is currently not possible to create a placement group @@ -492,7 +495,6 @@ def start( self._version, deployment_info.ingress, deployment_info.route_prefix, - rank, ) # TODO(simon): unify the constructor arguments across language elif ( @@ -587,17 +589,19 @@ def on_scheduled( ) else: self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - replica_ready_check_func = self._actor_handle.initialize_and_get_metadata - self._ready_obj_ref = replica_ready_check_func.remote( - deployment_config, - # Ensure that `is_allocated` will execute - # before `initialize_and_get_metadata`, - # because `initialize_and_get_metadata` runs - # user code that could block the replica - # asyncio loop. If that happens before `is_allocated` is executed, - # the `is_allocated` call won't be able to run. - self._allocated_obj_ref, - ) + + def on_completed(args): + self._rank = self._assign_rank_callback(self._replica_id.unique_id) + + replica_ready_check_func = ( + self._actor_handle.initialize_and_get_metadata + ) + self._ready_obj_ref = replica_ready_check_func.remote( + deployment_config, + self._rank, + ) + + self._allocated_obj_ref._on_completed(on_completed) def _format_user_config(self, user_config: Any): temp = copy(user_config) @@ -742,6 +746,9 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]: logger.exception(msg) return ReplicaStartupStatus.FAILED, msg + if self._ready_obj_ref is None: + return ReplicaStartupStatus.PENDING_INITIALIZATION, None + # Check whether replica initialization has completed. 
replica_ready = check_obj_ref_ready_nowait(self._ready_obj_ref) # In case of deployment constructor failure, ray.get will help to @@ -1165,12 +1172,16 @@ def initialization_latency_s(self) -> Optional[float]: return self._actor.initialization_latency_s def start( - self, deployment_info: DeploymentInfo, rank: ReplicaRank + self, + deployment_info: DeploymentInfo, + assign_rank_callback: Callable[[ReplicaID], ReplicaRank], ) -> ReplicaSchedulingRequest: """ Start a new actor for current DeploymentReplica instance. """ - replica_scheduling_request = self._actor.start(deployment_info, rank=rank) + replica_scheduling_request = self._actor.start( + deployment_info, assign_rank_callback=assign_rank_callback + ) self._start_time = time.time() self._logged_shutdown_message = False self.update_actor_details(start_time_s=self._start_time) @@ -2478,18 +2489,13 @@ def scale_deployment_replicas( for _ in range(to_add): replica_id = ReplicaID(get_random_string(), deployment_id=self._id) - # Assign rank during replica creation (startup process) - assigned_rank = self._rank_manager.assign_rank(replica_id.unique_id) - - logger.debug( - f"Assigned rank {assigned_rank} to new replica {replica_id.unique_id} during startup" - ) new_deployment_replica = DeploymentReplica( replica_id, self._target_state.version, ) scheduling_request = new_deployment_replica.start( - self._target_state.info, rank=assigned_rank + self._target_state.info, + assign_rank_callback=self._rank_manager.assign_rank, ) upscale.append(scheduling_request) diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index 748bac3b9b2b..c6b9b7535156 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -507,7 +507,6 @@ def __init__( version: DeploymentVersion, ingress: bool, route_prefix: str, - rank: ReplicaRank, ): self._version = version self._replica_id = replica_id @@ -555,7 +554,7 @@ def __init__( # Set metadata for logs and metrics. # servable_object will be populated in `initialize_and_get_metadata`. - self._set_internal_replica_context(servable_object=None, rank=rank) + self._set_internal_replica_context(servable_object=None, rank=None) self._metrics_manager = create_replica_metrics_manager( replica_id=replica_id, @@ -569,7 +568,7 @@ def __init__( self._http_port: Optional[int] = None self._grpc_port: Optional[int] = None - self._rank = rank + self._rank: Optional[ReplicaRank] = None @property def max_ongoing_requests(self) -> int: @@ -895,7 +894,12 @@ async def handle_request_with_rejection( async def _on_initialized(self): raise NotImplementedError - async def initialize(self, deployment_config: DeploymentConfig): + async def initialize(self, deployment_config: DeploymentConfig, rank: ReplicaRank): + if rank is not None: + self._rank = rank + self._set_internal_replica_context( + servable_object=self._user_callable_wrapper.user_callable, rank=rank + ) try: # Ensure that initialization is only performed once. # When controller restarts, it will call this method again. 
@@ -1171,7 +1175,6 @@ async def __init__( version: DeploymentVersion, ingress: bool, route_prefix: str, - rank: ReplicaRank, ): deployment_config = DeploymentConfig.from_proto_bytes( deployment_config_proto_bytes @@ -1188,7 +1191,6 @@ async def __init__( version=version, ingress=ingress, route_prefix=route_prefix, - rank=rank, ) def push_proxy_handle(self, handle: ActorHandle): @@ -1228,7 +1230,7 @@ async def is_allocated(self) -> str: ) async def initialize_and_get_metadata( - self, deployment_config: DeploymentConfig = None, _after: Optional[Any] = None + self, deployment_config: DeploymentConfig = None, rank: ReplicaRank = None ) -> ReplicaMetadata: """Handles initializing the replica. @@ -1241,7 +1243,7 @@ async def initialize_and_get_metadata( """ # Unused `_after` argument is for scheduling: passing an ObjectRef # allows delaying this call until after the `_after` call has returned. - await self._replica_impl.initialize(deployment_config) + await self._replica_impl.initialize(deployment_config, rank) return self._replica_impl.get_metadata() async def check_health(self): diff --git a/python/ray/serve/tests/unit/test_deployment_state.py b/python/ray/serve/tests/unit/test_deployment_state.py index c6891626570e..9caa560a6d49 100644 --- a/python/ray/serve/tests/unit/test_deployment_state.py +++ b/python/ray/serve/tests/unit/test_deployment_state.py @@ -1,6 +1,6 @@ import sys from copy import deepcopy -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple from unittest.mock import Mock, patch import pytest @@ -108,6 +108,7 @@ def __init__( self._initialization_latency_s = -1 self._docs_path = None self._rank = replica_rank_context.get(replica_id.unique_id, None) + self._assign_rank_callback = None @property def is_cross_language(self) -> bool: @@ -226,10 +227,15 @@ def set_node_id(self, node_id: str): def set_actor_id(self, actor_id: str): self._actor_id = actor_id - def start(self, deployment_info: DeploymentInfo, rank: ReplicaRank): + def start( + self, + deployment_info: DeploymentInfo, + assign_rank_callback: Callable[[ReplicaID], ReplicaRank], + ): self.started = True - self._rank = rank - replica_rank_context[self._replica_id.unique_id] = rank + self._assign_rank_callback = assign_rank_callback + self._rank = assign_rank_callback(self._replica_id.unique_id) + replica_rank_context[self._replica_id.unique_id] = self._rank def _on_scheduled_stub(*args, **kwargs): pass @@ -2682,7 +2688,9 @@ def test_max_concurrency_override(self): ) max_ongoing_requests = DEFAULT_MAX_CONCURRENCY_ASYNC + 1 d_info, _ = deployment_info(max_ongoing_requests=max_ongoing_requests) - replica_scheduling_request = actor_replica.start(d_info, rank=0) + replica_scheduling_request = actor_replica.start( + d_info, assign_rank_callback=lambda x: 0 + ) assert ( "max_concurrency" in replica_scheduling_request.actor_options and replica_scheduling_request.actor_options["max_concurrency"] From 51f2935d993cba33ede5d260a727522a4d973d7b Mon Sep 17 00:00:00 2001 From: abrar Date: Sat, 8 Nov 2025 22:05:24 +0000 Subject: [PATCH 05/11] fix test Signed-off-by: abrar --- python/ray/serve/tests/test_cluster.py | 42 +++++++++++++++++++++----- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/python/ray/serve/tests/test_cluster.py b/python/ray/serve/tests/test_cluster.py index eb62427df0fe..330e7bb9cabf 100644 --- a/python/ray/serve/tests/test_cluster.py +++ b/python/ray/serve/tests/test_cluster.py @@ -176,12 +176,21 @@ def get_replicas(replica_state): ) return 
replicas.get([replica_state]) - # wait for serve to start the replica, and catch a reference to it. + # wait for serve to start the replica wait_for_condition(lambda: len(get_replicas(ReplicaState.STARTING)) > 0) - replica = get_replicas(ReplicaState.STARTING)[0] # currently there are no resources to allocate the replica - assert replica.check_started()[0] == ReplicaStartupStatus.PENDING_ALLOCATION + def get_starting_replica(): + replicas = get_replicas(ReplicaState.STARTING) + return replicas[0] if replicas else None + + def is_pending_allocation(): + replica = get_starting_replica() + if replica is None: + return False + return replica.check_started()[0] == ReplicaStartupStatus.PENDING_ALLOCATION + + wait_for_condition(is_pending_allocation) # add the necessary resources to allocate the replica cluster.add_node(num_cpus=4) @@ -189,17 +198,34 @@ def get_replicas(replica_state): wait_for_condition(lambda: (ray.available_resources().get("CPU", 0) >= 2)) def is_replica_pending_initialization(): + replica = get_starting_replica() + if replica is None: + return False status, _ = replica.check_started() - print(status) return status == ReplicaStartupStatus.PENDING_INITIALIZATION wait_for_condition(is_replica_pending_initialization, timeout=25) # send signal to complete replica initialization - signal.send.remote() - wait_for_condition( - lambda: replica.check_started()[0] == ReplicaStartupStatus.SUCCEEDED - ) + ray.get(signal.send.remote()) + + def check_succeeded(): + # After initialization succeeds, replica transitions to RUNNING state + # So check both STARTING and RUNNING states + replica = get_starting_replica() + if replica: + status, _ = replica.check_started() + if status == ReplicaStartupStatus.SUCCEEDED: + return True + + # Check if replica has moved to RUNNING state (which means it succeeded) + running_replicas = get_replicas(ReplicaState.RUNNING) + if running_replicas and len(running_replicas) > 0: + return True + + return False + + wait_for_condition(check_succeeded) @serve.deployment From d66a7b5b9997f80410c86451e8929391f4daa17a Mon Sep 17 00:00:00 2001 From: abrar Date: Sun, 9 Nov 2025 00:39:24 +0000 Subject: [PATCH 06/11] fix java test Signed-off-by: abrar --- python/ray/serve/_private/deployment_state.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index cbd58787040a..238e83f33f9d 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -584,9 +584,14 @@ def on_scheduled( if self._is_cross_language: self._actor_handle = JavaActorHandleProxy(self._actor_handle) self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - self._ready_obj_ref = self._actor_handle.is_initialized.remote( - deployment_config.to_proto_bytes() - ) + + def on_completed(args): + self._rank = self._assign_rank_callback(self._replica_id.unique_id) + self._ready_obj_ref = self._actor_handle.is_initialized.remote( + deployment_config.to_proto_bytes() + ) + + self._allocated_obj_ref._on_completed(on_completed) else: self._allocated_obj_ref = self._actor_handle.is_allocated.remote() From ad660b36bd946b0a3094b56284edd85724715fe8 Mon Sep 17 00:00:00 2001 From: abrar Date: Wed, 12 Nov 2025 23:16:15 +0000 Subject: [PATCH 07/11] add fail on rank Signed-off-by: abrar --- python/ray/serve/_private/constants.py | 4 + python/ray/serve/_private/deployment_state.py | 134 +++++++++----- python/ray/serve/tests/BUILD.bazel | 3 + 
python/ray/serve/tests/unit/BUILD.bazel | 4 + .../unit/test_deployment_rank_manager.py | 175 ++++++++++++++++++ 5 files changed, 271 insertions(+), 49 deletions(-) diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index 59928e6bee8c..4035e04d36fc 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -490,6 +490,10 @@ "RAY_SERVE_REQUEST_PATH_LOG_BUFFER_SIZE", 1 ) +# Feature flag to fail the deployment if the rank is not set. +# TODO (abrar): Remove this flag after the feature is stable. +RAY_SERVE_FAIL_ON_RANK_ERROR = get_env_bool("RAY_SERVE_FAIL_ON_RANK_ERROR", "0") + # The message to return when the replica is healthy. HEALTHY_MESSAGE = "success" diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index aa902230832f..546eadf9e537 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -38,6 +38,7 @@ MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT, MAX_PER_REPLICA_RETRY_COUNT, RAY_SERVE_ENABLE_TASK_EVENTS, + RAY_SERVE_FAIL_ON_RANK_ERROR, RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS, RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY, REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD, @@ -1657,21 +1658,38 @@ class DeploymentRankManager: - Global rank: Replica-level rank across all nodes (0, 1, 2, ...) """ - def __init__(self): + def __init__(self, fail_on_rank_error: bool = False): # Global rank manager (existing replica-level rank) self._replica_rank_manager = RankManager() + self._fail_on_rank_error = fail_on_rank_error + + def _execute_with_error_handling(self, func, safe_default, *args, **kwargs): + if self._fail_on_rank_error: + # Let exceptions propagate + return func(*args, **kwargs) + else: + # Catch exceptions and return safe default + try: + return func(*args, **kwargs) + except Exception as e: + logger.error(f"Error executing function {func.__name__}: {e}") + return safe_default def assign_rank(self, replica_id: str) -> ReplicaRank: """Assign rank to a replica.""" - if self.has_replica_rank(replica_id): - raise RuntimeError( - f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}" - ) - # Assign global rank - rank = self._replica_rank_manager.assign_rank(replica_id) + def _assign_rank_impl(): + if self.has_replica_rank(replica_id): + raise RuntimeError( + f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}" + ) + + # Assign global rank + rank = self._replica_rank_manager.assign_rank(replica_id) - return ReplicaRank(rank=rank) + return ReplicaRank(rank=rank) + + return self._execute_with_error_handling(_assign_rank_impl, ReplicaRank(rank=0)) def release_rank(self, replica_id: str) -> None: """Release rank for a replica. 
@@ -1682,11 +1700,15 @@ def release_rank(self, replica_id: str) -> None: Raises: RuntimeError: If replica doesn't have ranks """ - if not self.has_replica_rank(replica_id): - raise RuntimeError(f"Rank for {replica_id} not assigned") - # Release global rank - self._replica_rank_manager.release_rank(replica_id) + def _release_rank_impl(): + if not self.has_replica_rank(replica_id): + raise RuntimeError(f"Rank for {replica_id} not assigned") + + # Release global rank + self._replica_rank_manager.release_rank(replica_id) + + return self._execute_with_error_handling(_release_rank_impl, None) def recover_rank( self, @@ -1702,13 +1724,17 @@ def recover_rank( Raises: RuntimeError: If replica already has ranks assigned """ - if self.has_replica_rank(replica_id): - raise RuntimeError( - f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}" - ) - # Recover global rank - self._replica_rank_manager.recover_rank(replica_id, rank.rank) + def _recover_rank_impl(): + if self.has_replica_rank(replica_id): + raise RuntimeError( + f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}" + ) + + # Recover global rank + self._replica_rank_manager.recover_rank(replica_id, rank.rank) + + return self._execute_with_error_handling(_recover_rank_impl, None) def has_replica_rank(self, replica_id: str) -> bool: """Check if replica has a rank assigned.""" @@ -1726,11 +1752,17 @@ def get_replica_rank(self, replica_id: str) -> ReplicaRank: Raises: RuntimeError: If replica doesn't have ranks assigned """ - if not self.has_replica_rank(replica_id): - raise RuntimeError(f"Rank for {replica_id} not assigned") - global_rank = self._replica_rank_manager.get_rank(replica_id) - return ReplicaRank(rank=global_rank) + def _get_replica_rank_impl(): + if not self.has_replica_rank(replica_id): + raise RuntimeError(f"Rank for {replica_id} not assigned") + + global_rank = self._replica_rank_manager.get_rank(replica_id) + return ReplicaRank(rank=global_rank) + + return self._execute_with_error_handling( + _get_replica_rank_impl, ReplicaRank(rank=0) + ) def check_rank_consistency_and_reassign_minimally( self, @@ -1747,39 +1779,41 @@ def check_rank_consistency_and_reassign_minimally( Returns: List of replicas that need to be reconfigured with new ranks """ - if not active_replicas: - return [] - # Extract replica IDs from replicas - active_replica_ids = [ - replica.replica_id.unique_id for replica in active_replicas - ] + def _check_rank_consistency_impl(): + if not active_replicas: + return [] - # Create a mapping from replica ID to replica object for quick lookup - replica_id_to_replica = { - replica.replica_id.unique_id: replica for replica in active_replicas - } + # Extract replica IDs from replicas + active_replica_ids = [ + replica.replica_id.unique_id for replica in active_replicas + ] - # Track all replicas needing reconfiguration from any rank system - all_replica_ids_needing_reconfiguration = set() + # Create a mapping from replica ID to replica object for quick lookup + replica_id_to_replica = { + replica.replica_id.unique_id: replica for replica in active_replicas + } + + # Track all replicas needing reconfiguration from any rank system + all_replica_ids_needing_reconfiguration = set() - # STEP 1: Check global rank consistency - replica_ids_from_global = ( - self._replica_rank_manager.check_rank_consistency_and_reassign_minimally( + # STEP 1: Check global rank consistency + replica_ids_from_global = 
self._replica_rank_manager.check_rank_consistency_and_reassign_minimally( active_replica_ids ) - ) - all_replica_ids_needing_reconfiguration.update(replica_ids_from_global) - - # Convert replica IDs back to replica objects - # Filter out stale replicas that are not in the active set - replicas_needing_reconfiguration = [ - replica_id_to_replica[replica_id] - for replica_id in all_replica_ids_needing_reconfiguration - if replica_id in replica_id_to_replica - ] + all_replica_ids_needing_reconfiguration.update(replica_ids_from_global) + + # Convert replica IDs back to replica objects + # Filter out stale replicas that are not in the active set + replicas_needing_reconfiguration = [ + replica_id_to_replica[replica_id] + for replica_id in all_replica_ids_needing_reconfiguration + if replica_id in replica_id_to_replica + ] + + return replicas_needing_reconfiguration - return replicas_needing_reconfiguration + return self._execute_with_error_handling(_check_rank_consistency_impl, []) def clear(self) -> None: self._replica_rank_manager.clear() @@ -1829,7 +1863,9 @@ def __init__( DeploymentStatusTrigger.CONFIG_UPDATE_STARTED, ) - self._rank_manager = DeploymentRankManager() + self._rank_manager = DeploymentRankManager( + fail_on_rank_error=RAY_SERVE_FAIL_ON_RANK_ERROR + ) self.replica_average_ongoing_requests: Dict[str, float] = {} diff --git a/python/ray/serve/tests/BUILD.bazel b/python/ray/serve/tests/BUILD.bazel index 408995d6a6cf..06b4005ad628 100644 --- a/python/ray/serve/tests/BUILD.bazel +++ b/python/ray/serve/tests/BUILD.bazel @@ -146,6 +146,9 @@ py_test_module_list( # Medium tests, don't run on windows. py_test_module_list( size = "medium", + env = { + "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", + }, files = [ "test_fastapi.py", "test_gcs_failure.py", diff --git a/python/ray/serve/tests/unit/BUILD.bazel b/python/ray/serve/tests/unit/BUILD.bazel index 5ab4bb752cc0..945a45cf0022 100644 --- a/python/ray/serve/tests/unit/BUILD.bazel +++ b/python/ray/serve/tests/unit/BUILD.bazel @@ -24,6 +24,7 @@ py_test_module_list( timeout = "short", env = { "RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY": "1", + "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", }, files = [ "test_deployment_scheduler.py", @@ -47,6 +48,7 @@ py_test_module_list_with_env_variants( "metr_disab": { "env": { "RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE": "0", + "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", }, "name_suffix": "_metr_disab", }, @@ -54,6 +56,7 @@ py_test_module_list_with_env_variants( "env": { "RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER": "1", "RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE": "1", + "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", }, "name_suffix": "_metr_agg_at_controller", }, @@ -61,6 +64,7 @@ py_test_module_list_with_env_variants( "env": { "RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER": "1", "RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE": "0", + "RAY_SERVE_FAIL_ON_RANK_ERROR": "1", }, "name_suffix": "_metr_agg_at_controller_and_replicas", }, diff --git a/python/ray/serve/tests/unit/test_deployment_rank_manager.py b/python/ray/serve/tests/unit/test_deployment_rank_manager.py index 4270d3e857ec..e7be0e515308 100644 --- a/python/ray/serve/tests/unit/test_deployment_rank_manager.py +++ b/python/ray/serve/tests/unit/test_deployment_rank_manager.py @@ -306,6 +306,181 @@ def test_check_rank_consistency_duplicate_ranks_fails(self): ) +class TestDeploymentRankManagerErrorHandling: + """Test cases for DeploymentRankManager error handling with fail_on_rank_error flag. 
+ + This test class can be easily removed in the future when the error handling + feature is no longer needed. + """ + + def test_assign_rank_error_with_fail_on_rank_error_true(self): + """Test that assign_rank raises exception when fail_on_rank_error=True.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=True) + rank_manager.assign_rank("replica_1") + + # Should raise RuntimeError for duplicate assignment + with pytest.raises(RuntimeError, match="already assigned"): + rank_manager.assign_rank("replica_1") + + def test_assign_rank_error_with_fail_on_rank_error_false(self): + """Test that assign_rank returns safe default when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + rank_manager.assign_rank("replica_1") + + # Should return safe default (ReplicaRank(rank=0)) instead of raising + result = rank_manager.assign_rank("replica_1") + assert result is not None + assert isinstance(result, ReplicaRank) + assert result.rank == 0 + + def test_release_rank_error_with_fail_on_rank_error_true(self): + """Test that release_rank raises exception when fail_on_rank_error=True.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=True) + + # Should raise RuntimeError for non-existent replica + with pytest.raises(RuntimeError, match="not assigned"): + rank_manager.release_rank("nonexistent") + + def test_release_rank_error_with_fail_on_rank_error_false(self): + """Test that release_rank returns safe default when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + + # Should return None instead of raising + result = rank_manager.release_rank("nonexistent") + assert result is None + + def test_recover_rank_error_with_fail_on_rank_error_true(self): + """Test that recover_rank raises exception when fail_on_rank_error=True.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=True) + rank_manager.assign_rank("replica_1") + + # Should raise RuntimeError for duplicate recovery + with pytest.raises(RuntimeError, match="already assigned"): + rank_manager.recover_rank("replica_1", ReplicaRank(rank=5)) + + def test_recover_rank_error_with_fail_on_rank_error_false(self): + """Test that recover_rank returns safe default when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + rank_manager.assign_rank("replica_1") + + # Should return None instead of raising + result = rank_manager.recover_rank("replica_1", ReplicaRank(rank=5)) + assert result is None + + def test_get_replica_rank_error_with_fail_on_rank_error_true(self): + """Test that get_replica_rank raises exception when fail_on_rank_error=True.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=True) + + # Should raise RuntimeError for non-existent replica + with pytest.raises(RuntimeError, match="not assigned"): + rank_manager.get_replica_rank("nonexistent") + + def test_get_replica_rank_error_with_fail_on_rank_error_false(self): + """Test that get_replica_rank returns safe default when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + + # Should return safe default (ReplicaRank(rank=0)) instead of raising + result = rank_manager.get_replica_rank("nonexistent") + assert result is not None + assert isinstance(result, ReplicaRank) + assert result.rank == 0 + + def test_check_rank_consistency_error_with_fail_on_rank_error_true(self): + """Test that check_rank_consistency raises exception when fail_on_rank_error=True.""" + rank_manager = 
DeploymentRankManager(fail_on_rank_error=True) + replica1 = MockDeploymentReplica("replica_1") + + # Set up invalid state: active replica without rank + with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): + rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) + + def test_check_rank_consistency_error_with_fail_on_rank_error_false(self): + """Test that check_rank_consistency returns safe default when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + replica1 = MockDeploymentReplica("replica_1") + + # Should return empty list instead of raising + result = rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) + assert result == [] + + def test_check_rank_consistency_with_stale_ranks_error_handling(self): + """Test check_rank_consistency with stale ranks and fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + replica1 = MockDeploymentReplica("replica_1") + + # Set up stale rank (replica not in active list) + rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("stale_replica") + + # Should return empty list instead of raising + result = rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) + assert result == [] + + def test_check_rank_consistency_with_duplicate_ranks_error_handling(self): + """Test check_rank_consistency with duplicate ranks and fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + replica1 = MockDeploymentReplica("replica_1") + replica2 = MockDeploymentReplica("replica_2") + + # Manually create duplicate ranks + rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) + rank_manager.recover_rank("replica_2", ReplicaRank(rank=0)) + + # Should return empty list instead of raising + result = rank_manager.check_rank_consistency_and_reassign_minimally( + [replica1, replica2] + ) + assert result == [] + + def test_normal_operations_work_with_fail_on_rank_error_false(self): + """Test that normal operations still work correctly with fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + + # Test normal assign + rank1 = rank_manager.assign_rank("replica_1") + assert rank1.rank == 0 + + # Test normal get + rank = rank_manager.get_replica_rank("replica_1") + assert rank.rank == 0 + + # Test normal release + rank_manager.release_rank("replica_1") + assert not rank_manager.has_replica_rank("replica_1") + + # Test normal recover + rank_manager.recover_rank("replica_2", ReplicaRank(rank=5)) + assert rank_manager.get_replica_rank("replica_2").rank == 5 + + # Test normal consistency check + replica2 = MockDeploymentReplica("replica_2") + replica3 = MockDeploymentReplica("replica_3") + rank_manager.assign_rank("replica_3") + + result = rank_manager.check_rank_consistency_and_reassign_minimally( + [replica2, replica3] + ) + # Should reassign to make ranks contiguous + assert len(result) > 0 + + def test_multiple_errors_do_not_crash_with_fail_on_rank_error_false(self): + """Test that multiple consecutive errors don't crash when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + + # Multiple errors in a row should all return safe defaults + result1 = rank_manager.get_replica_rank("nonexistent1") + result2 = rank_manager.get_replica_rank("nonexistent2") + result3 = rank_manager.release_rank("nonexistent3") + + assert result1 is not None + assert result2 is not None + assert result3 is None + 
+ # And normal operations should still work after errors + rank = rank_manager.assign_rank("replica_1") + assert rank.rank == 0 + + if __name__ == "__main__": import sys From dd960d357d1278a65e0d929f0588d33156791f24 Mon Sep 17 00:00:00 2001 From: abrar Date: Thu, 13 Nov 2025 00:57:52 +0000 Subject: [PATCH 08/11] add fail on rank Signed-off-by: abrar --- python/ray/serve/_private/deployment_state.py | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 546eadf9e537..7e2342f419a6 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1472,9 +1472,12 @@ def assign_rank(self, key: str) -> int: raise RuntimeError(f"Rank for {key} already assigned: {self._ranks[key]}") if self._released_ranks: + # Reuse the smallest released rank rank = min(self._released_ranks) self._released_ranks.remove(rank) else: + # Assign the next available rank + # This is the first time we're assigning a rank to this replica rank = self._next_rank self._next_rank += 1 @@ -1658,7 +1661,7 @@ class DeploymentRankManager: - Global rank: Replica-level rank across all nodes (0, 1, 2, ...) """ - def __init__(self, fail_on_rank_error: bool = False): + def __init__(self, fail_on_rank_error: bool = True): # Global rank manager (existing replica-level rank) self._replica_rank_manager = RankManager() self._fail_on_rank_error = fail_on_rank_error @@ -1676,7 +1679,17 @@ def _execute_with_error_handling(self, func, safe_default, *args, **kwargs): return safe_default def assign_rank(self, replica_id: str) -> ReplicaRank: - """Assign rank to a replica.""" + """Assign a rank to a new replica. + + Args: + replica_id: The unique ID of the replica + + Returns: + ReplicaRank object with the assigned rank + + Raises: + RuntimeError: If the replica already has a rank assigned + """ def _assign_rank_impl(): if self.has_replica_rank(replica_id): @@ -1737,7 +1750,17 @@ def _recover_rank_impl(): return self._execute_with_error_handling(_recover_rank_impl, None) def has_replica_rank(self, replica_id: str) -> bool: - """Check if replica has a rank assigned.""" + """Check if replica has a rank assigned. 
+ + Args: + replica_id: The unique ID of the replica + + Returns: + True if the replica has a rank assigned, False otherwise + + Raises: + RuntimeError: If the replica doesn't have ranks assigned + """ return self._replica_rank_manager.has_rank(replica_id) def get_replica_rank(self, replica_id: str) -> ReplicaRank: From 6d40e49e4c3c1351c3ef049ec6cc5662ee54a023 Mon Sep 17 00:00:00 2001 From: abrar Date: Thu, 13 Nov 2025 04:06:11 +0000 Subject: [PATCH 09/11] remove unused code Signed-off-by: abrar --- python/ray/serve/_private/deployment_state.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 7e2342f419a6..9460147e791a 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1488,6 +1488,8 @@ def release_rank(self, key: str) -> None: if key not in self._ranks: raise RuntimeError(f"Rank for {key} not assigned") rank = self._ranks.pop(key) + # Add the released rank to the set of released ranks + # This rank can be reused for a new replica self._released_ranks.add(rank) def recover_rank(self, key: str, rank: int) -> None: @@ -1538,7 +1540,6 @@ def check_rank_consistency_and_reassign_minimally( return [] active_keys_set = set(active_keys) - keys_needs_reconfiguration = set() # Check for stale ranks - this should never happen stale_keys = set(self._ranks.keys()) - active_keys_set @@ -1587,13 +1588,7 @@ def check_rank_consistency_and_reassign_minimally( self._perform_minimal_rank_reassignment(active_keys) ) - # Combine all keys that need reconfiguration - all_keys_needing_reconfiguration = list(keys_needs_reconfiguration) - all_keys_needing_reconfiguration.extend( - keys_needing_reconfiguration_from_reassignment - ) - - return all_keys_needing_reconfiguration + return keys_needing_reconfiguration_from_reassignment def _perform_minimal_rank_reassignment(self, active_keys: List[str]) -> List[str]: """Perform minimal rank reassignment to achieve contiguity. From d7131f0bbf7c4a2c7a733699e01d24d5a79cadd7 Mon Sep 17 00:00:00 2001 From: abrar Date: Fri, 14 Nov 2025 07:14:22 +0000 Subject: [PATCH 10/11] make rank assign part of control loop Signed-off-by: abrar --- python/ray/serve/_private/deployment_state.py | 46 ++++++++----------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index f88ebdd08be8..8e9089135114 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -576,39 +576,12 @@ def on_scheduled( self._actor_handle = actor_handle self._placement_group = placement_group - # Perform auto method name translation for java handles. 
- # See https://github.com/ray-project/ray/issues/21474 - deployment_config = copy(self._version.deployment_config) - deployment_config.user_config = self._format_user_config( - deployment_config.user_config - ) if self._is_cross_language: self._actor_handle = JavaActorHandleProxy(self._actor_handle) self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - - def on_completed(args): - self._rank = self._assign_rank_callback(self._replica_id.unique_id) - self._ready_obj_ref = self._actor_handle.is_initialized.remote( - deployment_config.to_proto_bytes() - ) - - self._allocated_obj_ref._on_completed(on_completed) else: self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - def on_completed(args): - self._rank = self._assign_rank_callback(self._replica_id.unique_id) - - replica_ready_check_func = ( - self._actor_handle.initialize_and_get_metadata - ) - self._ready_obj_ref = replica_ready_check_func.remote( - deployment_config, - self._rank, - ) - - self._allocated_obj_ref._on_completed(on_completed) - def _format_user_config(self, user_config: Any): temp = copy(user_config) if user_config is not None and self._deployment_is_cross_language: @@ -753,6 +726,25 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]: return ReplicaStartupStatus.FAILED, msg if self._ready_obj_ref is None: + # Perform auto method name translation for java handles. + # See https://github.com/ray-project/ray/issues/21474 + deployment_config = copy(self._version.deployment_config) + deployment_config.user_config = self._format_user_config( + deployment_config.user_config + ) + if self._is_cross_language: + self._ready_obj_ref = self._actor_handle.is_initialized.remote( + deployment_config.to_proto_bytes() + ) + else: + replica_ready_check_func = ( + self._actor_handle.initialize_and_get_metadata + ) + self._rank = self._assign_rank_callback(self._replica_id.unique_id) + self._ready_obj_ref = replica_ready_check_func.remote( + deployment_config, self._rank + ) + return ReplicaStartupStatus.PENDING_INITIALIZATION, None # Check whether replica initialization has completed. From 29ce2666af735b93fd5e61d46df2c689049ee635 Mon Sep 17 00:00:00 2001 From: abrar Date: Fri, 14 Nov 2025 07:26:12 +0000 Subject: [PATCH 11/11] fix function types Signed-off-by: abrar --- python/ray/serve/_private/replica.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index b34168c84837..c648ebc59845 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -909,7 +909,9 @@ async def handle_request_with_rejection( async def _on_initialized(self): raise NotImplementedError - async def initialize(self, deployment_config: DeploymentConfig, rank: ReplicaRank): + async def initialize( + self, deployment_config: Optional[DeploymentConfig], rank: Optional[ReplicaRank] + ): if rank is not None: self._rank = rank self._set_internal_replica_context( @@ -945,7 +947,7 @@ async def initialize(self, deployment_config: DeploymentConfig, rank: ReplicaRan record_autoscaling_stats_fn=self._user_callable_wrapper.call_record_autoscaling_stats, ) - if deployment_config: + if deployment_config is not None: await self._user_callable_wrapper.set_sync_method_threadpool_limit( deployment_config.max_ongoing_requests )
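
For reference, a minimal usage sketch (illustrative only, not part of the diff) of how a deployment reads the new ReplicaRank fields through the replica context. Field access mirrors the updated tests (context.rank.rank); the deployment name, num_replicas value, and the None-guard for a not-yet-assigned rank are assumptions.

    # Illustrative sketch; deployment name and num_replicas are arbitrary.
    from ray import serve

    @serve.deployment(num_replicas=2)
    class RankAware:
        def __call__(self) -> dict:
            # context.rank is a ReplicaRank (rank, node_rank, local_rank) once the
            # controller has assigned one, or None before that point.
            ctx = serve.get_replica_context()
            rank = ctx.rank
            return {
                "rank": rank.rank if rank else None,
                "node_rank": rank.node_rank if rank else None,
                "local_rank": rank.local_rank if rank else None,
                "world_size": ctx.world_size,
            }

    app = RankAware.bind()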