diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py
index 1e3746e956e0..089f50a1e6a3 100644
--- a/vllm/entrypoints/openai/serving_responses.py
+++ b/vllm/entrypoints/openai/serving_responses.py
@@ -2,6 +2,7 @@
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
 
 import asyncio
+import json
 import time
 from collections.abc import AsyncGenerator, AsyncIterator
 from contextlib import AsyncExitStack
@@ -10,10 +11,22 @@
 from typing import Any, Callable, Final, Optional, Union
 
 import jinja2
+import openai.types.responses as openai_responses_types
 from fastapi import Request
-from openai.types.responses import (ResponseFunctionToolCall,
-                                    ResponseOutputItem, ResponseOutputMessage,
-                                    ResponseOutputText, ResponseReasoningItem)
+from openai import BaseModel
+# yapf conflicts with isort for this block
+# yapf: disable
+from openai.types.responses import (ResponseContentPartDoneEvent,
+                                    ResponseCreatedEvent,
+                                    ResponseFunctionToolCall,
+                                    ResponseInProgressEvent,
+                                    ResponseOutputItem,
+                                    ResponseOutputItemDoneEvent,
+                                    ResponseOutputMessage, ResponseOutputText,
+                                    ResponseReasoningItem,
+                                    ResponseReasoningTextDeltaEvent,
+                                    ResponseReasoningTextDoneEvent)
+# yapf: enable
 from openai.types.responses.response_reasoning_item import (
     Content as ResponseReasoningTextContent)
 from openai_harmony import Message as OpenAIHarmonyMessage
@@ -330,8 +343,15 @@ async def create_responses(
             return response
 
         if request.stream:
-            raise NotImplementedError(
-                "Streaming responses are not supported")
+            return self.responses_stream_generator(
+                request,
+                sampling_params,
+                result_generator,
+                context,
+                model_name,
+                tokenizer,
+                request_metadata,
+            )
 
         try:
             return await self.responses_full_generator(
@@ -744,3 +764,427 @@ def _make_store_not_supported_error(self) -> ErrorResponse:
                 "starting the vLLM server."),
             status_code=HTTPStatus.BAD_REQUEST,
         )
+
+    async def responses_stream_generator(
+        self,
+        request: ResponsesRequest,
+        sampling_params: SamplingParams,
+        result_generator: AsyncIterator[Optional[ConversationContext]],
+        context: ConversationContext,
+        model_name: str,
+        tokenizer: AnyTokenizer,
+        request_metadata: RequestResponseMetadata,
+        created_time: Optional[int] = None,
+    ) -> AsyncGenerator[str, None]:
+        # TODO:
+        # 1. Handle disconnect
+
+        if not isinstance(context, StreamingHarmonyContext):
+            raise NotImplementedError(
+                "Streaming is not supported for responses API without Harmony."
+            )
+
+        created_time = created_time or int(time.time())
+
+        sequence_number = 0
+
+        def _send_event(event: BaseModel):
+            nonlocal sequence_number
+            # Set sequence_number if the event has this attribute
+            if hasattr(event, 'sequence_number'):
+                event.sequence_number = sequence_number
+                sequence_number += 1
+            # Get event type from the event's type field if it exists
+            event_type = getattr(event, 'type', 'unknown')
+            return (f"event: {event_type}\n"
+                    f"data: {event.model_dump_json(indent=None)}\n\n")
+
+        current_content_index = 0  # FIXME: this number is never changed
+        current_output_index = 0
+        current_item_id = ""  # FIXME: this value is never changed
+        sent_output_item_added = False
+
+        initial_response = ResponsesResponse.from_request(
+            request,
+            sampling_params,
+            model_name=model_name,
+            created_time=created_time,
+            output=[],
+            status="in_progress",
+            usage=None,
+        ).model_dump()
+        yield _send_event(
+            ResponseCreatedEvent(
+                type="response.created",
+                sequence_number=-1,
+                response=initial_response,
+            ))
+        yield _send_event(
+            ResponseInProgressEvent(
+                type="response.in_progress",
+                sequence_number=-1,
+                response=initial_response,
+            ))
+
+        async for ctx in result_generator:
+
+            assert isinstance(ctx, StreamingHarmonyContext)
+
+            if ctx.is_expecting_start():
+                current_output_index += 1
+                sent_output_item_added = False
+
+                if len(ctx.parser.messages) > 0:
+                    previous_item = ctx.parser.messages[-1]
+                    if previous_item.recipient is not None:
+                        # Deal with tool call here
+                        pass
+                    elif previous_item.channel == "analysis":
+                        reasoning_item = ResponseReasoningItem(
+                            type="reasoning",
+                            id=current_item_id,
+                            summary=[],
+                            content=[
+                                ResponseReasoningTextContent(
+                                    type="reasoning_text",
+                                    text=previous_item.content[0].text),
+                            ],
+                            status="completed",
+                        )
+                        yield _send_event(
+                            ResponseReasoningTextDoneEvent(
+                                type="response.reasoning_text.done",
+                                item_id=current_item_id,
+                                sequence_number=-1,
+                                output_index=current_output_index,
+                                content_index=current_content_index,
+                                text=previous_item.content[0].text,
+                            ))
+                        yield _send_event(
+                            ResponseContentPartDoneEvent(
+                                type="response.content_part.done",
+                                item_id=current_item_id,
+                                sequence_number=-1,
+                                output_index=current_output_index,
+                                content_index=current_content_index,
+                                part=reasoning_item,
+                            ))
+                        yield _send_event(
+                            ResponseOutputItemDoneEvent(
+                                type="response.output_item.done",
+                                sequence_number=-1,
+                                output_index=current_output_index,
+                                item=reasoning_item,
+                            ))
+                    elif previous_item.channel == "final":
+                        text_content = ResponseOutputText(
+                            type="output_text",
+                            text=previous_item.content[0].text,
+                            annotations=[],
+                        )
+                        yield _send_event(
+                            openai_responses_types.ResponseTextDoneEvent(
+                                type="response.output_text.done",
+                                sequence_number=-1,
+                                output_index=current_output_index,
+                                content_index=current_content_index,
+                                text=previous_item.content[0].text,
+                                logprobs=[],
+                                item_id=current_item_id,
+                            ))
+                        yield _send_event(
+                            openai_responses_types.
+                            ResponseContentPartDoneEvent(
+                                type="response.content_part.done",
+                                sequence_number=-1,
+                                item_id=current_item_id,
+                                output_index=current_output_index,
+                                content_index=current_content_index,
+                                part=text_content,
+                            ))
+                        yield _send_event(
+                            openai_responses_types.ResponseOutputItemDoneEvent(
+                                type="response.output_item.done",
+                                sequence_number=-1,
+                                output_index=current_output_index,
+                                item=ResponseOutputMessage(
+                                    id=current_item_id,
+                                    type="message",
+                                    role="assistant",
+                                    content=[text_content],
+                                    status="completed",
+                                ),
+                            ))
+
+            if ctx.parser.last_content_delta:
+                if (ctx.parser.current_channel == "final"
+                        and ctx.parser.current_recipient is None):
+                    if not sent_output_item_added:
+                        sent_output_item_added = True
+                        yield _send_event(
+                            openai_responses_types.
+                            ResponseOutputItemAddedEvent(
+                                type="response.output_item.added",
+                                sequence_number=-1,
+                                output_index=current_output_index,
+                                item=openai_responses_types.
+                                ResponseOutputMessage(
+                                    id=current_item_id,
+                                    type="message",
+                                    role="assistant",
+                                    content=[],
+                                    status="in_progress",
+                                ),
+                            ))
+                        yield _send_event(
+                            openai_responses_types.
+                            ResponseContentPartAddedEvent(
+                                type="response.content_part.added",
+                                sequence_number=-1,
+                                output_index=current_output_index,
+                                item_id=current_item_id,
+                                content_index=current_content_index,
+                                part=openai_responses_types.ResponseOutputText(
+                                    type="output_text",
+                                    text="",
+                                    annotations=[],
+                                    logprobs=[],
+                                ),
+                            ))
+                    yield _send_event(
+                        openai_responses_types.ResponseTextDeltaEvent(
+                            type="response.output_text.delta",
+                            sequence_number=-1,
+                            content_index=current_content_index,
+                            output_index=current_output_index,
+                            item_id=current_item_id,
+                            delta=ctx.parser.last_content_delta,
+                            # TODO: use logprobs from ctx.last_request_output
+                            logprobs=[],
+                        ))
+                elif (ctx.parser.current_channel == "analysis"
+                      and ctx.parser.current_recipient is None):
+                    if not sent_output_item_added:
+                        sent_output_item_added = True
+                        yield _send_event(
+                            openai_responses_types.
+                            ResponseOutputItemAddedEvent(
+                                type="response.output_item.added",
+                                sequence_number=-1,
+                                output_index=current_output_index,
+                                item=openai_responses_types.
+                                ResponseReasoningItem(
+                                    type="reasoning",
+                                    id=current_item_id,
+                                    summary=[],
+                                    status="in_progress",
+                                ),
+                            ))
+                        yield _send_event(
+                            openai_responses_types.
+                            ResponseContentPartAddedEvent(
+                                type="response.content_part.added",
+                                sequence_number=-1,
+                                output_index=current_output_index,
+                                item_id=current_item_id,
+                                content_index=current_content_index,
+                                part=openai_responses_types.ResponseOutputText(
+                                    type="output_text",
+                                    text="",
+                                    annotations=[],
+                                    logprobs=[],
+                                ),
+                            ))
+                    yield _send_event(
+                        ResponseReasoningTextDeltaEvent(
+                            type="response.reasoning_text.delta",
+                            item_id=current_item_id,
+                            output_index=current_output_index,
+                            content_index=current_content_index,
+                            delta=ctx.parser.last_content_delta,
+                            sequence_number=-1,
+                        ))
+
+            if ctx.is_assistant_action_turn() and len(ctx.parser.messages) > 0:
+                previous_item = ctx.parser.messages[-1]
+                if (self.tool_server is not None
+                        and self.tool_server.has_tool("browser")
+                        and previous_item.recipient is not None
+                        and previous_item.recipient.startswith("browser.")):
+                    function_name = previous_item.recipient[len("browser."):]
+                    action = None
+                    parsed_args = json.loads(previous_item.content[0].text)
+                    if function_name == "search":
+                        action = (openai_responses_types.
+                                  response_function_web_search.ActionSearch(
+                                      type="search",
+                                      query=parsed_args["query"],
+                                  ))
+                    elif function_name == "open":
+                        action = (
+                            openai_responses_types.
+                            response_function_web_search.ActionOpenPage(
+                                type="open_page",
+                                # TODO: translate to url
+                                url=f"cursor:{parsed_args.get('cursor', '')}",
+                            ))
+                    elif function_name == "find":
+                        action = (
+                            openai_responses_types.
+                            response_function_web_search.ActionFind(
+                                type="find",
+                                pattern=parsed_args["pattern"],
+                                # TODO: translate to url
+                                url=f"cursor:{parsed_args.get('cursor', '')}",
+                            ))
+                    else:
+                        raise ValueError(
+                            f"Unknown function name: {function_name}")
+
+                    yield _send_event(
+                        openai_responses_types.ResponseOutputItemAddedEvent(
+                            type="response.output_item.added",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item=openai_responses_types.
+                            response_function_web_search.
+                            ResponseFunctionWebSearch(
+                                # TODO: generate a unique id for web search call
+                                type="web_search_call",
+                                id=current_item_id,
+                                action=action,
+                                status="in_progress",
+                            ),
+                        ))
+                    yield _send_event(
+                        openai_responses_types.
+                        ResponseWebSearchCallInProgressEvent(
+                            type="response.web_search_call.in_progress",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item_id=current_item_id,
+                        ))
+                    yield _send_event(
+                        openai_responses_types.
+                        ResponseWebSearchCallSearchingEvent(
+                            type="response.web_search_call.searching",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item_id=current_item_id,
+                        ))
+
+                    # The browser call has already finished at this point, so
+                    # the completed events can be emitted right away.
+                    yield _send_event(
+                        openai_responses_types.
+                        ResponseWebSearchCallCompletedEvent(
+                            type="response.web_search_call.completed",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item_id=current_item_id,
+                        ))
+                    yield _send_event(
+                        openai_responses_types.ResponseOutputItemDoneEvent(
+                            type="response.output_item.done",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item=openai_responses_types.
+                            ResponseFunctionWebSearch(
+                                type="web_search_call",
+                                id=current_item_id,
+                                action=action,
+                                status="completed",
+                            ),
+                        ))
+
+                if (self.tool_server is not None
+                        and self.tool_server.has_tool("python")
+                        and previous_item.recipient is not None
+                        and previous_item.recipient.startswith("python")):
+                    yield _send_event(
+                        openai_responses_types.ResponseOutputItemAddedEvent(
+                            type="response.output_item.added",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item=openai_responses_types.
+                            ResponseCodeInterpreterToolCallParam(
+                                type="code_interpreter_call",
+                                id=current_item_id,
+                                code="",
+                                container_id="auto",
+                                outputs=[],
+                                status="in_progress",
+                            ),
+                        ))
+                    yield _send_event(
+                        openai_responses_types.
+                        ResponseCodeInterpreterCallInProgressEvent(
+                            type="response.code_interpreter_call.in_progress",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item_id=current_item_id,
+                        ))
+                    # TODO: do we need to add delta event here?
+                    yield _send_event(
+                        openai_responses_types.
+                        ResponseCodeInterpreterCallCodeDoneEvent(
+                            type="response.code_interpreter_call_code.done",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item_id=current_item_id,
+                            code=previous_item.content[0].text))
+                    yield _send_event(
+                        openai_responses_types.
+                        ResponseCodeInterpreterCallInterpretingEvent(
+                            type="response.code_interpreter_call.interpreting",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item_id=current_item_id,
+                        ))
+                    yield _send_event(
+                        openai_responses_types.
+                        ResponseCodeInterpreterCallCompletedEvent(
+                            type="response.code_interpreter_call.completed",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item_id=current_item_id,
+                        ))
+                    yield _send_event(
+                        openai_responses_types.ResponseOutputItemDoneEvent(
+                            type="response.output_item.done",
+                            sequence_number=-1,
+                            output_index=current_output_index,
+                            item=openai_responses_types.
+                            ResponseCodeInterpreterToolCallParam(
+                                type="code_interpreter_call",
+                                id=current_item_id,
+                                code=previous_item.content[0].text,
+                                container_id="auto",
+                                # TODO: add outputs here
+                                outputs=[],
+                                status="completed",
+                            ),
+                        ))
+
+        async def empty_async_generator():
+            # A hack to trick Python into treating this function as an async
+            # generator; it yields nothing and returns immediately.
+            if False:
+                yield
+
+        final_response = await self.responses_full_generator(
+            request,
+            sampling_params,
+            empty_async_generator(),
+            context,
+            model_name,
+            tokenizer,
+            request_metadata,
+            created_time=created_time,
+        )
+        yield _send_event(
+            openai_responses_types.ResponseCompletedEvent(
+                type="response.completed",
+                sequence_number=-1,
+                response=final_response.model_dump(),
+            ))
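For reviewers, a minimal sketch of what consuming this event stream looks like from the client side, using the openai Python SDK's Responses streaming interface. The base_url, api_key, and model name below are illustrative assumptions for a local vLLM deployment, not part of this change:

# Sketch: consume the SSE events emitted by responses_stream_generator.
# Assumes a vLLM server at http://localhost:8000/v1 serving a
# Harmony-compatible model; URL, api_key, and model name are hypothetical.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

stream = client.responses.create(
    model="openai/gpt-oss-120b",
    input="What is 2 + 2?",
    stream=True,
)

for event in stream:
    # Each event corresponds to one _send_event call on the server, in order:
    # response.created, response.in_progress, item/part added events,
    # deltas, done events, and finally response.completed.
    if event.type == "response.reasoning_text.delta":
        print(event.delta, end="", flush=True)
    elif event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    elif event.type == "response.completed":
        print("\n[response completed]")

Note that the server emits the final response.completed event by delegating to responses_full_generator with an empty result generator, so the terminal payload matches the non-streaming response shape.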