Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a25a4cb
[BugFix] Provide bucket algorithm rate limiter and queue for 1P1D pro…
frankie-ys Aug 11, 2025
3a8a896
[PR Fix] prevent deadLock (#22575)
frankie-ys Aug 11, 2025
b4c50ce
[PR Fix] fix ruff (#22575)
frankie-ys Aug 11, 2025
4c71543
[PR Fix] fix ruff (#22575)
frankie-ys Aug 12, 2025
7640a98
[PR Fix] fix ruff (#22575)
frankie-ys Aug 12, 2025
4f380c7
[PR Fix] fix ruff (#22575)
frankie-ys Aug 12, 2025
83c4bb4
[PR Fix] fix ruff (#22575)
frankie-ys Aug 12, 2025
9976225
[PR Fix] fix ruff (#22575)
frankie-ys Aug 12, 2025
68f70a0
[PR Fix] fix ruff (#22575)
frankie-ys Aug 12, 2025
115e536
Merge branch 'vllm-project:main' into main
frankie-ys Aug 13, 2025
fd3f1e9
[PR Fix] fix ruff (#22575)
frankie-ys Aug 13, 2025
fe69db9
Update benchmarks/disagg_benchmarks/disagg_prefill_proxy_server.py
frankie-ys Aug 14, 2025
6a61475
Update benchmarks/disagg_benchmarks/disagg_prefill_proxy_server.py
frankie-ys Aug 14, 2025
e899ef1
[review Fix]
frankie-ys Aug 14, 2025
5583205
[review Fix] abstract the rate limiter into a separate file (#22643)
frankie-ys Aug 14, 2025
273f54f
[review Fix] abstract the rate limiter into a separate file (#22643)
frankie-ys Aug 14, 2025
109555c
[Fix] ruff fix(#22643)
frankie-ys Aug 14, 2025
c1cf0de
[Fix] ruff fix(#22643)
frankie-ys Aug 14, 2025
b230914
[Fix] ruff fix(#22643)
frankie-ys Aug 14, 2025
5e5f0d4
[Fix] add rate limiter as context manager(#22643)
frankie-ys Aug 14, 2025
c607a9b
[Fix] add rate limiter as context manager(#22643)
frankie-ys Aug 14, 2025
455a5c3
[Fix] fix(#22643)
frankie-ys Aug 14, 2025
10aeac1
[Fix] fix(#22643)
frankie-ys Aug 14, 2025
eb7cad6
[Fix] fix(#22643)
frankie-ys Aug 14, 2025
0ea3e8d
[Fix] fix(#22643)
frankie-ys Aug 14, 2025
cdb4941
[Fix] fix(#22643)
frankie-ys Aug 14, 2025
f2d9c2f
[Fix] ruff fix(#22643)
frankie-ys Aug 15, 2025
75d2e5b
Merge branch 'main' into main
frankie-ys Aug 15, 2025
53dfe50
[Fix] ruff fix(#22643)
frankie-ys Aug 15, 2025
6512c8b
[Fix] fix(#22643)
frankie-ys Aug 15, 2025
f78d894
[Fix] fix(#22643)
frankie-ys Aug 15, 2025
9ee5b88
Merge branch 'main' into main
KuntaiDu Aug 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 160 additions & 32 deletions benchmarks/disagg_benchmarks/disagg_prefill_proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,190 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import asyncio
import logging
import os
import time
from collections import deque

import aiohttp
from quart import Quart, Response, make_response, request

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global configuration parameters
# Timeout for backend service requests (seconds)
AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=300)
MAX_CONCURRENT_REQUESTS = 10  # Maximum concurrent requests to backend services
REQUEST_QUEUE_SIZE = 50  # Maximum number of requests in the queue
RATE_LIMIT = 5  # Maximum requests per second (rate limiting)
PRE_SERVICE_URL = "http://localhost:8100/v1/completions"  # Prefill service endpoint
DECODE_SERVICE_URL = "http://localhost:8200/v1/completions"  # Decode service endpoint

# NOTE: running this proxy requires `pip install quart`
app = Quart(__name__)
# Fixed-window rate limiter implementation
class RateLimiter:
    """Hand out up to ``rate_limit`` tokens per one-second window.

    ``acquire`` never fails: when the window's tokens are exhausted it
    sleeps until the window ends and then grants a token from the next one.
    """

    def __init__(self, rate_limit):
        self.rate_limit = rate_limit  # Requests allowed per one-second window
        self.tokens = rate_limit  # Tokens remaining in the current window
        self.last_refill = time.monotonic()  # Start time of the current window
        self.lock = asyncio.Lock()  # Serializes all token accounting

    async def acquire(self):
        """Acquire a token, waiting into the next window if none are left.

        Returns:
            True, always (the caller waits rather than being rejected).
        """
        async with self.lock:
            current_time = time.monotonic()
            elapsed = current_time - self.last_refill

            # Start a fresh window once a full second has passed
            if elapsed > 1.0:
                self.tokens = self.rate_limit
                self.last_refill = current_time

            # Fast path: a token is available in the current window
            if self.tokens > 0:
                self.tokens -= 1
                return True

            # No tokens left: sleep until the current window ends.
            # Fix: clamp the delay to >= 0 so float edge cases (elapsed
            # exactly 1.0) can never pass a negative value to asyncio.sleep.
            # NOTE(review): sleeping while holding the lock serializes all
            # waiters — that is what enforces the rate across callers.
            await asyncio.sleep(max(0.0, 1.0 - elapsed))
            self.last_refill = time.monotonic()
            self.tokens = self.rate_limit - 1  # This caller consumes one token
            return True

# Request queue manager with concurrency control
class RequestQueue:
    """Bounded FIFO queue of request tasks with bounded concurrency."""

    def __init__(self, max_concurrent, max_queue_size):
        self.max_concurrent = max_concurrent  # Maximum concurrent requests
        self.max_queue_size = max_queue_size  # Maximum queue size
        self.semaphore = asyncio.Semaphore(max_concurrent)  # Concurrency control
        self.queue = deque()  # Pending request tasks (FIFO)
        self.queue_size = 0  # Current queue size
        self.lock = asyncio.Lock()  # Protects queue and queue_size together

    async def enqueue(self, task):
        """Add a request task to the queue.

        Returns:
            True if the task was queued, False if the queue is full.
        """
        async with self.lock:
            if self.queue_size >= self.max_queue_size:
                logger.warning("Request queue full, rejecting request")
                return False

            self.queue.append(task)
            self.queue_size += 1
            return True

    async def process(self):
        """Process queued requests forever, gated by the semaphore."""
        while True:
            if self.queue:
                async with self.semaphore:
                    # Nesting kept on purpose (vs. SIM117): the lock must be
                    # released before awaiting the task, while the semaphore
                    # must be held across the whole await.
                    async with self.lock:
                        task = self.queue.popleft()
                        # Fix: decrement under the same lock that guards the
                        # queue so size and contents never disagree.
                        self.queue_size -= 1
                    try:
                        await task
                    except Exception:
                        # Fix: a failing request task must not kill this
                        # dispatcher loop (it previously crashed process()
                        # and stalled the queue forever).
                        logger.exception("Queued request task failed")
            await asyncio.sleep(0.01)  # Yield control to event loop


# Initialize rate limiter and request queue
rate_limiter = RateLimiter(RATE_LIMIT)
request_queue = RequestQueue(MAX_CONCURRENT_REQUESTS, REQUEST_QUEUE_SIZE)


# Start queue processing on app startup
@app.before_serving
async def startup():
    """Start the request-processing task when the app starts serving."""
    # Fix: keep a reference to the task; asyncio only holds a weak reference
    # to tasks, so an unreferenced background task can be garbage-collected
    # mid-flight (see asyncio.create_task documentation).
    app.queue_processor = asyncio.create_task(request_queue.process())


async def forward_request(url, data):
    """Stream a POST request to a backend service, yielding response chunks.

    Applies the global rate limiter before contacting the backend. On any
    backend failure this yields a JSON error payload (bytes) instead of
    raising, so callers can always stream the result to the client.
    """
    # Apply rate limiting before making the request
    await rate_limiter.acquire()

    headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"}
    async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
        try:
            async with session.post(url=url, json=data, headers=headers) as response:
                if response.status == 200:
                    # Stream response chunks as they arrive
                    async for chunk_bytes in response.content.iter_chunked(1024):
                        yield chunk_bytes
                else:
                    # Backend returned an error status; log it and surface a
                    # generic error body to the client.
                    error_text = await response.text()
                    # Lazy %-args (fixes Ruff G004) and wrapped call
                    # (fixes Ruff E501).
                    logger.error(
                        "Backend service error: %s - %s",
                        response.status,
                        error_text,
                    )
                    yield b'{"error": "Backend service error"}'
        except aiohttp.ClientError as e:
            # Handle connection errors
            logger.error("Connection error to %s: %s", url, e)
            yield b'{"error": "Service unavailable"}'
        except asyncio.TimeoutError:
            # Handle timeout errors
            logger.error("Timeout connecting to %s", url)
            yield b'{"error": "Service timeout"}'

async def process_request():
    """Process a single request through prefill and decode stages.

    Returns a streaming Quart response from the decode service, or a JSON
    500 response if anything fails.
    """
    try:
        original_request_data = await request.get_json()

        # Prefill request: max_tokens=1 so the backend only does prefill
        prefill_request = original_request_data.copy()
        prefill_request["max_tokens"] = 1

        # Execute prefill stage; drain the stream, its output is discarded
        async for _ in forward_request(PRE_SERVICE_URL, prefill_request):
            continue

        # Execute decode stage and stream the response back to the client
        generator = forward_request(DECODE_SERVICE_URL, original_request_data)
        response = await make_response(generator)
        response.timeout = None  # Disable timeout for streaming response
        return response

    except Exception:
        # logger.exception logs the message plus the full traceback with
        # lazy formatting (fixes Ruff G004 and replaces the manual
        # sys.exc_info / traceback plumbing).
        logger.exception("Error processing request")
        return Response(
            response=b'{"error": "Internal server error"}',
            status=500,
            content_type="application/json",
        )
@app.route("/v1/completions", methods=["POST"])
async def handle_request():
"""Handle incoming API requests with concurrency and rate limiting"""
# Create task for request processing
task = asyncio.create_task(process_request())

# Enqueue request or reject if queue is full
if not await request_queue.enqueue(task):
return Response(
response=b'{"error": "Server busy, try again later"}',
status=503,
content_type="application/json"
)

try:
# Return the response from the processing task
return await task
except asyncio.CancelledError:
# Handle task cancellation (timeout or queue full)
logger.warning("Request cancelled due to timeout or queue full")
return Response(
response=b'{"error": "Request cancelled"}',
status=503,
content_type="application/json"
)


if __name__ == "__main__":
app.run(port=8000)
# Start the Quart server with host: 0.0.0.0 port: 8000
app.run(port=8000, host="0.0.0.0")
Loading