diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 0beec58c7815..8e9089135114 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -254,6 +254,7 @@ def __init__( self._docs_path: Optional[str] = None self._route_patterns: Optional[List[str]] = None # Rank assigned to the replica. + self._assign_rank_callback: Optional[Callable[[ReplicaID], ReplicaRank]] = None self._rank: Optional[ReplicaRank] = None # Populated in `on_scheduled` or `recover`. self._actor_handle: ActorHandle = None @@ -442,14 +443,16 @@ def initialization_latency_s(self) -> Optional[float]: return self._initialization_latency_s def start( - self, deployment_info: DeploymentInfo, rank: ReplicaRank + self, + deployment_info: DeploymentInfo, + assign_rank_callback: Callable[[ReplicaID], ReplicaRank], ) -> ReplicaSchedulingRequest: """Start the current DeploymentReplica instance. The replica will be in the STARTING and PENDING_ALLOCATION states until the deployment scheduler schedules the underlying actor. """ - self._rank = rank # Store the rank assigned to this replica + self._assign_rank_callback = assign_rank_callback self._actor_resources = deployment_info.replica_config.resource_dict self._ingress = deployment_info.ingress # it is currently not possible to create a placement group @@ -493,7 +496,6 @@ def start( self._version, deployment_info.ingress, deployment_info.route_prefix, - rank, ) # TODO(simon): unify the constructor arguments across language elif ( @@ -574,31 +576,11 @@ def on_scheduled( self._actor_handle = actor_handle self._placement_group = placement_group - # Perform auto method name translation for java handles. - # See https://github.com/ray-project/ray/issues/21474 - deployment_config = copy(self._version.deployment_config) - deployment_config.user_config = self._format_user_config( - deployment_config.user_config - ) if self._is_cross_language: self._actor_handle = JavaActorHandleProxy(self._actor_handle) self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - self._ready_obj_ref = self._actor_handle.is_initialized.remote( - deployment_config.to_proto_bytes() - ) else: self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - replica_ready_check_func = self._actor_handle.initialize_and_get_metadata - self._ready_obj_ref = replica_ready_check_func.remote( - deployment_config, - # Ensure that `is_allocated` will execute - # before `initialize_and_get_metadata`, - # because `initialize_and_get_metadata` runs - # user code that could block the replica - # asyncio loop. If that happens before `is_allocated` is executed, - # the `is_allocated` call won't be able to run. - self._allocated_obj_ref, - ) def _format_user_config(self, user_config: Any): temp = copy(user_config) @@ -743,6 +725,28 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]: logger.exception(msg) return ReplicaStartupStatus.FAILED, msg + if self._ready_obj_ref is None: + # Perform auto method name translation for java handles. + # See https://github.com/ray-project/ray/issues/21474 + deployment_config = copy(self._version.deployment_config) + deployment_config.user_config = self._format_user_config( + deployment_config.user_config + ) + if self._is_cross_language: + self._ready_obj_ref = self._actor_handle.is_initialized.remote( + deployment_config.to_proto_bytes() + ) + else: + replica_ready_check_func = ( + self._actor_handle.initialize_and_get_metadata + ) + self._rank = self._assign_rank_callback(self._replica_id.unique_id) + self._ready_obj_ref = replica_ready_check_func.remote( + deployment_config, self._rank + ) + + return ReplicaStartupStatus.PENDING_INITIALIZATION, None + # Check whether replica initialization has completed. replica_ready = check_obj_ref_ready_nowait(self._ready_obj_ref) # In case of deployment constructor failure, ray.get will help to @@ -1166,12 +1170,16 @@ def initialization_latency_s(self) -> Optional[float]: return self._actor.initialization_latency_s def start( - self, deployment_info: DeploymentInfo, rank: ReplicaRank + self, + deployment_info: DeploymentInfo, + assign_rank_callback: Callable[[ReplicaID], ReplicaRank], ) -> ReplicaSchedulingRequest: """ Start a new actor for current DeploymentReplica instance. """ - replica_scheduling_request = self._actor.start(deployment_info, rank=rank) + replica_scheduling_request = self._actor.start( + deployment_info, assign_rank_callback=assign_rank_callback + ) self._start_time = time.time() self._logged_shutdown_message = False self.update_actor_details(start_time_s=self._start_time) @@ -2534,18 +2542,13 @@ def scale_deployment_replicas( for _ in range(to_add): replica_id = ReplicaID(get_random_string(), deployment_id=self._id) - # Assign rank during replica creation (startup process) - assigned_rank = self._rank_manager.assign_rank(replica_id.unique_id) - - logger.debug( - f"Assigned rank {assigned_rank.rank} to new replica {replica_id.unique_id} during startup" - ) new_deployment_replica = DeploymentReplica( replica_id, self._target_state.version, ) scheduling_request = new_deployment_replica.start( - self._target_state.info, rank=assigned_rank + self._target_state.info, + assign_rank_callback=self._rank_manager.assign_rank, ) upscale.append(scheduling_request) diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index 32b5721f3b71..c648ebc59845 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -510,7 +510,6 @@ def __init__( version: DeploymentVersion, ingress: bool, route_prefix: str, - rank: ReplicaRank, ): self._version = version self._replica_id = replica_id @@ -561,7 +560,7 @@ def __init__( # Set metadata for logs and metrics. # servable_object will be populated in `initialize_and_get_metadata`. - self._set_internal_replica_context(servable_object=None, rank=rank) + self._set_internal_replica_context(servable_object=None, rank=None) self._metrics_manager = create_replica_metrics_manager( replica_id=replica_id, @@ -575,7 +574,7 @@ def __init__( self._http_port: Optional[int] = None self._grpc_port: Optional[int] = None - self._rank = rank + self._rank: Optional[ReplicaRank] = None @property def max_ongoing_requests(self) -> int: @@ -910,7 +909,14 @@ async def handle_request_with_rejection( async def _on_initialized(self): raise NotImplementedError - async def initialize(self, deployment_config: DeploymentConfig): + async def initialize( + self, deployment_config: Optional[DeploymentConfig], rank: Optional[ReplicaRank] + ): + if rank is not None: + self._rank = rank + self._set_internal_replica_context( + servable_object=self._user_callable_wrapper.user_callable, rank=rank + ) try: # Ensure that initialization is only performed once. # When controller restarts, it will call this method again. @@ -941,7 +947,7 @@ async def initialize(self, deployment_config: DeploymentConfig): record_autoscaling_stats_fn=self._user_callable_wrapper.call_record_autoscaling_stats, ) - if deployment_config: + if deployment_config is not None: await self._user_callable_wrapper.set_sync_method_threadpool_limit( deployment_config.max_ongoing_requests ) @@ -1186,7 +1192,6 @@ async def __init__( version: DeploymentVersion, ingress: bool, route_prefix: str, - rank: ReplicaRank, ): deployment_config = DeploymentConfig.from_proto_bytes( deployment_config_proto_bytes @@ -1203,7 +1208,6 @@ async def __init__( version=version, ingress=ingress, route_prefix=route_prefix, - rank=rank, ) def push_proxy_handle(self, handle: ActorHandle): @@ -1282,7 +1286,7 @@ async def is_allocated(self) -> str: ) async def initialize_and_get_metadata( - self, deployment_config: DeploymentConfig = None, _after: Optional[Any] = None + self, deployment_config: DeploymentConfig = None, rank: ReplicaRank = None ) -> ReplicaMetadata: """Handles initializing the replica. @@ -1295,7 +1299,7 @@ async def initialize_and_get_metadata( """ # Unused `_after` argument is for scheduling: passing an ObjectRef # allows delaying this call until after the `_after` call has returned. - await self._replica_impl.initialize(deployment_config) + await self._replica_impl.initialize(deployment_config, rank) return self._replica_impl.get_metadata() async def check_health(self): diff --git a/python/ray/serve/tests/test_cluster.py b/python/ray/serve/tests/test_cluster.py index eb62427df0fe..330e7bb9cabf 100644 --- a/python/ray/serve/tests/test_cluster.py +++ b/python/ray/serve/tests/test_cluster.py @@ -176,12 +176,21 @@ def get_replicas(replica_state): ) return replicas.get([replica_state]) - # wait for serve to start the replica, and catch a reference to it. + # wait for serve to start the replica wait_for_condition(lambda: len(get_replicas(ReplicaState.STARTING)) > 0) - replica = get_replicas(ReplicaState.STARTING)[0] # currently there are no resources to allocate the replica - assert replica.check_started()[0] == ReplicaStartupStatus.PENDING_ALLOCATION + def get_starting_replica(): + replicas = get_replicas(ReplicaState.STARTING) + return replicas[0] if replicas else None + + def is_pending_allocation(): + replica = get_starting_replica() + if replica is None: + return False + return replica.check_started()[0] == ReplicaStartupStatus.PENDING_ALLOCATION + + wait_for_condition(is_pending_allocation) # add the necessary resources to allocate the replica cluster.add_node(num_cpus=4) @@ -189,17 +198,34 @@ def get_replicas(replica_state): wait_for_condition(lambda: (ray.available_resources().get("CPU", 0) >= 2)) def is_replica_pending_initialization(): + replica = get_starting_replica() + if replica is None: + return False status, _ = replica.check_started() - print(status) return status == ReplicaStartupStatus.PENDING_INITIALIZATION wait_for_condition(is_replica_pending_initialization, timeout=25) # send signal to complete replica initialization - signal.send.remote() - wait_for_condition( - lambda: replica.check_started()[0] == ReplicaStartupStatus.SUCCEEDED - ) + ray.get(signal.send.remote()) + + def check_succeeded(): + # After initialization succeeds, replica transitions to RUNNING state + # So check both STARTING and RUNNING states + replica = get_starting_replica() + if replica: + status, _ = replica.check_started() + if status == ReplicaStartupStatus.SUCCEEDED: + return True + + # Check if replica has moved to RUNNING state (which means it succeeded) + running_replicas = get_replicas(ReplicaState.RUNNING) + if running_replicas and len(running_replicas) > 0: + return True + + return False + + wait_for_condition(check_succeeded) @serve.deployment diff --git a/python/ray/serve/tests/unit/test_deployment_state.py b/python/ray/serve/tests/unit/test_deployment_state.py index c6891626570e..9caa560a6d49 100644 --- a/python/ray/serve/tests/unit/test_deployment_state.py +++ b/python/ray/serve/tests/unit/test_deployment_state.py @@ -1,6 +1,6 @@ import sys from copy import deepcopy -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple from unittest.mock import Mock, patch import pytest @@ -108,6 +108,7 @@ def __init__( self._initialization_latency_s = -1 self._docs_path = None self._rank = replica_rank_context.get(replica_id.unique_id, None) + self._assign_rank_callback = None @property def is_cross_language(self) -> bool: @@ -226,10 +227,15 @@ def set_node_id(self, node_id: str): def set_actor_id(self, actor_id: str): self._actor_id = actor_id - def start(self, deployment_info: DeploymentInfo, rank: ReplicaRank): + def start( + self, + deployment_info: DeploymentInfo, + assign_rank_callback: Callable[[ReplicaID], ReplicaRank], + ): self.started = True - self._rank = rank - replica_rank_context[self._replica_id.unique_id] = rank + self._assign_rank_callback = assign_rank_callback + self._rank = assign_rank_callback(self._replica_id.unique_id) + replica_rank_context[self._replica_id.unique_id] = self._rank def _on_scheduled_stub(*args, **kwargs): pass @@ -2682,7 +2688,9 @@ def test_max_concurrency_override(self): ) max_ongoing_requests = DEFAULT_MAX_CONCURRENCY_ASYNC + 1 d_info, _ = deployment_info(max_ongoing_requests=max_ongoing_requests) - replica_scheduling_request = actor_replica.start(d_info, rank=0) + replica_scheduling_request = actor_replica.start( + d_info, assign_rank_callback=lambda x: 0 + ) assert ( "max_concurrency" in replica_scheduling_request.actor_options and replica_scheduling_request.actor_options["max_concurrency"]