diff --git a/tests/entrypoints/openai/responses/test_basic.py b/tests/entrypoints/openai/responses/test_basic.py index dd3a563e9570..0e8c9d94e0bf 100644 --- a/tests/entrypoints/openai/responses/test_basic.py +++ b/tests/entrypoints/openai/responses/test_basic.py @@ -1,6 +1,8 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import os + import openai # use the official client for correctness check import openai.types.responses as openai_responses_types import pytest @@ -37,8 +39,8 @@ async def test_instructions(client: openai.AsyncOpenAI): @pytest.mark.asyncio async def test_chat(client: openai.AsyncOpenAI): response = await client.responses.create( + instructions="Finish the answer with QED.", input=[ - {"role": "system", "content": "Finish the answer with QED."}, {"role": "user", "content": "What is 5 * 3?"}, {"role": "assistant", "content": "15. QED."}, {"role": "user", "content": "Multiply the result by 2."}, @@ -67,6 +69,9 @@ async def test_chat_with_input_type(client: openai.AsyncOpenAI): @pytest.mark.asyncio async def test_logprobs(client: openai.AsyncOpenAI): + model = os.environ.get("VLLM_TEST_MODEL", "") + if "gpt-oss" in model: + pytest.skip("logprobs not supported for gpt-oss models") response = await client.responses.create( include=["message.output_text.logprobs"], input="What is 13 * 24?", diff --git a/tests/entrypoints/openai/responses/test_harmony.py b/tests/entrypoints/openai/responses/test_harmony.py index 74f3360df45f..32b755ec438c 100644 --- a/tests/entrypoints/openai/responses/test_harmony.py +++ b/tests/entrypoints/openai/responses/test_harmony.py @@ -507,8 +507,7 @@ async def test_web_search(client: OpenAI, model_name: str): @pytest.mark.asyncio @pytest.mark.parametrize("model_name", [MODEL_NAME]) async def test_code_interpreter(client: OpenAI, model_name: str): - timeout_value = client.timeout * 3 - client_with_timeout = client.with_options(timeout=timeout_value) + client_with_timeout = client.with_options(timeout=1800.0) response = await client_with_timeout.responses.create( model=model_name, @@ -1000,16 +999,16 @@ async def test_mcp_tool_multi_turn(client: OpenAI, model_name: str, server): for msg in response1.output_messages ) tool_response_found = any( - msg.get("author", {}).get("role") == "tool" - and (msg.get("author", {}).get("name") or "").startswith("python") + msg.get("role") == "tool" and (msg.get("name") or "").startswith("python") for msg in response1.output_messages ) assert tool_call_found, "MCP tool call not found in output_messages" assert tool_response_found, "MCP tool response not found in output_messages" # No developer messages expected for elevated tools + # Message.to_dict() flattens author fields to top level developer_msgs = [ - msg for msg in response1.input_messages if msg["author"]["role"] == "developer" + msg for msg in response1.input_messages if msg.get("role") == "developer" ] assert len(developer_msgs) == 0, "No developer message expected for elevated tools" diff --git a/tests/entrypoints/openai/responses/test_serving_responses.py b/tests/entrypoints/openai/responses/test_serving_responses.py index b5d2b24a63a5..4493e1fa8fc9 100644 --- a/tests/entrypoints/openai/responses/test_serving_responses.py +++ b/tests/entrypoints/openai/responses/test_serving_responses.py @@ -27,7 +27,11 @@ ErrorResponse, RequestResponseMetadata, ) -from vllm.entrypoints.openai.responses.context import ConversationContext, SimpleContext +from vllm.entrypoints.openai.responses.context import ( + ConversationContext, + SimpleContext, + StreamingHarmonyContext, +) from vllm.entrypoints.openai.responses.protocol import ResponsesRequest from vllm.entrypoints.openai.responses.serving import ( OpenAIServingResponses, @@ -36,6 +40,8 @@ ) from vllm.entrypoints.openai.responses.streaming_events import ( StreamingState, + _emit_channel_done_events, + emit_content_delta_events, ) from vllm.inputs.data import TokensPrompt from vllm.outputs import CompletionOutput, RequestOutput @@ -463,6 +469,7 @@ def _make_ctx(*, channel, recipient, delta="hello"): """Build a lightweight mock StreamingHarmonyContext.""" ctx = MagicMock() ctx.last_content_delta = delta + ctx.channel_deltas = [(channel, recipient, delta)] ctx.parser.current_channel = channel ctx.parser.current_recipient = recipient return ctx @@ -619,6 +626,52 @@ def _make_serving_instance_with_reasoning(): return serving +def _make_serving_instance_with_harmony(): + """Create an OpenAIServingResponses configured for Harmony models.""" + engine_client = MagicMock() + model_config = MagicMock() + model_config.max_model_len = 100 + model_config.hf_config.model_type = "gpt_oss" + model_config.hf_text_config = MagicMock() + model_config.get_diff_sampling_param.return_value = {} + engine_client.model_config = model_config + engine_client.input_processor = MagicMock() + engine_client.io_processor = MagicMock() + engine_client.renderer = MagicMock() + + models = MagicMock() + tool_server = MagicMock(spec=ToolServer) + tool_server.has_tool.return_value = False + + return OpenAIServingResponses( + engine_client=engine_client, + models=models, + openai_serving_render=MagicMock(), + request_logger=None, + chat_template=None, + chat_template_content_format="auto", + tool_server=tool_server, + ) + + +def _make_streaming_harmony_ctx( + channel_deltas, + *, + expecting_start: bool = False, + assistant_action: bool = False, + parser_messages: list | None = None, +): + """Build a lightweight StreamingHarmonyContext for SSE event tests.""" + ctx = StreamingHarmonyContext(messages=[], available_tools=[]) + ctx.finish_reason = None + ctx.channel_deltas = channel_deltas + ctx.parser = MagicMock() + ctx.parser.messages = parser_messages or [] + ctx.is_expecting_start = lambda: expecting_start + ctx.is_assistant_action_turn = lambda: assistant_action + return ctx + + def _identity_increment(event): """Simple identity callable for _increment_sequence_number_and_return.""" seq = getattr(_identity_increment, "_counter", 0) @@ -877,3 +930,137 @@ async def result_generator(): ] assert len(item_done_events) == 1 assert isinstance(item_done_events[0].item, ResponseReasoningItem) + + +class TestHarmonyStreamingLifecycle: + """Regression tests for Harmony SSE item lifecycle ordering.""" + + @pytest.mark.asyncio + async def test_function_call_done_includes_tail_from_final_batch(self): + """The final batch's argument tail must be streamed before *.done.""" + serving = _make_serving_instance_with_harmony() + contexts = [ + _make_streaming_harmony_ctx( + [("analysis", "functions.get_weather", '{"location":"Ber')] + ), + _make_streaming_harmony_ctx( + [("analysis", "functions.get_weather", 'lin"}')], + expecting_start=True, + ), + ] + + async def result_generator(): + for ctx in contexts: + yield ctx + + request = ResponsesRequest(input="hi", tools=[], stream=True) + sampling_params = SamplingParams(max_tokens=64) + metadata = RequestResponseMetadata(request_id="req") + _identity_increment._counter = 0 # type: ignore[attr-defined] + + events = [] + async for event in serving._process_harmony_streaming_events( + request=request, + sampling_params=sampling_params, + result_generator=result_generator(), + context=SimpleContext(), + model_name="test-model", + tokenizer=MagicMock(), + request_metadata=metadata, + created_time=0, + _increment_sequence_number_and_return=_identity_increment, + ): + events.append(event) + + type_names = [e.type for e in events] + assert type_names == [ + "response.output_item.added", + "response.function_call_arguments.delta", + "response.function_call_arguments.delta", + "response.function_call_arguments.done", + "response.output_item.done", + ] + + deltas = [ + e.delta + for e in events + if e.type == "response.function_call_arguments.delta" + ] + done_event = next( + e for e in events if e.type == "response.function_call_arguments.done" + ) + assert "".join(deltas) == '{"location":"Berlin"}' + assert done_event.arguments == '{"location":"Berlin"}' + + @pytest.mark.asyncio + async def test_channel_transition_closes_reasoning_before_text_done(self): + """analysis -> final in one batch must close reasoning first.""" + serving = _make_serving_instance_with_harmony() + contexts = [ + _make_streaming_harmony_ctx( + [ + ("analysis", None, "thinking"), + ("final", None, "answer"), + ], + expecting_start=True, + ) + ] + + async def result_generator(): + for ctx in contexts: + yield ctx + + request = ResponsesRequest(input="hi", tools=[], stream=True) + sampling_params = SamplingParams(max_tokens=64) + metadata = RequestResponseMetadata(request_id="req") + _identity_increment._counter = 0 # type: ignore[attr-defined] + + events = [] + async for event in serving._process_harmony_streaming_events( + request=request, + sampling_params=sampling_params, + result_generator=result_generator(), + context=SimpleContext(), + model_name="test-model", + tokenizer=MagicMock(), + request_metadata=metadata, + created_time=0, + _increment_sequence_number_and_return=_identity_increment, + ): + events.append(event) + + type_names = [e.type for e in events] + assert type_names == [ + "response.output_item.added", + "response.reasoning_part.added", + "response.reasoning_text.delta", + "response.reasoning_text.done", + "response.reasoning_part.done", + "response.output_item.done", + "response.output_item.added", + "response.content_part.added", + "response.output_text.delta", + "response.output_text.done", + "response.content_part.done", + "response.output_item.done", + ] + + def test_ignored_segments_do_not_contaminate_done_payload(self): + """Unsupported segments should not leak into the later *.done text.""" + ctx = _make_streaming_harmony_ctx( + [ + ("unknown_channel", None, "hidden"), + ("final", None, "visible"), + ] + ) + state = StreamingState() + + events = emit_content_delta_events(ctx, state) + done_events = _emit_channel_done_events(state) + + deltas = [e.delta for e in events if e.type == "response.output_text.delta"] + done_event = next( + e for e in done_events if e.type == "response.output_text.done" + ) + assert deltas == ["visible"] + assert done_event.text == "visible" diff --git a/vllm/entrypoints/openai/responses/context.py b/vllm/entrypoints/openai/responses/context.py index bab59e0aa1ec..fc1f16a3c73c 100644 --- a/vllm/entrypoints/openai/responses/context.py +++ b/vllm/entrypoints/openai/responses/context.py @@ -845,6 +845,7 @@ def __init__(self, *args, **kwargs): self.last_tok = None self.first_tok_of_message = True self.last_content_delta = None + self.channel_deltas: list[tuple[str | None, str | None, str]] = [] @property def messages(self) -> list: @@ -854,6 +855,7 @@ def append_output(self, output: RequestOutput) -> None: # append_output is called for each output token in streaming case, # so we only want to add the prompt tokens once for each message. self.last_content_delta = None + self.channel_deltas = [] if self.first_tok_of_message: self._update_prefill_token_usage(output) # Reset self.first_tok_of_message if needed: @@ -864,7 +866,21 @@ def append_output(self, output: RequestOutput) -> None: last_delta_text = "" for tok in output.outputs[0].token_ids: self.parser.process(tok) - last_delta_text += self.parser.last_content_delta or "" + tok_delta = self.parser.last_content_delta + if tok_delta: + channel = self.parser.current_channel + recipient = self.parser.current_recipient + # Coalesce consecutive tokens in the same channel+recipient + if ( + self.channel_deltas + and self.channel_deltas[-1][0] == channel + and self.channel_deltas[-1][1] == recipient + ): + ch, rcp, prev = self.channel_deltas[-1] + self.channel_deltas[-1] = (ch, rcp, prev + tok_delta) + else: + self.channel_deltas.append((channel, recipient, tok_delta)) + last_delta_text += tok_delta if last_delta_text: self.last_content_delta = last_delta_text self._update_decode_token_usage(output) diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py index 574282c4cdc6..0cb03770cf65 100644 --- a/vllm/entrypoints/openai/responses/serving.py +++ b/vllm/entrypoints/openai/responses/serving.py @@ -98,8 +98,8 @@ ) from vllm.entrypoints.openai.responses.streaming_events import ( StreamingState, + _emit_channel_done_events, emit_content_delta_events, - emit_previous_item_done_events, emit_tool_action_events, ) from vllm.entrypoints.openai.responses.utils import ( @@ -1463,9 +1463,7 @@ async def _process_simple_streaming_events( id=current_item_id, call_id=current_tool_call_id, name=current_tool_call_name, - arguments=delta_message.tool_calls[ - 0 - ].function.arguments, + arguments="", status="in_progress", ), ) @@ -1917,21 +1915,36 @@ async def _process_harmony_streaming_events( # finish_reason='error' indicates a retryable error self._raise_if_error(ctx.finish_reason, request.request_id) - if ctx.is_expecting_start(): - if len(ctx.parser.messages) > 0: - previous_item = ctx.parser.messages[-1] - for event in emit_previous_item_done_events(previous_item, state): - yield _increment_sequence_number_and_return(event) - state.reset_for_new_item() - - # Stream the output of a harmony message + # Stream the output of a harmony message. + # emit_content_delta_events detects mid-message channel + # transitions (e.g. analysis → final) and emits done events + # for the previous channel before starting the new one. for event in emit_content_delta_events(ctx, state): yield _increment_sequence_number_and_return(event) - # Stream tool call outputs + # Stream synthetic browser/web-search events. Function-call, + # MCP, and code-interpreter items are finalized through the + # state-based channel flush below so they do not double-emit + # completion events. for event in emit_tool_action_events(ctx, state, self.tool_server): yield _increment_sequence_number_and_return(event) + # If this batch completed a Harmony message, flush done + # events only after the batch's deltas have been emitted. + # Otherwise the current batch's tail would be missing from + # the corresponding *.done payload. + if ctx.is_expecting_start(): + if state.sent_output_item_added and state.last_channel is not None: + for event in _emit_channel_done_events(state): + yield _increment_sequence_number_and_return(event) + state.reset_for_new_item() + + # Flush done events for the final item when the stream ends + # without a subsequent message boundary. + if state.sent_output_item_added and state.last_channel is not None: + for event in _emit_channel_done_events(state): + yield _increment_sequence_number_and_return(event) + async def responses_stream_generator( self, request: ResponsesRequest, diff --git a/vllm/entrypoints/openai/responses/streaming_events.py b/vllm/entrypoints/openai/responses/streaming_events.py index cc242e7baa83..12dc3bac3ad0 100644 --- a/vllm/entrypoints/openai/responses/streaming_events.py +++ b/vllm/entrypoints/openai/responses/streaming_events.py @@ -54,6 +54,7 @@ from openai.types.responses.response_reasoning_item import ( Content as ResponseReasoningTextContent, ) +from openai_harmony import Author, Role, TextContent from openai_harmony import Message as HarmonyMessage from vllm.entrypoints.mcp.tool_server import ToolServer @@ -97,12 +98,20 @@ class StreamingState: sent_output_item_added: bool = False is_first_function_call_delta: bool = False + # Channel-transition tracking for mid-message done events + last_channel: str | None = None + last_recipient: str | None = None + accumulated_text: str = "" + def reset_for_new_item(self) -> None: """Reset state when expecting a new output item.""" self.current_output_index += 1 self.sent_output_item_added = False self.is_first_function_call_delta = False self.current_call_id = "" + self.last_channel = None + self.last_recipient = None + self.accumulated_text = "" def is_mcp_tool_by_namespace(recipient: str | None) -> bool: @@ -237,6 +246,7 @@ def emit_function_call_delta_events( events: list[StreamingResponsesResponse] = [] if state.is_first_function_call_delta is False: state.is_first_function_call_delta = True + state.sent_output_item_added = True state.current_item_id = f"fc_{random_uuid()}" state.current_call_id = f"call_{random_uuid()}" tool_call_item = ResponseFunctionToolCall( @@ -547,22 +557,13 @@ def emit_mcp_completion_events( # ===================================================================== -def emit_content_delta_events( - ctx: StreamingHarmonyContext, +def _emit_delta_for_channel( + channel: str | None, + recipient: str | None, + delta: str, state: StreamingState, ) -> list[StreamingResponsesResponse]: - """Emit events for content delta streaming based on channel type. - - This is a Harmony-specific dispatcher that extracts values from the - Harmony context and delegates to shared leaf helpers. - """ - delta = ctx.last_content_delta - if not delta: - return [] - - channel = ctx.parser.current_channel - recipient = ctx.parser.current_recipient - + """Emit events for a single (channel, recipient, delta) triple.""" if channel in ("final", "commentary") and recipient is None: # Preambles (commentary with no recipient) and final messages # are both user-visible text. @@ -584,6 +585,71 @@ def emit_content_delta_events( return [] +def _needs_channel_transition( + state: StreamingState, + channel: str | None, + recipient: str | None, +) -> bool: + """Return True when the channel/recipient changed from the last delta.""" + if state.last_channel is None: + return False + return channel != state.last_channel or recipient != state.last_recipient + + +def _emit_channel_done_events( + state: StreamingState, +) -> list[StreamingResponsesResponse]: + """Emit done events for the channel currently tracked in state.""" + synthetic_msg = HarmonyMessage( + author=Author(role=Role.ASSISTANT), + content=[TextContent(text=state.accumulated_text)], + channel=state.last_channel, + recipient=state.last_recipient, + ) + return emit_previous_item_done_events(synthetic_msg, state) + + +def emit_content_delta_events( + ctx: StreamingHarmonyContext, + state: StreamingState, +) -> list[StreamingResponsesResponse]: + """Emit events for content delta streaming based on channel type. + + This is a Harmony-specific dispatcher that extracts values from the + Harmony context and delegates to shared leaf helpers. + + When a token batch crosses a channel boundary (e.g. analysis -> + commentary), ctx.channel_deltas contains one entry per contiguous + (channel, recipient) run so each segment is emitted with the correct + event type. + + If the channel/recipient changes from the previous delta (either + within this batch or across batches), done events are emitted for + the previous channel before starting the new one. + """ + if not ctx.channel_deltas: + return [] + + events: list[StreamingResponsesResponse] = [] + for channel, recipient, delta in ctx.channel_deltas: + # Detect mid-message channel transition (e.g. analysis → final) + if state.sent_output_item_added and _needs_channel_transition( + state, channel, recipient + ): + events.extend(_emit_channel_done_events(state)) + state.reset_for_new_item() + + segment_events = _emit_delta_for_channel(channel, recipient, delta, state) + if not segment_events: + continue + + events.extend(segment_events) + state.last_channel = channel + state.last_recipient = recipient + state.accumulated_text += delta + return events + + def emit_previous_item_done_events( previous_item: HarmonyMessage, state: StreamingState, @@ -778,21 +844,4 @@ def emit_tool_action_events( ): events.extend(emit_browser_tool_events(previous_item, state)) - # Handle tool completion - if ( - tool_server is not None - and previous_item.recipient is not None - and state.current_item_id is not None - and state.sent_output_item_added - ): - recipient = previous_item.recipient - if recipient == "python": - events.extend(emit_code_interpreter_completion_events(previous_item, state)) - elif recipient.startswith("mcp.") or is_mcp_tool_by_namespace(recipient): - events.extend( - emit_mcp_completion_events( - recipient, previous_item.content[0].text, state - ) - ) - return events