diff --git a/python/ray/serve/_private/client.py b/python/ray/serve/_private/client.py index 26ccb753ce8e..4ad027fe8160 100644 --- a/python/ray/serve/_private/client.py +++ b/python/ray/serve/_private/client.py @@ -20,6 +20,7 @@ from ray.serve._private.constants import ( CLIENT_CHECK_CREATION_POLLING_INTERVAL_S, CLIENT_POLLING_INTERVAL_S, + HTTP_PROXY_TIMEOUT, MAX_CACHED_HANDLES, SERVE_DEFAULT_APP_NAME, SERVE_LOGGER_NAME, @@ -288,6 +289,39 @@ def _wait_for_application_running(self, name: str, timeout_s: int = -1): f"Application {name} did not become RUNNING after {timeout_s}s." ) + @_ensure_connected + def wait_for_proxies_serving( + self, wait_for_applications_running: bool = True + ) -> None: + """Wait for the proxies to be ready to serve requests.""" + proxy_handles = ray.get(self._controller.get_proxies.remote()) + serving_refs = [ + handle.serving.remote( + wait_for_applications_running=wait_for_applications_running + ) + for handle in proxy_handles.values() + ] + + done, pending = ray.wait( + serving_refs, + timeout=HTTP_PROXY_TIMEOUT, + num_returns=len(serving_refs), + ) + + if len(pending) > 0: + raise TimeoutError(f"Proxies not available after {HTTP_PROXY_TIMEOUT}s.") + + # Ensure the proxies are either serving or dead. + for ref in done: + try: + ray.get(ref, timeout=1) + except ray.exceptions.RayActorError: + pass + except Exception: + raise TimeoutError( + f"Proxies not available after {HTTP_PROXY_TIMEOUT}s." + ) + @_ensure_connected def deploy_applications( self, @@ -388,17 +422,32 @@ def deploy_apps( if _blocking: timeout_s = 60 + if isinstance(config, ServeDeploySchema): + app_names = {app.name for app in config.applications} + else: + app_names = {config.name} + start = time.time() while time.time() - start < timeout_s: - curr_status = self.get_serve_status() - if curr_status.app_status.status == ApplicationStatus.RUNNING: + statuses = self.list_serve_statuses() + app_to_status = { + status.name: status.app_status.status + for status in statuses + if status.name in app_names + } + if len(app_names) == len(app_to_status) and set( + app_to_status.values() + ) == {ApplicationStatus.RUNNING}: break + time.sleep(CLIENT_POLLING_INTERVAL_S) else: raise TimeoutError( f"Serve application isn't running after {timeout_s}s." ) + self.wait_for_proxies_serving(wait_for_applications_running=True) + def _check_ingress_deployments( self, built_apps: Sequence[BuiltApplication] ) -> None: @@ -479,6 +528,14 @@ def get_serve_status(self, name: str = SERVE_DEFAULT_APP_NAME) -> StatusOverview ) return StatusOverview.from_proto(proto) + @_ensure_connected + def list_serve_statuses(self) -> List[StatusOverview]: + statuses_bytes = ray.get(self._controller.list_serve_statuses.remote()) + return [ + StatusOverview.from_proto(StatusOverviewProto.FromString(status_bytes)) + for status_bytes in statuses_bytes + ] + @_ensure_connected def get_all_deployment_statuses(self) -> List[DeploymentStatusInfo]: statuses_bytes = ray.get(self._controller.get_all_deployment_statuses.remote()) diff --git a/python/ray/serve/_private/proxy.py b/python/ray/serve/_private/proxy.py index aaf0c6008152..9bd4eed731da 100644 --- a/python/ray/serve/_private/proxy.py +++ b/python/ray/serve/_private/proxy.py @@ -1055,6 +1055,18 @@ async def ready(self) -> str: """ pass + @abstractmethod + async def serving(self, wait_for_applications_running: bool = True) -> None: + """Wait for the proxy to be ready to serve requests. + + Args: + wait_for_applications_running: Whether to wait for the applications to be running + + Returns: + None + """ + pass + @abstractmethod async def update_draining( self, draining: bool, _after: Optional[Any] = None @@ -1308,6 +1320,10 @@ async def ready(self) -> str: ] ) + async def serving(self, wait_for_applications_running: bool = True) -> None: + """Wait for the proxy to be ready to serve requests.""" + return + async def update_draining(self, draining: bool, _after: Optional[Any] = None): """Update the draining status of the HTTP and gRPC proxies. diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 25fcaa7b76bc..8fe4bf933572 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -608,12 +608,17 @@ def _run_many( # Record after Ray has been started. ServeUsageTag.API_VERSION.record("v2") - return client.deploy_applications( + handles = client.deploy_applications( built_apps, wait_for_ingress_deployment_creation=wait_for_ingress_deployment_creation, wait_for_applications_running=wait_for_applications_running, ) + client.wait_for_proxies_serving( + wait_for_applications_running=wait_for_applications_running + ) + return handles + @PublicAPI(stability="stable") def _run( diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index ad6d8fe670ed..80955eeff12f 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -913,7 +913,7 @@ def check_for_failed_app(): # The timeout is there to prevent the test from hanging and blocking # the test suite if it does fail. r = httpx.post("http://localhost:8000/", timeout=10) - assert r.status_code == 503 and "unavailable" in r.text + assert r.status_code == 503 and "unavailable" in r.text.lower() @serve.deployment class A: diff --git a/python/ray/serve/tests/test_deploy_app.py b/python/ray/serve/tests/test_deploy_app.py index e94f8316d431..b320223b3b91 100644 --- a/python/ray/serve/tests/test_deploy_app.py +++ b/python/ray/serve/tests/test_deploy_app.py @@ -511,8 +511,8 @@ def test_deploy_config_update_heavyweight(serve_instance, field_to_update: str): ] } - client.deploy_apps(ServeDeploySchema.parse_obj(config_template)) - wait_for_condition(check_running, timeout=15) + client.deploy_apps(ServeDeploySchema.parse_obj(config_template), _blocking=True) + check_running() url = get_application_url("HTTP", app_name=SERVE_DEFAULT_APP_NAME) pid1, _ = httpx.get(url).json() @@ -529,8 +529,8 @@ def test_deploy_config_update_heavyweight(serve_instance, field_to_update: str): "num_cpus": 0.2 } - client.deploy_apps(ServeDeploySchema.parse_obj(config_template)) - wait_for_condition(check_running, timeout=15) + client.deploy_apps(ServeDeploySchema.parse_obj(config_template), _blocking=True) + check_running() url = get_application_url("HTTP", app_name=SERVE_DEFAULT_APP_NAME) pids = [] @@ -549,8 +549,10 @@ def test_update_config_user_config(serve_instance): } # Deploy first time - client.deploy_apps(ServeDeploySchema.parse_obj({"applications": [config_template]})) - wait_for_condition(check_running, timeout=15) + client.deploy_apps( + ServeDeploySchema.parse_obj({"applications": [config_template]}), _blocking=True + ) + check_running() # Query url = get_application_url("HTTP") pid1, res = httpx.get(f"{url}/f").json() diff --git a/python/ray/serve/tests/test_deploy_app_2.py b/python/ray/serve/tests/test_deploy_app_2.py index 8858ee64c7ef..d1c585e870ce 100644 --- a/python/ray/serve/tests/test_deploy_app_2.py +++ b/python/ray/serve/tests/test_deploy_app_2.py @@ -310,7 +310,8 @@ def serve_running(): ] ) actor_names = [actor["class_name"] for actor in actors] - return "ServeController" in actor_names and "ProxyActor" in actor_names + has_proxy = any("Proxy" in name for name in actor_names) + return "ServeController" in actor_names and has_proxy wait_for_condition(serve_running) @@ -323,8 +324,8 @@ def test_deployments_not_listed_in_config(serve_instance): config = { "applications": [{"import_path": "ray.serve.tests.test_config_files.pid.node"}] } - client.deploy_apps(ServeDeploySchema(**config)) - wait_for_condition(check_running, timeout=15) + client.deploy_apps(ServeDeploySchema(**config), _blocking=True) + check_running() pid1, _ = httpx.get("http://localhost:8000/").json() # Redeploy the same config (with no deployments listed) @@ -408,8 +409,8 @@ def test_deploy_does_not_affect_dynamic_apps(serve_instance): ), ], ) - client.deploy_apps(config) - wait_for_condition(check_running, app_name="declarative-app-1") + client.deploy_apps(config, _blocking=True) + check_running(app_name="declarative-app-1") url = get_application_url(app_name="declarative-app-1") assert httpx.post(url).text == "wonderful world" @@ -433,8 +434,8 @@ def __call__(self, *args) -> str: import_path="ray.serve.tests.test_config_files.world.DagNode", ), ) - client.deploy_apps(config) - wait_for_condition(check_running, app_name="declarative-app-2") + client.deploy_apps(config, _blocking=True) + check_running(app_name="declarative-app-2") url = get_application_url(app_name="declarative-app-2") assert httpx.post(url).text == "wonderful world" @@ -471,8 +472,8 @@ def __call__(self, *args) -> str: import_path="ray.serve.tests.test_config_files.world.DagNode", ), ] - client.deploy_apps(config) - wait_for_condition(check_running, app_name="declarative-app-1") + client.deploy_apps(config, _blocking=True) + check_running(app_name="declarative-app-1") url = get_application_url(app_name="declarative-app-1") assert httpx.post(url).text == "wonderful world" @@ -517,8 +518,8 @@ def __call__(self, *args) -> str: import_path="ray.serve.tests.test_config_files.world.DagNode", ), ] - client.deploy_apps(config) - wait_for_condition(check_running, app_name="declarative-app-2") + client.deploy_apps(config, _blocking=True) + check_running(app_name="declarative-app-2") url = get_application_url(app_name="declarative-app-2") assert httpx.post(url).text == "wonderful world" @@ -536,8 +537,10 @@ def test_change_route_prefix(serve_instance): "route_prefix": "/old", "import_path": "ray.serve.tests.test_config_files.pid.node", } - client.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) - wait_for_condition(check_running) + client.deploy_apps( + ServeDeploySchema(**{"applications": [app_config]}), _blocking=True + ) + check_running() url = get_application_url() pid1 = httpx.get(url).json()[0] # Redeploy application with route prefix /new. @@ -710,7 +713,7 @@ def test_deploy_one_app_failed(serve_instance): # The timeout is there to prevent the test from hanging and blocking # the test suite if it does fail. r = httpx.post("http://localhost:8000/app2", timeout=10) - assert r.status_code == 503 and "unavailable" in r.text + assert r.status_code == 503 and "unavailable" in r.text.lower() def test_deploy_with_route_prefix_conflict(serve_instance): diff --git a/python/ray/serve/tests/test_gcs_failure.py b/python/ray/serve/tests/test_gcs_failure.py index 6989cb91ecaa..162bd03ed119 100644 --- a/python/ray/serve/tests/test_gcs_failure.py +++ b/python/ray/serve/tests/test_gcs_failure.py @@ -272,8 +272,8 @@ def test_proxy_router_updated_replicas_then_gcs_failure(serve_ha): "route_prefix": "/", "deployments": [{"name": "GetPID", "num_replicas": 1}], } - client.deploy_apps(ServeDeploySchema(**{"applications": [config]})) - wait_for_condition(check_apps_running, apps=["default"]) + client.deploy_apps(ServeDeploySchema(**{"applications": [config]}), _blocking=True) + check_apps_running(apps=["default"]) r = httpx.post("http://localhost:8000") assert r.status_code == 200, r.text