Merged
18 commits
5e08275
support mcp tools streaming response in background mode
wuhang2014 Aug 29, 2025
72de2fc
support mcp tools streaming response in background mode
wuhang2014 Aug 29, 2025
d71f945
support mcp tools streaming response in background mode
wuhang2014 Aug 29, 2025
a17a3bd
support mcp tools streaming response in background mode
wuhang2014 Aug 29, 2025
114a343
support mcp tools streaming response in background mode
wuhang2014 Aug 29, 2025
5400086
support mcp tools streaming response in background mode
wuhang2014 Aug 29, 2025
3582a05
support mcp tools streaming response in background mode
wuhang2014 Aug 29, 2025
ddbb475
support mcp tools streaming response in background mode
wuhang2014 Aug 29, 2025
eb4401b
support mcp tools streaming response in background mode
wuhang2014 Aug 29, 2025
49631fe
support mcp tools streaming response in background mode
wuhang2014 Aug 29, 2025
21014f6
support mcp tools streaming response in background mode
wuhang2014 Sep 1, 2025
570b000
support mcp tools streaming response in background mode
wuhang2014 Sep 1, 2025
e89d424
support mcp tools streaming response in background mode
wuhang2014 Sep 1, 2025
4bb8cfb
support mcp tools streaming response in background mode
wuhang2014 Sep 1, 2025
d18dd46
Merge branch 'main' into streamingmode_responsesapi
wuhang2014 Sep 1, 2025
0699719
Merge branch 'main' into streamingmode_responsesapi
wuhang2014 Sep 3, 2025
29b2df9
Merge branch 'main' into streamingmode_responsesapi
wuhang2014 Sep 3, 2025
4983867
Merge branch 'main' into streamingmode_responsesapi
wuhang2014 Sep 4, 2025
19 changes: 18 additions & 1 deletion tests/entrypoints/openai/test_response_api_with_harmony.py
@@ -275,7 +275,8 @@ async def test_stateful_multi_turn(client: OpenAI, model_name: str):

@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
async def test_streaming(client: OpenAI, model_name: str):
@pytest.mark.parametrize("background", [True, False])
async def test_streaming(client: OpenAI, model_name: str, background: bool):
# TODO: Add back when web search and code interpreter are available in CI
prompts = [
"tell me a story about a cat in 20 words",
@@ -300,11 +301,16 @@ async def test_streaming(client: OpenAI, model_name: str):
# },
],
stream=True,
background=background,
)

events = []
current_event_mode = None
resp_id = None
async for event in response:
if event.type == "response.created":
resp_id = event.response.id

if current_event_mode != event.type:
current_event_mode = event.type
print(f"\n[{event.type}] ", end="", flush=True)
@@ -322,6 +328,17 @@

assert len(events) > 0

if background:
starting_after = 5
async with await client.responses.retrieve(
response_id=resp_id,
stream=True,
starting_after=starting_after) as stream:
counter = starting_after
async for event in stream:
counter += 1
assert event == events[counter]


@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
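Outside the test harness, the flow this new parametrization exercises would look roughly like the sketch below. The base URL, API key, and model name are placeholders, and it assumes an openai client version that accepts stream and starting_after on responses.retrieve, as the test above does.

# Hedged usage sketch of the background + streaming flow exercised by the test.
# The base URL, API key, and model name are placeholders, not part of this PR.
import asyncio

from openai import AsyncOpenAI


async def main() -> None:
    client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

    # Start a background response and follow its event stream live.
    stream = await client.responses.create(
        model="my-model",  # placeholder model name
        input="tell me a story about a cat in 20 words",
        stream=True,
        background=True,
    )
    resp_id = None
    async for event in stream:
        if event.type == "response.created":
            resp_id = event.response.id
        print(event.type)

    # Later (e.g. after a dropped connection), resume the same stream,
    # skipping the events at indices 0-5 that were already received.
    async with await client.responses.retrieve(
            response_id=resp_id, stream=True, starting_after=5) as replay:
        async for event in replay:
            print(event.type)


asyncio.run(main())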
16 changes: 14 additions & 2 deletions vllm/entrypoints/openai/api_server.py
@@ -616,21 +616,33 @@ async def create_responses(request: ResponsesRequest, raw_request: Request):


@router.get("/v1/responses/{response_id}")
async def retrieve_responses(response_id: str, raw_request: Request):
async def retrieve_responses(
response_id: str,
raw_request: Request,
starting_after: Optional[int] = None,
stream: Optional[bool] = False,
):
handler = responses(raw_request)
if handler is None:
return base(raw_request).create_error_response(
message="The model does not support Responses API")

try:
response = await handler.retrieve_responses(response_id)
response = await handler.retrieve_responses(
response_id,
starting_after=starting_after,
stream=stream,
)
except Exception as e:
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value,
detail=str(e)) from e

if isinstance(response, ErrorResponse):
return JSONResponse(content=response.model_dump(),
status_code=response.error.code)
elif stream:
return StreamingResponse(content=response,
media_type="text/event-stream")
return JSONResponse(content=response.model_dump())


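At the HTTP level, the extended retrieve endpoint can be exercised directly. Below is a minimal sketch with httpx, assuming a locally running server and a placeholder response id; the stream and starting_after query parameters map onto the new handler arguments above.

# Hypothetical client-side sketch of the extended GET /v1/responses/{id} endpoint.
# The base URL and response id below are placeholders, not part of this PR.
from typing import Optional

import httpx

BASE_URL = "http://localhost:8000/v1"
RESPONSE_ID = "resp_1234"  # placeholder id returned by POST /v1/responses


def resume_response_stream(starting_after: Optional[int] = None) -> None:
    params = {"stream": "true"}
    if starting_after is not None:
        params["starting_after"] = str(starting_after)
    # The endpoint streams Server-Sent Events (media_type="text/event-stream").
    with httpx.stream(
            "GET",
            f"{BASE_URL}/responses/{RESPONSE_ID}",
            params=params,
            timeout=None,
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if line:  # skip blank separator lines between SSE events
                print(line)


if __name__ == "__main__":
    # Replay everything after the event at index 5, mirroring the test above.
    resume_response_stream(starting_after=5)

The elif stream branch above wraps the async generator returned by retrieve_responses in a StreamingResponse, so each yielded event string is forwarded to the client as soon as it becomes available.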
129 changes: 106 additions & 23 deletions vllm/entrypoints/openai/serving_responses.py
@@ -4,6 +4,7 @@
import asyncio
import json
import time
from collections import deque
from collections.abc import AsyncGenerator, AsyncIterator, Sequence
from contextlib import AsyncExitStack
from copy import copy
@@ -55,7 +56,7 @@
# yapf: enable
from vllm.entrypoints.openai.serving_engine import OpenAIServing
from vllm.entrypoints.openai.serving_models import OpenAIServingModels
from vllm.entrypoints.tool_server import MCPToolServer, ToolServer
from vllm.entrypoints.tool_server import ToolServer
from vllm.inputs.data import TokensPrompt as EngineTokensPrompt
from vllm.logger import init_logger
from vllm.logprobs import Logprob as SampleLogprob
@@ -168,6 +169,11 @@ def __init__(
# never remove messages from the store.
self.msg_store: dict[str, list[ChatCompletionMessageParam]] = {}

# HACK(wuhang): This is a hack. We should use a better store.
# FIXME: If enable_store=True, this may cause a memory leak since we
# never remove events from the store.
self.event_store: dict[str, tuple[deque[str], asyncio.Event]] = {}

self.background_tasks: dict[str, asyncio.Task] = {}

self.tool_server = tool_server
@@ -249,15 +255,6 @@ async def create_responses(
if raw_request:
raw_request.state.request_metadata = request_metadata

if self.tool_server is not None and isinstance(
self.tool_server,
MCPToolServer) and request.stream and request.tools and any(
tool.type in ["web_search_preview", "code_interpreter"]
for tool in request.tools):
return self.create_error_response(
"MCP tool server is not supported in background mode and "
"streaming mode")

# Schedule the request and get the result generator.
generators: list[AsyncGenerator[ConversationContext, None]] = []

@@ -329,25 +326,44 @@ async def create_responses(
self.response_store[response.id] = response

# Run the request in the background.
task = asyncio.create_task(
self._run_background_request(
request,
sampling_params,
result_generator,
context,
model_name,
tokenizer,
request_metadata,
created_time,
),
name=f"create_{response.id}",
)
if request.stream:
task = asyncio.create_task(
self._run_background_request_stream(
request,
sampling_params,
result_generator,
context,
model_name,
tokenizer,
request_metadata,
created_time,
),
name=f"create_{request.request_id}",
)
else:
task = asyncio.create_task(
self._run_background_request(
request,
sampling_params,
result_generator,
context,
model_name,
tokenizer,
request_metadata,
created_time,
),
name=f"create_{response.id}",
)

# For cleanup.
response_id = response.id
self.background_tasks[response_id] = task
task.add_done_callback(
lambda _: self.background_tasks.pop(response_id, None))

if request.stream:
return self.responses_background_stream_generator(
request.request_id)
return response

if request.stream:
@@ -736,6 +752,40 @@ def _construct_input_messages_with_harmony(
prev_outputs.append(response_msg)
return messages

async def _run_background_request_stream(
self,
request: ResponsesRequest,
*args,
**kwargs,
):
event_deque: deque[str] = deque()
new_event_signal = asyncio.Event()
self.event_store[request.request_id] = (event_deque, new_event_signal)
response = None
try:
generator = self.responses_stream_generator(
request, *args, **kwargs)
async for event in generator:
event_deque.append(event)
new_event_signal.set() # Signal new event available
except Exception as e:
logger.exception("Background request failed for %s",
request.request_id)
response = self.create_error_response(str(e))
finally:
# Mark as finished with a special marker
event_deque.append("__STREAM_END__")
new_event_signal.set()

if response is not None and isinstance(response, ErrorResponse):
# If the request has failed, update the status to "failed".
response_id = request.request_id
async with self.response_store_lock:
stored_response = self.response_store.get(response_id)
assert stored_response is not None
if stored_response.status not in ("completed", "cancelled"):
stored_response.status = "failed"

async def _run_background_request(
self,
request: ResponsesRequest,
@@ -759,9 +809,36 @@ async def _run_background_request(
if stored_response.status not in ("completed", "cancelled"):
stored_response.status = "failed"

async def responses_background_stream_generator(
self,
response_id: str,
starting_after: Optional[int] = None,
):
if response_id not in self.event_store:
raise ValueError(f"Unknown response_id: {response_id}")

event_deque, new_event_signal = self.event_store[response_id]
start_index = 0 if starting_after is None else starting_after + 1
current_index = start_index

while True:
new_event_signal.clear()

# Yield existing events from start_index
while current_index < len(event_deque):
event = event_deque[current_index]
if event == "__STREAM_END__":
return
yield event
current_index += 1

await new_event_signal.wait()

async def retrieve_responses(
self,
response_id: str,
starting_after: Optional[int],
stream: Optional[bool],
) -> Union[ErrorResponse, ResponsesResponse]:
if not response_id.startswith("resp_"):
return self._make_invalid_id_error(response_id)
@@ -771,6 +848,12 @@ async def retrieve_responses(

if response is None:
return self._make_not_found_error(response_id)

if stream:
return self.responses_background_stream_generator(
response_id,
starting_after,
)
return response

async def cancel_responses(
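The heart of the change is the replay pattern shared by _run_background_request_stream and responses_background_stream_generator: the producer appends serialized events to a per-request deque, sets an asyncio.Event after each append, and terminates the stream with a sentinel, while a consumer replays from an arbitrary index before waiting for new events. Below is a self-contained sketch of that pattern, with toy event strings and timings standing in for real response events.

# Minimal, self-contained sketch of the deque + asyncio.Event replay pattern
# used by _run_background_request_stream / responses_background_stream_generator.
# The event strings and sleep intervals here are illustrative only.
import asyncio
from collections import deque
from typing import Optional

STREAM_END = "__STREAM_END__"  # same sentinel convention as above


async def producer(events: deque[str], signal: asyncio.Event, n: int) -> None:
    for i in range(n):
        await asyncio.sleep(0.01)   # stand-in for generating a response event
        events.append(f"event-{i}")
        signal.set()                # wake any consumer waiting for new events
    events.append(STREAM_END)       # mark the stream as finished
    signal.set()


async def consumer(events: deque[str], signal: asyncio.Event, name: str,
                   starting_after: Optional[int] = None) -> None:
    # Replay begins at starting_after + 1, matching the server-side generator.
    index = 0 if starting_after is None else starting_after + 1
    while True:
        signal.clear()
        while index < len(events):
            event = events[index]
            if event == STREAM_END:
                print(f"{name}: done")
                return
            print(f"{name}: {event}")
            index += 1
        await signal.wait()         # block until the producer appends more


async def main() -> None:
    events: deque[str] = deque()
    signal = asyncio.Event()
    # A live consumer follows the stream while it is being produced.
    await asyncio.gather(producer(events, signal, 5),
                         consumer(events, signal, "live"))
    # A late consumer can still replay from an arbitrary index afterwards,
    # because the events stay in the deque once the stream has ended.
    await consumer(events, signal, "resume", starting_after=2)


asyncio.run(main())

As the FIXME comment above notes, entries remain in event_store for the life of the process; that is what makes late replay possible, but it is also why the store can grow without bound when enable_store is on.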