Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0d24f22
feat: update requirements to include aiohttp
ikaadil Jul 18, 2025
a3bc238
feat: replace HTTPX with aiohttp for asynchronous client handling
ikaadil Jul 18, 2025
49ac0f7
feat: replace HTTPXClientWrapper with AiohttpClientWrapper for improv…
ikaadil Jul 18, 2025
c5fce3b
feat: replace HTTPXClientWrapper with AiohttpClientWrapper in lifespa…
ikaadil Jul 18, 2025
95477f4
feat: remove httpx from requirements for aiohttp integration
ikaadil Jul 18, 2025
1b45d8e
refactor: switch from httpx to aiohttp for asynchronous client handli…
ikaadil Jul 24, 2025
3855dfd
feat: add AiohttpClientWrapper for asynchronous HTTP client handling
ikaadil Jul 24, 2025
c3ec0e2
chore: pin aiohttp version to 3.9.5 in requirements
ikaadil Jul 24, 2025
b026de7
refactor: migrate from httpx to aiohttp for asynchronous HTTP request…
ikaadil Jul 24, 2025
6db9193
refactor: update file upload implementation to use aiohttp for asynch…
ikaadil Jul 24, 2025
d976bce
chore: replace httpx with aiohttp in project dependencies and test re…
ikaadil Jul 24, 2025
321ebc5
chore: format dependencies in pyproject.toml for consistency
ikaadil Jul 24, 2025
b820266
chore: clean up imports and ensure consistent aiohttp version in requ…
ikaadil Jul 24, 2025
ad48764
fix: update aiohttp client closure method for consistency and accurac…
ikaadil Jul 24, 2025
ed6ad34
fix: correct status code handling in request processing for improved …
ikaadil Jul 25, 2025
da6fa9c
bumping version (#593)
YuhanLiu11 Jul 22, 2025
e24bdf6
Added option to specify priority class (#557)
Fabhiahn Jul 22, 2025
d43d061
[CI/Build] Change CI runner to L4 (#595)
Shaoting-Feng Jul 23, 2025
981315a
[Bugfix] fix dynamic config (#598)
zerofishnoodles Jul 24, 2025
04de252
[refactor] redesign RST documentation (#592)
kobe0938 Jul 25, 2025
51ccca9
[Misc] revert uv.lock (#604)
kobe0938 Jul 25, 2025
cbe1dd9
fix: update response content iteration method for improved performance
ikaadil Jul 26, 2025
6e76d18
Merge branch 'main' into router/replace-httpx-with-aiohttp
ikaadil Jul 26, 2025
aab597b
Merge branch 'main' into router/replace-httpx-with-aiohttp
YuhanLiu11 Jul 30, 2025
086fa9d
Replace httpx with aiohttp for prefill and decode clients in K8sServi…
ikaadil Jul 30, 2025
2cc2c4e
Merge branch 'main' into router/replace-httpx-with-aiohttp
ikaadil Jul 30, 2025
4b86abb
Update file upload function to include file parameter in aiohttp post…
ikaadil Jul 30, 2025
5db9e10
Refactor response handling in request service to return JSON data and…
ikaadil Jul 30, 2025
a830972
Merge branch 'main' into router/replace-httpx-with-aiohttp
ikaadil Jul 30, 2025
6aebf9a
Add initialization of aiohttp ClientSession in service discovery duri…
ikaadil Jul 31, 2025
adb08eb
Merge branch 'main' into router/replace-httpx-with-aiohttp
ikaadil Jul 31, 2025
947a212
Merge branch 'main' into router/replace-httpx-with-aiohttp
ikaadil Jul 31, 2025
5703efa
Refactor whitespace and formatting in app.py for improved readability
ikaadil Jul 31, 2025
baec49b
Merge branch 'main' into router/replace-httpx-with-aiohttp
ikaadil Aug 1, 2025
c0699d9
Merge branch 'main' into router/replace-httpx-with-aiohttp
ikaadil Aug 1, 2025
9bb0a58
Merge branch 'main' into router/replace-httpx-with-aiohttp
ikaadil Aug 1, 2025
0a69d87
Merge branch 'main' into router/replace-httpx-with-aiohttp
ikaadil Aug 4, 2025
d083961
Merge branch 'main' into router/replace-httpx-with-aiohttp
ikaadil Aug 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def add_line(self, line: str, source: str, *lineno: int) -> None:
"prometheus_client",
"uhashring",
"lmcache",
"httpx",
"aiohttp",
"transformers",
"os",
]
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ dependencies = [
"aiofiles==24.1.0",
"black>=25.1.0",
"fastapi==0.115.8",
"httpx==0.28.1",
"aiohttp==3.9.1",
"kubernetes==32.0.0",
"numpy==1.26.4",
"prometheus-client==0.21.1",
"python-multipart==0.0.20",
"sentry-sdk[fastapi,httpx]==2.27.0",
"sentry-sdk[fastapi]==2.27.0",
"uhashring==2.3",
"uvicorn==0.34.0",
"xxhash==3.5.0",
Expand Down
23 changes: 13 additions & 10 deletions src/examples/example_file_upload.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import argparse

import httpx
import aiohttp


def upload_file(server_url: str, file_path: str):
async def upload_file(server_url: str, file_path: str):
"""Uploads a file to the production stack."""
try:
with open(file_path, "rb") as file:
files = {"file": (file_path, file, "application/octet-stream")}
data = {"purpose": "unknown"}

with httpx.Client() as client:
response = client.post(server_url, files=files, data=data)

if response.status_code == 200:
print("File uploaded successfully:", response.json())
else:
print("Failed to upload file:", response.text)
async with aiohttp.ClientSession() as client:
async with client.post(server_url, files=files, data=data) as response:
if response.status == 200:
result = await response.json()
print("File uploaded successfully:", result)
else:
text = await response.text()
print("Failed to upload file:", text)
except Exception as e:
print(f"Error: {e}")

Expand All @@ -31,7 +32,9 @@ def parse_args():


if __name__ == "__main__":
import asyncio

args = parse_args()
endpoint = args.url
file_to_upload = args.path
upload_file(endpoint, file_to_upload)
asyncio.run(upload_file(endpoint, file_to_upload))
2 changes: 1 addition & 1 deletion src/tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
aiohttp
fastapi
httpx
openai
uvicorn
vllm
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,38 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import httpx
import aiohttp

from vllm_router.log import init_logger

logger = init_logger(__name__)


class HTTPXClientWrapper:
class AiohttpClientWrapper:

async_client = None

def start(self):
"""Instantiate the client. Call from the FastAPI startup hook."""
# To fully leverage the router's concurrency capabilities,
# we set the maximum number of connections to be unlimited.
limits = httpx.Limits(max_connections=None)
self.async_client = httpx.AsyncClient(limits=limits)
logger.info(f"httpx AsyncClient instantiated. Id {id(self.async_client)}")
self.async_client = aiohttp.ClientSession()
logger.info(f"aiohttp ClientSession instantiated. Id {id(self.async_client)}")

async def stop(self):
"""Gracefully shutdown. Call from FastAPI shutdown hook."""
logger.info(
f"httpx async_client.is_closed(): {self.async_client.is_closed} - Now close it. Id (will be unchanged): {id(self.async_client)}"
f"aiohttp async_client.closed: {self.async_client.closed} - Now close it. Id (will be unchanged): {id(self.async_client)}"
)
await self.async_client.aclose()
await self.async_client.close()
logger.info(
f"httpx async_client.is_closed(): {self.async_client.is_closed}. Id (will be unchanged): {id(self.async_client)}"
f"aiohttp async_client.closed: {self.async_client.closed}. Id (will be unchanged): {id(self.async_client)}"
)
self.async_client = None
logger.info("httpx AsyncClient closed")
logger.info("aiohttp ClientSession closed")

def __call__(self):
"""Calling the instantiated HTTPXClientWrapper returns the wrapped singleton."""
"""Calling the instantiated AiohttpClientWrapper returns the wrapped singleton."""
# Ensure we don't use it if not started / running
assert self.async_client is not None
return self.async_client
13 changes: 9 additions & 4 deletions src/vllm_router/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import uvicorn
from fastapi import FastAPI

from vllm_router.aiohttp_client import AiohttpClientWrapper
from vllm_router.dynamic_config import (
DynamicRouterConfig,
get_dynamic_config_watcher,
initialize_dynamic_config_watcher,
)
from vllm_router.experimental import get_feature_gates, initialize_feature_gates
from vllm_router.httpx_client import HTTPXClientWrapper
from vllm_router.parsers.parser import parse_args
from vllm_router.routers.batches_router import batches_router
from vllm_router.routers.files_router import files_router
Expand Down Expand Up @@ -82,11 +82,16 @@

@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.httpx_client_wrapper.start()
app.state.aiohttp_client_wrapper.start()
if hasattr(app.state, "batch_processor"):
await app.state.batch_processor.initialize()

service_discovery = get_service_discovery()
if hasattr(service_discovery, "initialize_client_sessions"):
await service_discovery.initialize_client_sessions()

yield
await app.state.httpx_client_wrapper.stop()
await app.state.aiohttp_client_wrapper.stop()

# Close the threaded-components
logger.info("Closing engine stats scraper")
Expand Down Expand Up @@ -265,7 +270,7 @@ def initialize_all(app: FastAPI, args):
app.include_router(files_router)
app.include_router(batches_router)
app.include_router(metrics_router)
app.state.httpx_client_wrapper = HTTPXClientWrapper()
app.state.aiohttp_client_wrapper = AiohttpClientWrapper()
app.state.semantic_cache_available = semantic_cache_available


Expand Down
4 changes: 2 additions & 2 deletions src/vllm_router/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
aiofiles==24.1.0
aiohttp==3.9.5
fastapi==0.115.8
httpx==0.28.1
kubernetes==32.0.0
numpy==1.26.4
prometheus_client==0.21.1
psutil==7.0.0
python-multipart==0.0.20
sentry-sdk[fastapi,httpx]==2.27.0
sentry-sdk[fastapi]==2.27.0
uhashring==2.3
uvicorn==0.34.0
xxhash==3.5.0
93 changes: 59 additions & 34 deletions src/vllm_router/service_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from dataclasses import dataclass
from typing import Dict, List, Optional

import httpx
import aiohttp
import requests
from kubernetes import client, config, watch

Expand Down Expand Up @@ -308,22 +308,29 @@ def get_endpoint_info(self) -> List[EndpointInfo]:
model_info=self._get_model_info(model),
)
endpoint_infos.append(endpoint_info)
return endpoint_infos

async def initialize_client_sessions(self) -> None:
"""
Initialize aiohttp ClientSession objects for prefill and decode endpoints.
This must be called from an async context during app startup.
"""
if (
self.prefill_model_labels is not None
and self.decode_model_labels is not None
):
endpoint_infos = self.get_endpoint_info()
for endpoint_info in endpoint_infos:
if endpoint_info.model_label in self.prefill_model_labels:
self.app.state.prefill_client = httpx.AsyncClient(
self.app.state.prefill_client = aiohttp.ClientSession(
base_url=endpoint_info.url,
timeout=None,
timeout=aiohttp.ClientTimeout(total=None),
)
elif endpoint_info.model_label in self.decode_model_labels:
self.app.state.decode_client = httpx.AsyncClient(
self.app.state.decode_client = aiohttp.ClientSession(
base_url=endpoint_info.url,
timeout=None,
timeout=aiohttp.ClientTimeout(total=None),
)
return endpoint_infos


class K8sPodIPServiceDiscovery(ServiceDiscovery):
Expand Down Expand Up @@ -629,20 +636,7 @@ def _add_engine(
namespace=self.namespace,
model_info=model_info,
)
if (
self.prefill_model_labels is not None
and self.decode_model_labels is not None
):
if model_label in self.prefill_model_labels:
self.app.state.prefill_client = httpx.AsyncClient(
base_url=f"http://{engine_ip}:{self.port}",
timeout=None,
)
elif model_label in self.decode_model_labels:
self.app.state.decode_client = httpx.AsyncClient(
base_url=f"http://{engine_ip}:{self.port}",
timeout=None,
)

# Store model information in the endpoint info
self.available_engines[engine_name].model_info = model_info

Expand Down Expand Up @@ -720,6 +714,28 @@ def close(self):
self.k8s_watcher.stop()
self.watcher_thread.join()

async def initialize_client_sessions(self) -> None:
    """
    Initialize aiohttp ClientSession objects for prefill and decode endpoints.

    This must be called from an async context during app startup. It does
    nothing unless both ``prefill_model_labels`` and ``decode_model_labels``
    are configured.

    When several endpoints carry a prefill (or decode) label, the last one
    returned by ``get_endpoint_info()`` is used. Exactly one session is
    created per role, so no intermediate session is left unclosed.
    """
    if self.prefill_model_labels is None or self.decode_model_labels is None:
        return

    # Resolve the target URL for each role first. Creating a session inside
    # the loop would leak every session except the last one assigned.
    target_urls = {}
    for endpoint_info in self.get_endpoint_info():
        if endpoint_info.model_label in self.prefill_model_labels:
            target_urls["prefill_client"] = endpoint_info.url
        elif endpoint_info.model_label in self.decode_model_labels:
            target_urls["decode_client"] = endpoint_info.url

    for state_attr, url in target_urls.items():
        # Close a session left over from an earlier call before replacing it.
        previous = getattr(self.app.state, state_attr, None)
        if isinstance(previous, aiohttp.ClientSession) and not previous.closed:
            await previous.close()
        setattr(
            self.app.state,
            state_attr,
            aiohttp.ClientSession(
                base_url=url,
                # total=None: no overall timeout on requests through this session.
                timeout=aiohttp.ClientTimeout(total=None),
            ),
        )


class K8sServiceNameServiceDiscovery(ServiceDiscovery):
def __init__(
Expand Down Expand Up @@ -1024,20 +1040,7 @@ def _add_engine(self, engine_name: str, model_names: List[str], model_label: str
namespace=self.namespace,
model_info=model_info,
)
if (
self.prefill_model_labels is not None
and self.decode_model_labels is not None
):
if model_label in self.prefill_model_labels:
self.app.state.prefill_client = httpx.AsyncClient(
base_url=f"http://{engine_name}:{self.port}",
timeout=None,
)
elif model_label in self.decode_model_labels:
self.app.state.decode_client = httpx.AsyncClient(
base_url=f"http://{engine_name}:{self.port}",
timeout=None,
)

# Store model information in the endpoint info
self.available_engines[engine_name].model_info = model_info

Expand Down Expand Up @@ -1114,6 +1117,28 @@ def close(self):
self.k8s_watcher.stop()
self.watcher_thread.join()

async def initialize_client_sessions(self) -> None:
    """
    Initialize aiohttp ClientSession objects for prefill and decode endpoints.

    This must be called from an async context during app startup. It does
    nothing unless both ``prefill_model_labels`` and ``decode_model_labels``
    are configured.

    When several endpoints carry a prefill (or decode) label, the last one
    returned by ``get_endpoint_info()`` is used. Exactly one session is
    created per role, so no intermediate session is left unclosed.
    """
    if self.prefill_model_labels is None or self.decode_model_labels is None:
        return

    # Resolve the target URL for each role first. Creating a session inside
    # the loop would leak every session except the last one assigned.
    target_urls = {}
    for endpoint_info in self.get_endpoint_info():
        if endpoint_info.model_label in self.prefill_model_labels:
            target_urls["prefill_client"] = endpoint_info.url
        elif endpoint_info.model_label in self.decode_model_labels:
            target_urls["decode_client"] = endpoint_info.url

    for state_attr, url in target_urls.items():
        # Close a session left over from an earlier call before replacing it.
        previous = getattr(self.app.state, state_attr, None)
        if isinstance(previous, aiohttp.ClientSession) and not previous.closed:
            await previous.close()
        setattr(
            self.app.state,
            state_attr,
            aiohttp.ClientSession(
                base_url=url,
                # total=None: no overall timeout on requests through this session.
                timeout=aiohttp.ClientTimeout(total=None),
            ),
        )


def _create_service_discovery(
service_discovery_type: ServiceDiscoveryType, *args, **kwargs
Expand Down
Loading