Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion python/ray/serve/_private/default_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
ClusterNodeInfoCache,
DefaultClusterNodeInfoCache,
)
from ray.serve._private.common import DeploymentID
from ray.serve._private.common import DeploymentHandleSource, DeploymentID, EndpointInfo
from ray.serve._private.constants import (
RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE,
RAY_SERVE_ENABLE_STRICT_MAX_ONGOING_REQUESTS,
RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTING,
RAY_SERVE_PROXY_PREFER_LOCAL_NODE_ROUTING,
)
from ray.serve._private.deployment_scheduler import (
DefaultDeploymentScheduler,
Expand Down Expand Up @@ -131,3 +132,26 @@ def create_router(
def add_grpc_address(grpc_server: gRPCServer, server_address: str):
"""Helper function to add a address to gRPC server."""
grpc_server.add_insecure_port(server_address)


def get_proxy_handle(endpoint: DeploymentID, info: EndpointInfo):
# NOTE(zcin): needs to be lazy import due to a circular dependency.
# We should not be importing from application_state in context.
from ray.serve.context import _get_global_client

client = _get_global_client()
handle = client.get_handle(endpoint.name, endpoint.app_name, check_exists=True)

# NOTE(zcin): It's possible that a handle is already initialized
# if a deployment with the same name and application name was
# deleted, then redeployed later. However this is not an issue since
# we initialize all handles with the same init options.
if not handle.is_initialized:
# NOTE(zcin): since the router is eagerly initialized here, the
# proxy will receive the replica set from the controller early.
handle._init(
_prefer_local_routing=RAY_SERVE_PROXY_PREFER_LOCAL_NODE_ROUTING,
_source=DeploymentHandleSource.PROXY,
)

return handle.options(stream=not info.app_is_cross_language)
3 changes: 1 addition & 2 deletions python/ray/serve/_private/handle_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass, fields

import ray
from ray.serve._private.common import DeploymentHandleSource, RequestProtocol
from ray.serve._private.common import DeploymentHandleSource
from ray.serve._private.utils import DEFAULT


Expand Down Expand Up @@ -52,7 +52,6 @@ class DynamicHandleOptionsBase(ABC):
method_name: str = "__call__"
multiplexed_model_id: str = ""
stream: bool = False
_request_protocol: str = RequestProtocol.UNDEFINED

def copy_and_update(self, **kwargs) -> "DynamicHandleOptionsBase":
new_kwargs = {}
Expand Down
55 changes: 15 additions & 40 deletions python/ray/serve/_private/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import socket
import time
from abc import ABC, abstractmethod
from functools import partial
from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Type
from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple

import grpc
import starlette
Expand All @@ -19,7 +18,6 @@
from starlette.types import Receive

import ray
from ray import serve
from ray._private.utils import get_or_create_event_loop
from ray.actor import ActorHandle
from ray.exceptions import RayActorError, RayTaskError
Expand All @@ -41,7 +39,7 @@
SERVE_MULTIPLEXED_MODEL_ID,
SERVE_NAMESPACE,
)
from ray.serve._private.default_impl import add_grpc_address
from ray.serve._private.default_impl import add_grpc_address, get_proxy_handle
from ray.serve._private.grpc_util import DummyServicer, create_serve_grpc_server
from ray.serve._private.http_util import (
MessageQueue,
Expand All @@ -68,11 +66,7 @@
gRPCProxyRequest,
)
from ray.serve._private.proxy_response_generator import ProxyResponseGenerator
from ray.serve._private.proxy_router import (
EndpointRouter,
LongestPrefixRouter,
ProxyRouter,
)
from ray.serve._private.proxy_router import ProxyRouter
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import (
call_function_from_import_path,
Expand Down Expand Up @@ -151,9 +145,8 @@ def __init__(
node_id: NodeId,
node_ip_address: str,
is_head: bool,
proxy_router_class: Type[ProxyRouter],
proxy_router: ProxyRouter,
request_timeout_s: Optional[float] = None,
get_handle_override: Optional[Callable] = None,
):
self.request_timeout_s = request_timeout_s
if self.request_timeout_s is not None and self.request_timeout_s < 0:
Expand All @@ -162,14 +155,7 @@ def __init__(
self._node_id = node_id
self._is_head = is_head

# Used only for displaying the route table.
self.route_info: Dict[str, DeploymentID] = dict()

self.proxy_router = proxy_router_class(
get_handle_override
or partial(serve.get_deployment_handle, _record_telemetry=False),
self.protocol,
)
self.proxy_router = proxy_router
self.request_counter = metrics.Counter(
f"serve_num_{self.protocol.lower()}_requests",
description=f"The number of {self.protocol} requests processed.",
Expand Down Expand Up @@ -250,14 +236,6 @@ def _is_draining(self) -> bool:
"""Whether is proxy actor is in the draining status or not."""
return self._draining_start_time is not None

def update_routes(self, endpoints: Dict[DeploymentID, EndpointInfo]):
self.route_info: Dict[str, DeploymentID] = dict()
for deployment_id, info in endpoints.items():
route = info.route
self.route_info[route] = deployment_id

self.proxy_router.update_routes(endpoints)

def is_drained(self):
"""Check whether the proxy actor is drained or not.

Expand Down Expand Up @@ -590,7 +568,7 @@ async def routes_response(
) -> ResponseGenerator:
yield ListApplicationsResponse(
application_names=[
endpoint.app_name for endpoint in self.route_info.values()
endpoint.app_name for endpoint in self.proxy_router.endpoints
],
).SerializeToString()

Expand Down Expand Up @@ -784,18 +762,16 @@ def __init__(
node_id: NodeId,
node_ip_address: str,
is_head: bool,
proxy_router_class: Type[ProxyRouter],
proxy_router: ProxyRouter,
request_timeout_s: Optional[float] = None,
proxy_actor: Optional[ActorHandle] = None,
get_handle_override: Optional[Callable] = None,
):
super().__init__(
node_id,
node_ip_address,
is_head,
proxy_router_class,
proxy_router,
request_timeout_s=request_timeout_s,
get_handle_override=get_handle_override,
)
self.self_actor_handle = proxy_actor or ray.get_runtime_context().current_actor
self.asgi_receive_queues: Dict[str, MessageQueue] = dict()
Expand Down Expand Up @@ -823,13 +799,13 @@ async def routes_response(
status_code = 200 if healthy else 503
if healthy:
response = dict()
for route, endpoint in self.route_info.items():
for endpoint, info in self.proxy_router.endpoints.items():
# For 2.x deployments, return {route -> app name}
if endpoint.app_name:
response[route] = endpoint.app_name
response[info.route] = endpoint.app_name
# Keep compatibility with 1.x deployments.
else:
response[route] = endpoint.name
response[info.route] = endpoint.name
else:
response = message

Expand Down Expand Up @@ -1231,11 +1207,12 @@ def __init__(
http_middlewares.extend(middlewares)

is_head = node_id == get_head_node_id()
self.proxy_router = ProxyRouter(get_proxy_handle)
self.http_proxy = HTTPProxy(
node_id=node_id,
node_ip_address=node_ip_address,
is_head=is_head,
proxy_router_class=LongestPrefixRouter,
proxy_router=self.proxy_router,
request_timeout_s=(
request_timeout_s or RAY_SERVE_REQUEST_PROCESSING_TIMEOUT_S
),
Expand All @@ -1245,7 +1222,7 @@ def __init__(
node_id=node_id,
node_ip_address=node_ip_address,
is_head=is_head,
proxy_router_class=EndpointRouter,
proxy_router=self.proxy_router,
request_timeout_s=(
request_timeout_s or RAY_SERVE_REQUEST_PROCESSING_TIMEOUT_S
),
Expand Down Expand Up @@ -1283,9 +1260,7 @@ def __init__(
)

def _update_routes_in_proxies(self, endpoints: Dict[DeploymentID, EndpointInfo]):
self.http_proxy.update_routes(endpoints)
if self.grpc_proxy is not None:
self.grpc_proxy.update_routes(endpoints)
self.proxy_router.update_routes(endpoints)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh this is nice! 👍


def _update_logging_config(self, logging_config: LoggingConfig):
configure_component_logger(
Expand Down
Loading