Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions python/ray/serve/_private/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ray.serve._private.constants import (
CLIENT_CHECK_CREATION_POLLING_INTERVAL_S,
CLIENT_POLLING_INTERVAL_S,
HTTP_PROXY_TIMEOUT,
MAX_CACHED_HANDLES,
SERVE_DEFAULT_APP_NAME,
SERVE_LOGGER_NAME,
Expand Down Expand Up @@ -288,6 +289,39 @@ def _wait_for_application_running(self, name: str, timeout_s: int = -1):
f"Application {name} did not become RUNNING after {timeout_s}s."
)

@_ensure_connected
def wait_for_proxies_serving(
    self, wait_for_applications_running: bool = True
) -> None:
    """Block until every proxy reports that it is serving, or has died.

    Asks the controller for the current proxy handles, calls ``serving`` on
    each of them, and waits up to ``HTTP_PROXY_TIMEOUT`` seconds for all of
    those calls to finish.

    Args:
        wait_for_applications_running: Forwarded unchanged to each proxy's
            ``serving`` call.

    Raises:
        TimeoutError: If any ``serving`` call is still pending after
            ``HTTP_PROXY_TIMEOUT`` seconds, or if a finished call failed
            with anything other than ``ray.exceptions.RayActorError``.
    """
    proxy_handles = ray.get(self._controller.get_proxies.remote())
    serving_refs = [
        handle.serving.remote(
            wait_for_applications_running=wait_for_applications_running
        )
        for handle in proxy_handles.values()
    ]

    # Review question: "do we need to check if serving_refs is non-empty?"
    # ray.wait([], num_returns=0) was observed to return ([], []), so the
    # original code worked without a guard. The explicit early return keeps
    # that outcome without depending on how ray.wait treats an empty list.
    if not serving_refs:
        return

    done, pending = ray.wait(
        serving_refs,
        timeout=HTTP_PROXY_TIMEOUT,
        num_returns=len(serving_refs),
    )

    timeout_message = f"Proxies not available after {HTTP_PROXY_TIMEOUT}s."
    if pending:
        raise TimeoutError(timeout_message)

    # Ensure the proxies are either serving or dead.
    for ref in done:
        try:
            ray.get(ref, timeout=1)
        except ray.exceptions.RayActorError:
            # A dead proxy actor is an accepted outcome here.
            continue
        except Exception as err:
            # Keep the exception type callers already handle, but chain the
            # real failure so it is not lost when debugging.
            raise TimeoutError(timeout_message) from err

@_ensure_connected
def deploy_applications(
self,
Expand Down Expand Up @@ -388,17 +422,32 @@ def deploy_apps(
if _blocking:
timeout_s = 60

if isinstance(config, ServeDeploySchema):
app_names = {app.name for app in config.applications}
else:
app_names = {config.name}

start = time.time()
while time.time() - start < timeout_s:
curr_status = self.get_serve_status()
if curr_status.app_status.status == ApplicationStatus.RUNNING:
statuses = self.list_serve_statuses()
app_to_status = {
status.name: status.app_status.status
for status in statuses
if status.name in app_names
}
if len(app_names) == len(app_to_status) and set(
app_to_status.values()
) == {ApplicationStatus.RUNNING}:
break

time.sleep(CLIENT_POLLING_INTERVAL_S)
else:
raise TimeoutError(
f"Serve application isn't running after {timeout_s}s."
)

self.wait_for_proxies_serving(wait_for_applications_running=True)

def _check_ingress_deployments(
self, built_apps: Sequence[BuiltApplication]
) -> None:
Expand Down Expand Up @@ -479,6 +528,14 @@ def get_serve_status(self, name: str = SERVE_DEFAULT_APP_NAME) -> StatusOverview
)
return StatusOverview.from_proto(proto)

@_ensure_connected
def list_serve_statuses(self) -> List[StatusOverview]:
    """Return the status overviews reported by the controller.

    The controller's ``list_serve_statuses`` call yields a collection of
    serialized ``StatusOverviewProto`` messages; each one is parsed and
    converted into a ``StatusOverview``.

    Returns:
        One ``StatusOverview`` per serialized status, in the order the
        controller returned them.
    """
    serialized_statuses = ray.get(self._controller.list_serve_statuses.remote())

    overviews = []
    for raw_status in serialized_statuses:
        proto = StatusOverviewProto.FromString(raw_status)
        overviews.append(StatusOverview.from_proto(proto))
    return overviews

@_ensure_connected
def get_all_deployment_statuses(self) -> List[DeploymentStatusInfo]:
statuses_bytes = ray.get(self._controller.get_all_deployment_statuses.remote())
Expand Down
16 changes: 16 additions & 0 deletions python/ray/serve/_private/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,18 @@ async def ready(self) -> str:
"""
pass

@abstractmethod
async def serving(self, wait_for_applications_running: bool = True) -> None:
    """Wait until the proxy is ready to serve requests.

    Abstract hook: this declaration has no behaviour of its own.

    Args:
        wait_for_applications_running: Whether to wait for the applications
            to be running.
    """
    ...

@abstractmethod
async def update_draining(
self, draining: bool, _after: Optional[Any] = None
Expand Down Expand Up @@ -1308,6 +1320,10 @@ async def ready(self) -> str:
]
)

async def serving(self, wait_for_applications_running: bool = True) -> None:
    """Wait for the proxy to be ready to serve requests.

    This implementation returns immediately and performs no readiness check
    of its own; ``wait_for_applications_running`` is accepted but not used.

    NOTE(review): presumably a completed call is itself the readiness signal
    for this proxy type -- confirm against how the proxy is started.
    """
    return None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Proxy Serving Method Fails Readiness Check

The serving() method immediately returns, but its docstring indicates it should wait for the proxy to be ready to serve requests. This means the method doesn't actually verify proxy readiness, like HTTP/gRPC servers accepting connections, which impacts wait_for_proxies_serving()'s ability to ensure proxies are truly ready for traffic.

Fix in Cursor Fix in Web


async def update_draining(self, draining: bool, _after: Optional[Any] = None):
"""Update the draining status of the HTTP and gRPC proxies.

Expand Down
7 changes: 6 additions & 1 deletion python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,17 @@ def _run_many(
# Record after Ray has been started.
ServeUsageTag.API_VERSION.record("v2")

return client.deploy_applications(
handles = client.deploy_applications(
built_apps,
wait_for_ingress_deployment_creation=wait_for_ingress_deployment_creation,
wait_for_applications_running=wait_for_applications_running,
)

client.wait_for_proxies_serving(
wait_for_applications_running=wait_for_applications_running
)
return handles


@PublicAPI(stability="stable")
def _run(
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ def check_for_failed_app():
# The timeout is there to prevent the test from hanging and blocking
# the test suite if it does fail.
r = httpx.post("http://localhost:8000/", timeout=10)
assert r.status_code == 503 and "unavailable" in r.text
assert r.status_code == 503 and "unavailable" in r.text.lower()

@serve.deployment
class A:
Expand Down
14 changes: 8 additions & 6 deletions python/ray/serve/tests/test_deploy_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,8 @@ def test_deploy_config_update_heavyweight(serve_instance, field_to_update: str):
]
}

client.deploy_apps(ServeDeploySchema.parse_obj(config_template))
wait_for_condition(check_running, timeout=15)
client.deploy_apps(ServeDeploySchema.parse_obj(config_template), _blocking=True)
check_running()
url = get_application_url("HTTP", app_name=SERVE_DEFAULT_APP_NAME)
pid1, _ = httpx.get(url).json()

Expand All @@ -529,8 +529,8 @@ def test_deploy_config_update_heavyweight(serve_instance, field_to_update: str):
"num_cpus": 0.2
}

client.deploy_apps(ServeDeploySchema.parse_obj(config_template))
wait_for_condition(check_running, timeout=15)
client.deploy_apps(ServeDeploySchema.parse_obj(config_template), _blocking=True)
check_running()
url = get_application_url("HTTP", app_name=SERVE_DEFAULT_APP_NAME)

pids = []
Expand All @@ -549,8 +549,10 @@ def test_update_config_user_config(serve_instance):
}

# Deploy first time
client.deploy_apps(ServeDeploySchema.parse_obj({"applications": [config_template]}))
wait_for_condition(check_running, timeout=15)
client.deploy_apps(
ServeDeploySchema.parse_obj({"applications": [config_template]}), _blocking=True
)
check_running()
# Query
url = get_application_url("HTTP")
pid1, res = httpx.get(f"{url}/f").json()
Expand Down
31 changes: 17 additions & 14 deletions python/ray/serve/tests/test_deploy_app_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ def serve_running():
]
)
actor_names = [actor["class_name"] for actor in actors]
return "ServeController" in actor_names and "ProxyActor" in actor_names
has_proxy = any("Proxy" in name for name in actor_names)
return "ServeController" in actor_names and has_proxy

wait_for_condition(serve_running)

Expand All @@ -323,8 +324,8 @@ def test_deployments_not_listed_in_config(serve_instance):
config = {
"applications": [{"import_path": "ray.serve.tests.test_config_files.pid.node"}]
}
client.deploy_apps(ServeDeploySchema(**config))
wait_for_condition(check_running, timeout=15)
client.deploy_apps(ServeDeploySchema(**config), _blocking=True)
check_running()
pid1, _ = httpx.get("http://localhost:8000/").json()

# Redeploy the same config (with no deployments listed)
Expand Down Expand Up @@ -408,8 +409,8 @@ def test_deploy_does_not_affect_dynamic_apps(serve_instance):
),
],
)
client.deploy_apps(config)
wait_for_condition(check_running, app_name="declarative-app-1")
client.deploy_apps(config, _blocking=True)
check_running(app_name="declarative-app-1")
url = get_application_url(app_name="declarative-app-1")
assert httpx.post(url).text == "wonderful world"

Expand All @@ -433,8 +434,8 @@ def __call__(self, *args) -> str:
import_path="ray.serve.tests.test_config_files.world.DagNode",
),
)
client.deploy_apps(config)
wait_for_condition(check_running, app_name="declarative-app-2")
client.deploy_apps(config, _blocking=True)
check_running(app_name="declarative-app-2")
url = get_application_url(app_name="declarative-app-2")
assert httpx.post(url).text == "wonderful world"

Expand Down Expand Up @@ -471,8 +472,8 @@ def __call__(self, *args) -> str:
import_path="ray.serve.tests.test_config_files.world.DagNode",
),
]
client.deploy_apps(config)
wait_for_condition(check_running, app_name="declarative-app-1")
client.deploy_apps(config, _blocking=True)
check_running(app_name="declarative-app-1")
url = get_application_url(app_name="declarative-app-1")
assert httpx.post(url).text == "wonderful world"

Expand Down Expand Up @@ -517,8 +518,8 @@ def __call__(self, *args) -> str:
import_path="ray.serve.tests.test_config_files.world.DagNode",
),
]
client.deploy_apps(config)
wait_for_condition(check_running, app_name="declarative-app-2")
client.deploy_apps(config, _blocking=True)
check_running(app_name="declarative-app-2")
url = get_application_url(app_name="declarative-app-2")
assert httpx.post(url).text == "wonderful world"

Expand All @@ -536,8 +537,10 @@ def test_change_route_prefix(serve_instance):
"route_prefix": "/old",
"import_path": "ray.serve.tests.test_config_files.pid.node",
}
client.deploy_apps(ServeDeploySchema(**{"applications": [app_config]}))
wait_for_condition(check_running)
client.deploy_apps(
ServeDeploySchema(**{"applications": [app_config]}), _blocking=True
)
check_running()
url = get_application_url()
pid1 = httpx.get(url).json()[0]
# Redeploy application with route prefix /new.
Expand Down Expand Up @@ -710,7 +713,7 @@ def test_deploy_one_app_failed(serve_instance):
# The timeout is there to prevent the test from hanging and blocking
# the test suite if it does fail.
r = httpx.post("http://localhost:8000/app2", timeout=10)
assert r.status_code == 503 and "unavailable" in r.text
assert r.status_code == 503 and "unavailable" in r.text.lower()


def test_deploy_with_route_prefix_conflict(serve_instance):
Expand Down
4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_gcs_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ def test_proxy_router_updated_replicas_then_gcs_failure(serve_ha):
"route_prefix": "/",
"deployments": [{"name": "GetPID", "num_replicas": 1}],
}
client.deploy_apps(ServeDeploySchema(**{"applications": [config]}))
wait_for_condition(check_apps_running, apps=["default"])
client.deploy_apps(ServeDeploySchema(**{"applications": [config]}), _blocking=True)
check_apps_running(apps=["default"])

r = httpx.post("http://localhost:8000")
assert r.status_code == 200, r.text
Expand Down