Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 35 additions & 32 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ def __init__(
self._docs_path: Optional[str] = None
self._route_patterns: Optional[List[str]] = None
# Callback that assigns this replica's rank, and the rank once it has been assigned.
self._assign_rank_callback: Optional[Callable[[ReplicaID], ReplicaRank]] = None
self._rank: Optional[ReplicaRank] = None
# Populated in `on_scheduled` or `recover`.
self._actor_handle: ActorHandle = None
Expand Down Expand Up @@ -442,14 +443,16 @@ def initialization_latency_s(self) -> Optional[float]:
return self._initialization_latency_s

def start(
self, deployment_info: DeploymentInfo, rank: ReplicaRank
self,
deployment_info: DeploymentInfo,
assign_rank_callback: Callable[[ReplicaID], ReplicaRank],
) -> ReplicaSchedulingRequest:
"""Start the current DeploymentReplica instance.

The replica will be in the STARTING and PENDING_ALLOCATION states
until the deployment scheduler schedules the underlying actor.
"""
self._rank = rank # Store the rank assigned to this replica
self._assign_rank_callback = assign_rank_callback
self._actor_resources = deployment_info.replica_config.resource_dict
self._ingress = deployment_info.ingress
# it is currently not possible to create a placement group
Expand Down Expand Up @@ -493,7 +496,6 @@ def start(
self._version,
deployment_info.ingress,
deployment_info.route_prefix,
rank,
)
# TODO(simon): unify the constructor arguments across language
elif (
Expand Down Expand Up @@ -574,31 +576,11 @@ def on_scheduled(
self._actor_handle = actor_handle
self._placement_group = placement_group

# Perform auto method name translation for java handles.
# See https://github.com/ray-project/ray/issues/21474
deployment_config = copy(self._version.deployment_config)
deployment_config.user_config = self._format_user_config(
deployment_config.user_config
)
if self._is_cross_language:
self._actor_handle = JavaActorHandleProxy(self._actor_handle)
self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
self._ready_obj_ref = self._actor_handle.is_initialized.remote(
deployment_config.to_proto_bytes()
)
else:
self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
replica_ready_check_func = self._actor_handle.initialize_and_get_metadata
self._ready_obj_ref = replica_ready_check_func.remote(
deployment_config,
# Ensure that `is_allocated` will execute
# before `initialize_and_get_metadata`,
# because `initialize_and_get_metadata` runs
# user code that could block the replica
# asyncio loop. If that happens before `is_allocated` is executed,
# the `is_allocated` call won't be able to run.
self._allocated_obj_ref,
)

def _format_user_config(self, user_config: Any):
temp = copy(user_config)
Expand Down Expand Up @@ -743,6 +725,28 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
logger.exception(msg)
return ReplicaStartupStatus.FAILED, msg

if self._ready_obj_ref is None:
# Perform auto method name translation for java handles.
# See https://github.com/ray-project/ray/issues/21474
deployment_config = copy(self._version.deployment_config)
deployment_config.user_config = self._format_user_config(
deployment_config.user_config
)
if self._is_cross_language:
self._ready_obj_ref = self._actor_handle.is_initialized.remote(
deployment_config.to_proto_bytes()
)
else:
replica_ready_check_func = (
self._actor_handle.initialize_and_get_metadata
)
self._rank = self._assign_rank_callback(self._replica_id.unique_id)
self._ready_obj_ref = replica_ready_check_func.remote(
deployment_config, self._rank
)

return ReplicaStartupStatus.PENDING_INITIALIZATION, None

# Check whether replica initialization has completed.
replica_ready = check_obj_ref_ready_nowait(self._ready_obj_ref)
# In case of deployment constructor failure, ray.get will help to
Expand Down Expand Up @@ -1166,12 +1170,16 @@ def initialization_latency_s(self) -> Optional[float]:
return self._actor.initialization_latency_s

def start(
self, deployment_info: DeploymentInfo, rank: ReplicaRank
self,
deployment_info: DeploymentInfo,
assign_rank_callback: Callable[[ReplicaID], ReplicaRank],
) -> ReplicaSchedulingRequest:
"""
Start a new actor for current DeploymentReplica instance.
"""
replica_scheduling_request = self._actor.start(deployment_info, rank=rank)
replica_scheduling_request = self._actor.start(
deployment_info, assign_rank_callback=assign_rank_callback
)
self._start_time = time.time()
self._logged_shutdown_message = False
self.update_actor_details(start_time_s=self._start_time)
Expand Down Expand Up @@ -2534,18 +2542,13 @@ def scale_deployment_replicas(
for _ in range(to_add):
replica_id = ReplicaID(get_random_string(), deployment_id=self._id)

# Assign rank during replica creation (startup process)
assigned_rank = self._rank_manager.assign_rank(replica_id.unique_id)

logger.debug(
f"Assigned rank {assigned_rank.rank} to new replica {replica_id.unique_id} during startup"
)
new_deployment_replica = DeploymentReplica(
replica_id,
self._target_state.version,
)
scheduling_request = new_deployment_replica.start(
self._target_state.info, rank=assigned_rank
self._target_state.info,
assign_rank_callback=self._rank_manager.assign_rank,
)

upscale.append(scheduling_request)
Expand Down
22 changes: 13 additions & 9 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,6 @@ def __init__(
version: DeploymentVersion,
ingress: bool,
route_prefix: str,
rank: ReplicaRank,
):
self._version = version
self._replica_id = replica_id
Expand Down Expand Up @@ -561,7 +560,7 @@ def __init__(

# Set metadata for logs and metrics.
# servable_object and rank will be populated in `initialize_and_get_metadata`.
self._set_internal_replica_context(servable_object=None, rank=rank)
self._set_internal_replica_context(servable_object=None, rank=None)

self._metrics_manager = create_replica_metrics_manager(
replica_id=replica_id,
Expand All @@ -575,7 +574,7 @@ def __init__(
self._http_port: Optional[int] = None
self._grpc_port: Optional[int] = None

self._rank = rank
self._rank: Optional[ReplicaRank] = None

@property
def max_ongoing_requests(self) -> int:
Expand Down Expand Up @@ -910,7 +909,14 @@ async def handle_request_with_rejection(
async def _on_initialized(self):
raise NotImplementedError

async def initialize(self, deployment_config: DeploymentConfig):
async def initialize(
self, deployment_config: Optional[DeploymentConfig], rank: Optional[ReplicaRank]
):
if rank is not None:
self._rank = rank
self._set_internal_replica_context(
servable_object=self._user_callable_wrapper.user_callable, rank=rank
)
try:
# Ensure that initialization is only performed once.
# When controller restarts, it will call this method again.
Expand Down Expand Up @@ -941,7 +947,7 @@ async def initialize(self, deployment_config: DeploymentConfig):
record_autoscaling_stats_fn=self._user_callable_wrapper.call_record_autoscaling_stats,
)

if deployment_config:
if deployment_config is not None:
await self._user_callable_wrapper.set_sync_method_threadpool_limit(
deployment_config.max_ongoing_requests
)
Expand Down Expand Up @@ -1186,7 +1192,6 @@ async def __init__(
version: DeploymentVersion,
ingress: bool,
route_prefix: str,
rank: ReplicaRank,
):
deployment_config = DeploymentConfig.from_proto_bytes(
deployment_config_proto_bytes
Expand All @@ -1203,7 +1208,6 @@ async def __init__(
version=version,
ingress=ingress,
route_prefix=route_prefix,
rank=rank,
)

def push_proxy_handle(self, handle: ActorHandle):
Expand Down Expand Up @@ -1282,7 +1286,7 @@ async def is_allocated(self) -> str:
)

async def initialize_and_get_metadata(
self, deployment_config: DeploymentConfig = None, _after: Optional[Any] = None
self, deployment_config: DeploymentConfig = None, rank: ReplicaRank = None
) -> ReplicaMetadata:
"""Handles initializing the replica.

Expand All @@ -1295,7 +1299,7 @@ async def initialize_and_get_metadata(
"""
# `rank` is forwarded to the replica implementation's `initialize`,
# which records it when it is not None.
await self._replica_impl.initialize(deployment_config)
await self._replica_impl.initialize(deployment_config, rank)
return self._replica_impl.get_metadata()

async def check_health(self):
Expand Down
42 changes: 34 additions & 8 deletions python/ray/serve/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,30 +176,56 @@ def get_replicas(replica_state):
)
return replicas.get([replica_state])

# wait for serve to start the replica, and catch a reference to it.
# wait for serve to start the replica
wait_for_condition(lambda: len(get_replicas(ReplicaState.STARTING)) > 0)
replica = get_replicas(ReplicaState.STARTING)[0]

# currently there are no resources to allocate the replica
assert replica.check_started()[0] == ReplicaStartupStatus.PENDING_ALLOCATION
def get_starting_replica():
    """Return the first replica in the STARTING state, or None if there is none."""
    starting = get_replicas(ReplicaState.STARTING)
    if not starting:
        return None
    return starting[0]

def is_pending_allocation():
    """Report whether a STARTING replica exists and is PENDING_ALLOCATION."""
    candidate = get_starting_replica()
    if candidate is not None:
        startup_status = candidate.check_started()[0]
        return startup_status == ReplicaStartupStatus.PENDING_ALLOCATION
    # No STARTING replica is visible yet, so the condition is not met.
    return False

wait_for_condition(is_pending_allocation)

# add the necessary resources to allocate the replica
cluster.add_node(num_cpus=4)
wait_for_condition(lambda: (ray.cluster_resources().get("CPU", 0) >= 4))
wait_for_condition(lambda: (ray.available_resources().get("CPU", 0) >= 2))

def is_replica_pending_initialization():
    """Report whether the STARTING replica is PENDING_INITIALIZATION.

    The observed startup status is printed each time this is called.
    """
    candidate = get_starting_replica()
    if candidate is not None:
        startup_status, _ = candidate.check_started()
        print(startup_status)
        return startup_status == ReplicaStartupStatus.PENDING_INITIALIZATION
    # No STARTING replica is visible yet, so the condition is not met.
    return False

wait_for_condition(is_replica_pending_initialization, timeout=25)

# send signal to complete replica initialization
signal.send.remote()
wait_for_condition(
lambda: replica.check_started()[0] == ReplicaStartupStatus.SUCCEEDED
)
ray.get(signal.send.remote())

def check_succeeded():
    """Report whether the replica finished starting up.

    Returns True when the STARTING replica reports SUCCEEDED, or when any
    replica is already in the RUNNING state; otherwise returns False.
    """
    # A replica that is still STARTING may report SUCCEEDED directly.
    starting = get_starting_replica()
    if starting:
        startup_status, _ = starting.check_started()
        if startup_status == ReplicaStartupStatus.SUCCEEDED:
            return True

    # Otherwise, a replica that already moved to RUNNING counts as success.
    running = get_replicas(ReplicaState.RUNNING)
    return bool(running and len(running) > 0)

wait_for_condition(check_succeeded)


@serve.deployment
Expand Down
18 changes: 13 additions & 5 deletions python/ray/serve/tests/unit/test_deployment_state.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sys
from copy import deepcopy
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Callable, Dict, List, Optional, Tuple
from unittest.mock import Mock, patch

import pytest
Expand Down Expand Up @@ -108,6 +108,7 @@ def __init__(
self._initialization_latency_s = -1
self._docs_path = None
self._rank = replica_rank_context.get(replica_id.unique_id, None)
self._assign_rank_callback = None

@property
def is_cross_language(self) -> bool:
Expand Down Expand Up @@ -226,10 +227,15 @@ def set_node_id(self, node_id: str):
def set_actor_id(self, actor_id: str):
self._actor_id = actor_id

def start(self, deployment_info: DeploymentInfo, rank: ReplicaRank):
def start(
self,
deployment_info: DeploymentInfo,
assign_rank_callback: Callable[[ReplicaID], ReplicaRank],
):
self.started = True
self._rank = rank
replica_rank_context[self._replica_id.unique_id] = rank
self._assign_rank_callback = assign_rank_callback
self._rank = assign_rank_callback(self._replica_id.unique_id)
replica_rank_context[self._replica_id.unique_id] = self._rank

def _on_scheduled_stub(*args, **kwargs):
pass
Expand Down Expand Up @@ -2682,7 +2688,9 @@ def test_max_concurrency_override(self):
)
max_ongoing_requests = DEFAULT_MAX_CONCURRENCY_ASYNC + 1
d_info, _ = deployment_info(max_ongoing_requests=max_ongoing_requests)
replica_scheduling_request = actor_replica.start(d_info, rank=0)
replica_scheduling_request = actor_replica.start(
d_info, assign_rank_callback=lambda x: 0
)
assert (
"max_concurrency" in replica_scheduling_request.actor_options
and replica_scheduling_request.actor_options["max_concurrency"]
Expand Down