Skip to content
7 changes: 6 additions & 1 deletion tests/entrypoints/openai/responses/test_basic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import os

import openai # use the official client for correctness check
import openai.types.responses as openai_responses_types
import pytest
Expand Down Expand Up @@ -37,8 +39,8 @@ async def test_instructions(client: openai.AsyncOpenAI):
@pytest.mark.asyncio
async def test_chat(client: openai.AsyncOpenAI):
response = await client.responses.create(
instructions="Finish the answer with QED.",
input=[
{"role": "system", "content": "Finish the answer with QED."},
{"role": "user", "content": "What is 5 * 3?"},
{"role": "assistant", "content": "15. QED."},
{"role": "user", "content": "Multiply the result by 2."},
Expand Down Expand Up @@ -67,6 +69,9 @@ async def test_chat_with_input_type(client: openai.AsyncOpenAI):

@pytest.mark.asyncio
async def test_logprobs(client: openai.AsyncOpenAI):
model = os.environ.get("VLLM_TEST_MODEL", "")
if "gpt-oss" in model:
pytest.skip("logprobs not supported for gpt-oss models")
response = await client.responses.create(
include=["message.output_text.logprobs"],
input="What is 13 * 24?",
Expand Down
9 changes: 4 additions & 5 deletions tests/entrypoints/openai/responses/test_harmony.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,7 @@ async def test_web_search(client: OpenAI, model_name: str):
@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
async def test_code_interpreter(client: OpenAI, model_name: str):
timeout_value = client.timeout * 3
client_with_timeout = client.with_options(timeout=timeout_value)
client_with_timeout = client.with_options(timeout=1800.0)

response = await client_with_timeout.responses.create(
model=model_name,
Expand Down Expand Up @@ -1000,16 +999,16 @@ async def test_mcp_tool_multi_turn(client: OpenAI, model_name: str, server):
for msg in response1.output_messages
)
tool_response_found = any(
msg.get("author", {}).get("role") == "tool"
and (msg.get("author", {}).get("name") or "").startswith("python")
msg.get("role") == "tool" and (msg.get("name") or "").startswith("python")
for msg in response1.output_messages
)
assert tool_call_found, "MCP tool call not found in output_messages"
assert tool_response_found, "MCP tool response not found in output_messages"

# No developer messages expected for elevated tools
# Message.to_dict() flattens author fields to top level
developer_msgs = [
msg for msg in response1.input_messages if msg["author"]["role"] == "developer"
msg for msg in response1.input_messages if msg.get("role") == "developer"
]
assert len(developer_msgs) == 0, "No developer message expected for elevated tools"

Expand Down
189 changes: 188 additions & 1 deletion tests/entrypoints/openai/responses/test_serving_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
ErrorResponse,
RequestResponseMetadata,
)
from vllm.entrypoints.openai.responses.context import ConversationContext, SimpleContext
from vllm.entrypoints.openai.responses.context import (
ConversationContext,
SimpleContext,
StreamingHarmonyContext,
)
from vllm.entrypoints.openai.responses.protocol import ResponsesRequest
from vllm.entrypoints.openai.responses.serving import (
OpenAIServingResponses,
Expand All @@ -36,6 +40,8 @@
)
from vllm.entrypoints.openai.responses.streaming_events import (
StreamingState,
_emit_channel_done_events,
emit_content_delta_events,
)
from vllm.inputs.data import TokensPrompt
from vllm.outputs import CompletionOutput, RequestOutput
Expand Down Expand Up @@ -463,6 +469,7 @@ def _make_ctx(*, channel, recipient, delta="hello"):
"""Build a lightweight mock StreamingHarmonyContext."""
ctx = MagicMock()
ctx.last_content_delta = delta
ctx.channel_deltas = [(channel, recipient, delta)]
ctx.parser.current_channel = channel
ctx.parser.current_recipient = recipient
return ctx
Expand Down Expand Up @@ -619,6 +626,52 @@ def _make_serving_instance_with_reasoning():
return serving


def _make_serving_instance_with_harmony():
    """Create an OpenAIServingResponses configured for Harmony models.

    The engine client and model config are mocked out; ``model_type`` is set
    to ``"gpt_oss"`` so the serving layer takes its Harmony code paths, and
    the mocked tool server reports no tools available.
    """
    model_config = MagicMock()
    model_config.max_model_len = 100
    model_config.hf_config.model_type = "gpt_oss"
    model_config.hf_text_config = MagicMock()
    model_config.get_diff_sampling_param.return_value = {}

    engine_client = MagicMock()
    engine_client.model_config = model_config
    # Explicit stubs for the engine-side collaborators the serving layer uses.
    for attr_name in ("input_processor", "io_processor", "renderer"):
        setattr(engine_client, attr_name, MagicMock())

    tool_server = MagicMock(spec=ToolServer)
    tool_server.has_tool.return_value = False

    return OpenAIServingResponses(
        engine_client=engine_client,
        models=MagicMock(),
        openai_serving_render=MagicMock(),
        request_logger=None,
        chat_template=None,
        chat_template_content_format="auto",
        tool_server=tool_server,
    )


def _make_streaming_harmony_ctx(
    channel_deltas,
    *,
    expecting_start: bool = False,
    assistant_action: bool = False,
    parser_messages: list | None = None,
):
    """Build a lightweight StreamingHarmonyContext for SSE event tests.

    Args:
        channel_deltas: list of ``(channel, recipient, delta)`` tuples the
            context should report for the current batch.
        expecting_start: value returned by the stubbed ``is_expecting_start``.
        assistant_action: value returned by the stubbed
            ``is_assistant_action_turn``.
        parser_messages: messages exposed on the stubbed parser (empty list
            when omitted or falsy).
    """
    harmony_ctx = StreamingHarmonyContext(messages=[], available_tools=[])
    harmony_ctx.finish_reason = None
    harmony_ctx.channel_deltas = channel_deltas

    # Replace the real parser with a stub carrying the requested messages.
    stub_parser = MagicMock()
    stub_parser.messages = parser_messages or []
    harmony_ctx.parser = stub_parser

    # Override the boundary predicates so each test can script them directly.
    harmony_ctx.is_expecting_start = lambda: expecting_start
    harmony_ctx.is_assistant_action_turn = lambda: assistant_action
    return harmony_ctx


def _identity_increment(event):
"""Simple identity callable for _increment_sequence_number_and_return."""
seq = getattr(_identity_increment, "_counter", 0)
Expand Down Expand Up @@ -877,3 +930,137 @@ async def result_generator():
]
assert len(item_done_events) == 1
assert isinstance(item_done_events[0].item, ResponseReasoningItem)


class TestHarmonyStreamingLifecycle:
    """Regression tests for Harmony SSE item lifecycle ordering."""

    @pytest.mark.asyncio
    async def test_function_call_done_includes_tail_from_final_batch(self):
        """The final batch's argument tail must be streamed before *.done."""
        serving = _make_serving_instance_with_harmony()
        # Two decode batches for one function call: the first carries the head
        # of the JSON arguments, the second carries the tail and is flagged as
        # a message boundary via expecting_start=True.
        contexts = [
            _make_streaming_harmony_ctx(
                [("analysis", "functions.get_weather", '{"location":"Ber')]
            ),
            _make_streaming_harmony_ctx(
                [("analysis", "functions.get_weather", 'lin"}')],
                expecting_start=True,
            ),
        ]

        async def result_generator():
            # Yield each scripted batch as if it came from the engine.
            for ctx in contexts:
                yield ctx

        request = ResponsesRequest(input="hi", tools=[], stream=True)
        sampling_params = SamplingParams(max_tokens=64)
        metadata = RequestResponseMetadata(request_id="req")
        # Reset the shared counter so sequence numbers start from 0.
        _identity_increment._counter = 0  # type: ignore[attr-defined]

        # Drive the streaming generator to completion, collecting every event.
        events = []
        async for event in serving._process_harmony_streaming_events(
            request=request,
            sampling_params=sampling_params,
            result_generator=result_generator(),
            context=SimpleContext(),
            model_name="test-model",
            tokenizer=MagicMock(),
            request_metadata=metadata,
            created_time=0,
            _increment_sequence_number_and_return=_identity_increment,
        ):
            events.append(event)

        # Both argument deltas must precede the *.done events — the boundary
        # batch's tail may not be dropped from the stream.
        type_names = [e.type for e in events]
        assert type_names == [
            "response.output_item.added",
            "response.function_call_arguments.delta",
            "response.function_call_arguments.delta",
            "response.function_call_arguments.done",
            "response.output_item.done",
        ]

        # The concatenated deltas and the done payload must both contain the
        # complete argument string, including the final batch's tail.
        deltas = [
            e.delta
            for e in events
            if e.type == "response.function_call_arguments.delta"
        ]
        done_event = next(
            e for e in events if e.type == "response.function_call_arguments.done"
        )
        assert "".join(deltas) == '{"location":"Berlin"}'
        assert done_event.arguments == '{"location":"Berlin"}'

    @pytest.mark.asyncio
    async def test_channel_transition_closes_reasoning_before_text_done(self):
        """analysis -> final in one batch must close reasoning first."""
        serving = _make_serving_instance_with_harmony()
        # A single batch that switches channels mid-stream: reasoning text on
        # "analysis" immediately followed by answer text on "final".
        contexts = [
            _make_streaming_harmony_ctx(
                [
                    ("analysis", None, "thinking"),
                    ("final", None, "answer"),
                ],
                expecting_start=True,
            )
        ]

        async def result_generator():
            for ctx in contexts:
                yield ctx

        request = ResponsesRequest(input="hi", tools=[], stream=True)
        sampling_params = SamplingParams(max_tokens=64)
        metadata = RequestResponseMetadata(request_id="req")
        _identity_increment._counter = 0  # type: ignore[attr-defined]

        events = []
        async for event in serving._process_harmony_streaming_events(
            request=request,
            sampling_params=sampling_params,
            result_generator=result_generator(),
            context=SimpleContext(),
            model_name="test-model",
            tokenizer=MagicMock(),
            request_metadata=metadata,
            created_time=0,
            _increment_sequence_number_and_return=_identity_increment,
        ):
            events.append(event)

        # The reasoning item must be fully closed (text.done, part.done,
        # item.done) before the text output item is opened.
        type_names = [e.type for e in events]
        assert type_names == [
            "response.output_item.added",
            "response.reasoning_part.added",
            "response.reasoning_text.delta",
            "response.reasoning_text.done",
            "response.reasoning_part.done",
            "response.output_item.done",
            "response.output_item.added",
            "response.content_part.added",
            "response.output_text.delta",
            "response.output_text.done",
            "response.content_part.done",
            "response.output_item.done",
        ]

    def test_ignored_segments_do_not_contaminate_done_payload(self):
        """Unsupported segments should not leak into the later *.done text."""
        # A delta on an unrecognized channel precedes the visible "final" one.
        ctx = _make_streaming_harmony_ctx(
            [
                ("unknown_channel", None, "hidden"),
                ("final", None, "visible"),
            ]
        )
        state = StreamingState()

        events = emit_content_delta_events(ctx, state)
        done_events = _emit_channel_done_events(state)

        # Only the "final" channel's text may reach the delta stream and the
        # accumulated done payload; the unknown channel's text is dropped.
        deltas = [e.delta for e in events if e.type == "response.output_text.delta"]
        done_event = next(
            e for e in done_events if e.type == "response.output_text.done"
        )
        assert deltas == ["visible"]
        assert done_event.text == "visible"
18 changes: 17 additions & 1 deletion vllm/entrypoints/openai/responses/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,7 @@ def __init__(self, *args, **kwargs):
self.last_tok = None
self.first_tok_of_message = True
self.last_content_delta = None
self.channel_deltas: list[tuple[str | None, str | None, str]] = []

@property
def messages(self) -> list:
Expand All @@ -854,6 +855,7 @@ def append_output(self, output: RequestOutput) -> None:
# append_output is called for each output token in streaming case,
# so we only want to add the prompt tokens once for each message.
self.last_content_delta = None
self.channel_deltas = []
if self.first_tok_of_message:
self._update_prefill_token_usage(output)
# Reset self.first_tok_of_message if needed:
Expand All @@ -864,7 +866,21 @@ def append_output(self, output: RequestOutput) -> None:
last_delta_text = ""
for tok in output.outputs[0].token_ids:
self.parser.process(tok)
last_delta_text += self.parser.last_content_delta or ""
tok_delta = self.parser.last_content_delta
if tok_delta:
channel = self.parser.current_channel
recipient = self.parser.current_recipient
# Coalesce consecutive tokens in the same channel+recipient
if (
self.channel_deltas
and self.channel_deltas[-1][0] == channel
and self.channel_deltas[-1][1] == recipient
):
ch, rcp, prev = self.channel_deltas[-1]
self.channel_deltas[-1] = (ch, rcp, prev + tok_delta)
else:
self.channel_deltas.append((channel, recipient, tok_delta))
last_delta_text += tok_delta
if last_delta_text:
self.last_content_delta = last_delta_text
self._update_decode_token_usage(output)
Expand Down
39 changes: 26 additions & 13 deletions vllm/entrypoints/openai/responses/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@
)
from vllm.entrypoints.openai.responses.streaming_events import (
StreamingState,
_emit_channel_done_events,
emit_content_delta_events,
emit_previous_item_done_events,
emit_tool_action_events,
)
from vllm.entrypoints.openai.responses.utils import (
Expand Down Expand Up @@ -1463,9 +1463,7 @@ async def _process_simple_streaming_events(
id=current_item_id,
call_id=current_tool_call_id,
name=current_tool_call_name,
arguments=delta_message.tool_calls[
0
].function.arguments,
arguments="",
status="in_progress",
),
)
Expand Down Expand Up @@ -1917,21 +1915,36 @@ async def _process_harmony_streaming_events(
# finish_reason='error' indicates a retryable error
self._raise_if_error(ctx.finish_reason, request.request_id)

if ctx.is_expecting_start():
if len(ctx.parser.messages) > 0:
previous_item = ctx.parser.messages[-1]
for event in emit_previous_item_done_events(previous_item, state):
yield _increment_sequence_number_and_return(event)
state.reset_for_new_item()

# Stream the output of a harmony message
# Stream the output of a harmony message.
# emit_content_delta_events detects mid-message channel
# transitions (e.g. analysis → final) and emits done events
# for the previous channel before starting the new one.
for event in emit_content_delta_events(ctx, state):
yield _increment_sequence_number_and_return(event)

# Stream tool call outputs
# Stream synthetic browser/web-search events. Function-call,
# MCP, and code-interpreter items are finalized through the
# state-based channel flush below so they do not double-emit
# completion events.
for event in emit_tool_action_events(ctx, state, self.tool_server):
yield _increment_sequence_number_and_return(event)

# If this batch completed a Harmony message, flush done
# events only after the batch's deltas have been emitted.
# Otherwise the current batch's tail would be missing from
# the corresponding *.done payload.
if ctx.is_expecting_start():
if state.sent_output_item_added and state.last_channel is not None:
for event in _emit_channel_done_events(state):
yield _increment_sequence_number_and_return(event)
state.reset_for_new_item()

# Flush done events for the final item when the stream ends
# without a subsequent message boundary.
if state.sent_output_item_added and state.last_channel is not None:
for event in _emit_channel_done_events(state):
yield _increment_sequence_number_and_return(event)

async def responses_stream_generator(
self,
request: ResponsesRequest,
Expand Down
Loading
Loading