diff --git a/tests/entrypoints/openai/test_serving_responses.py b/tests/entrypoints/openai/test_serving_responses.py index b5d2b24a63a5..656de2a7bb74 100644 --- a/tests/entrypoints/openai/test_serving_responses.py +++ b/tests/entrypoints/openai/test_serving_responses.py @@ -1,17 +1,22 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +from collections.abc import Sequence from contextlib import AsyncExitStack from unittest.mock import MagicMock import pytest import pytest_asyncio from openai.types.responses import ( + ResponseContentPartAddedEvent, + ResponseOutputItemAddedEvent, ResponseOutputItemDoneEvent, + ResponseOutputMessage, ResponseReasoningItem, ResponseReasoningTextDeltaEvent, ResponseReasoningTextDoneEvent, ResponseTextDeltaEvent, + ResponseTextDoneEvent, ) from openai.types.responses.tool import ( CodeInterpreterContainerCodeInterpreterToolAuto, @@ -22,9 +27,13 @@ import vllm.envs as envs from vllm.entrypoints.mcp.tool_server import ToolServer +from vllm.entrypoints.openai.chat_completion.protocol import ( + ChatCompletionRequest, +) from vllm.entrypoints.openai.engine.protocol import ( DeltaMessage, ErrorResponse, + ExtractedToolCallInformation, RequestResponseMetadata, ) from vllm.entrypoints.openai.responses.context import ConversationContext, SimpleContext @@ -39,6 +48,7 @@ ) from vllm.inputs.data import TokensPrompt from vllm.outputs import CompletionOutput, RequestOutput +from vllm.parser import DelegatingParser, StreamingParseState from vllm.sampling_params import SamplingParams @@ -592,6 +602,77 @@ def _make_simple_context_with_output(text, token_ids): return ctx +class _ScriptedParser(DelegatingParser): + """A DelegatingParser that returns deltas from a pre-defined sequence. + + Used in tests to exercise ``process_streaming_events`` without needing + a real reasoning/tool parser. Each call to ``extract_streaming_delta`` + pops the next DeltaMessage from the sequence. + """ + + def __init__(self, delta_sequence: list[DeltaMessage]): + # Provide a dummy tokenizer — tests don't need a real one. + super().__init__(MagicMock()) + self._delta_sequence = list(delta_sequence) + self._call_count = 0 + + # -- Override abstract methods with minimal stubs -- + + def is_reasoning_end(self, input_ids: list[int]) -> bool: + return True + + def extract_content_ids(self, input_ids: list[int]) -> list[int]: + return input_ids + + def extract_reasoning(self, model_output, request): + return None, model_output + + def extract_response_outputs(self, model_output, request, **kwargs): + return [] + + def extract_reasoning_streaming( + self, + previous_text: str, + current_text: str, + delta_text: str, + previous_token_ids: Sequence[int], + current_token_ids: Sequence[int], + delta_token_ids: Sequence[int], + ) -> DeltaMessage | None: + return DeltaMessage(content=delta_text) + + def extract_tool_calls(self, model_output: str, request: ChatCompletionRequest): + return ExtractedToolCallInformation( + tools_called=False, tool_calls=[], content=model_output + ) + + def extract_tool_calls_streaming( + self, + previous_text, + current_text, + delta_text, + previous_token_ids, + current_token_ids, + delta_token_ids, + request, + ): + return None + + def extract_streaming_delta( + self, + state: StreamingParseState, + delta_text: str, + delta_token_ids: list[int], + prompt_token_ids: list[int] | None, + request, + ) -> DeltaMessage | None: + if self._call_count >= len(self._delta_sequence): + return None + result = self._delta_sequence[self._call_count] + self._call_count += 1 + return result + + def _make_serving_instance_with_reasoning(): """Create an OpenAIServingResponses with a mocked reasoning parser.""" engine_client = MagicMock() @@ -628,14 +709,19 @@ def _identity_increment(event): return event +def _noop_raise_if_error(finish_reason, request_id): + """No-op callback matching the raise_if_error signature.""" + pass + + class TestStreamingReasoningToContentTransition: - """Tests for _process_simple_streaming_events reasoning-to-content + """Tests for process_streaming_events reasoning-to-content transition, specifically the fix for mixed deltas that carry both reasoning and content simultaneously.""" @pytest.mark.asyncio async def test_mixed_delta_reasoning_and_content_emits_reasoning_delta( - self, monkeypatch + self, ): """When the reasoning parser produces a delta with both reasoning and content set (e.g. reasoning end and content start in the same @@ -643,31 +729,13 @@ async def test_mixed_delta_reasoning_and_content_emits_reasoning_delta( ResponseReasoningTextDeltaEvent and included in the ResponseReasoningTextDoneEvent text.""" - monkeypatch.setattr(envs, "VLLM_USE_EXPERIMENTAL_PARSER_CONTEXT", False) - serving = _make_serving_instance_with_reasoning() - - # Sequence of DeltaMessages the mock reasoning parser will return delta_sequence = [ DeltaMessage(reasoning="thinking..."), DeltaMessage(reasoning=" end", content="hello"), # mixed delta DeltaMessage(content=" world"), ] - call_count = 0 - - def mock_extract_reasoning_streaming(**kwargs): - nonlocal call_count - result = delta_sequence[call_count] - call_count += 1 - return result - - # Mock the reasoning parser on the serving instance - mock_parser = MagicMock() - mock_parser.extract_reasoning_streaming = mock_extract_reasoning_streaming - mock_parser.extract_tool_calls_streaming = mock_extract_reasoning_streaming - serving.parser = MagicMock() - serving.parser.reasoning_parser_cls = MagicMock(return_value=mock_parser) - serving.parser.tool_parser_cls = MagicMock(return_value=mock_parser) - # Create contexts for each streaming chunk + parser = _ScriptedParser(delta_sequence) + contexts = [ _make_simple_context_with_output("chunk1", [10]), _make_simple_context_with_output("chunk2", [20]), @@ -679,21 +747,17 @@ async def result_generator(): yield ctx request = ResponsesRequest(input="hi", tools=[], stream=True) - sampling_params = SamplingParams(max_tokens=64) - metadata = RequestResponseMetadata(request_id="req") _identity_increment._counter = 0 # type: ignore events = [] - async for event in serving._process_simple_streaming_events( + async for event in parser.process_streaming_events( request=request, - sampling_params=sampling_params, result_generator=result_generator(), - context=SimpleContext(), - model_name="test-model", - tokenizer=MagicMock(), - request_metadata=metadata, - created_time=0, - _increment_sequence_number_and_return=_identity_increment, + request_id="req", + raise_if_error=_noop_raise_if_error, + create_logprobs=None, + top_logprobs=None, + increment_sequence_number=_identity_increment, ): events.append(event) @@ -722,32 +786,16 @@ async def result_generator(): @pytest.mark.asyncio async def test_transition_without_mixed_delta_no_extra_reasoning_event( - self, monkeypatch + self, ): """When the transition from reasoning to content is clean (no mixed delta), no extra reasoning delta event should be emitted.""" - monkeypatch.setattr(envs, "VLLM_USE_EXPERIMENTAL_PARSER_CONTEXT", False) - serving = _make_serving_instance_with_reasoning() - delta_sequence = [ DeltaMessage(reasoning="thinking"), DeltaMessage(content="answer"), ] - call_count = 0 - - def mock_extract_reasoning_streaming(**kwargs): - nonlocal call_count - result = delta_sequence[call_count] - call_count += 1 - return result - - mock_parser = MagicMock() - mock_parser.extract_reasoning_streaming = mock_extract_reasoning_streaming - mock_parser.extract_tool_calls_streaming = mock_extract_reasoning_streaming - serving.parser = MagicMock() - serving.parser.reasoning_parser_cls = MagicMock(return_value=mock_parser) - serving.parser.tool_parser_cls = MagicMock(return_value=mock_parser) + parser = _ScriptedParser(delta_sequence) contexts = [ _make_simple_context_with_output("chunk1", [10]), @@ -759,21 +807,17 @@ async def result_generator(): yield ctx request = ResponsesRequest(input="hi", tools=[], stream=True) - sampling_params = SamplingParams(max_tokens=64) - metadata = RequestResponseMetadata(request_id="req") _identity_increment._counter = 0 # type: ignore events = [] - async for event in serving._process_simple_streaming_events( + async for event in parser.process_streaming_events( request=request, - sampling_params=sampling_params, result_generator=result_generator(), - context=SimpleContext(), - model_name="test-model", - tokenizer=MagicMock(), - request_metadata=metadata, - created_time=0, - _increment_sequence_number_and_return=_identity_increment, + request_id="req", + raise_if_error=_noop_raise_if_error, + create_logprobs=None, + top_logprobs=None, + increment_sequence_number=_identity_increment, ): events.append(event) @@ -797,32 +841,16 @@ async def result_generator(): assert text_deltas[0].delta == "answer" @pytest.mark.asyncio - async def test_reasoning_only_stream_no_content(self, monkeypatch): + async def test_reasoning_only_stream_no_content(self): """When the stream has only reasoning deltas and no content, the reasoning done event should be emitted at finalization with the full accumulated text, and no text delta events should appear.""" - monkeypatch.setattr(envs, "VLLM_USE_EXPERIMENTAL_PARSER_CONTEXT", False) - serving = _make_serving_instance_with_reasoning() - delta_sequence = [ DeltaMessage(reasoning="step 1"), DeltaMessage(reasoning=" step 2"), ] - call_count = 0 - - def mock_extract_reasoning_streaming(**kwargs): - nonlocal call_count - result = delta_sequence[call_count] - call_count += 1 - return result - - mock_parser = MagicMock() - mock_parser.extract_reasoning_streaming = mock_extract_reasoning_streaming - mock_parser.extract_tool_calls_streaming = mock_extract_reasoning_streaming - serving.parser = MagicMock() - serving.parser.reasoning_parser_cls = MagicMock(return_value=mock_parser) - serving.parser.tool_parser_cls = MagicMock(return_value=mock_parser) + parser = _ScriptedParser(delta_sequence) contexts = [ _make_simple_context_with_output("chunk1", [10]), @@ -834,21 +862,17 @@ async def result_generator(): yield ctx request = ResponsesRequest(input="hi", tools=[], stream=True) - sampling_params = SamplingParams(max_tokens=64) - metadata = RequestResponseMetadata(request_id="req") _identity_increment._counter = 0 # type: ignore events = [] - async for event in serving._process_simple_streaming_events( + async for event in parser.process_streaming_events( request=request, - sampling_params=sampling_params, result_generator=result_generator(), - context=SimpleContext(), - model_name="test-model", - tokenizer=MagicMock(), - request_metadata=metadata, - created_time=0, - _increment_sequence_number_and_return=_identity_increment, + request_id="req", + raise_if_error=_noop_raise_if_error, + create_logprobs=None, + top_logprobs=None, + increment_sequence_number=_identity_increment, ): events.append(event) @@ -877,3 +901,222 @@ async def result_generator(): ] assert len(item_done_events) == 1 assert isinstance(item_done_events[0].item, ResponseReasoningItem) + + +def _make_serving_instance_no_parser(): + """Create an OpenAIServingResponses with no tool/reasoning parser.""" + engine_client = MagicMock() + model_config = MagicMock() + model_config.max_model_len = 100 + model_config.hf_config.model_type = "test" + model_config.hf_text_config = MagicMock() + model_config.get_diff_sampling_param.return_value = {} + engine_client.model_config = model_config + engine_client.input_processor = MagicMock() + engine_client.io_processor = MagicMock() + engine_client.renderer = MagicMock() + + models = MagicMock() + + serving = OpenAIServingResponses( + engine_client=engine_client, + models=models, + openai_serving_render=MagicMock(), + request_logger=None, + chat_template=None, + chat_template_content_format="auto", + # No reasoning_parser or tool_parser → self.parser is None + ) + assert serving.parser is None, "Expected no parser for this test fixture" + return serving + + +class TestNoParserStreamingLifecycle: + """Tests for _process_simple_streaming_events when self.parser is None. + + The fallback path must emit the full event lifecycle + (output_item.added, content_part.added, deltas, done events) + rather than bare delta events only. + """ + + @pytest.mark.asyncio + async def test_full_lifecycle_events_emitted(self): + """When no parser is configured, streaming should still emit: + output_item.added → content_part.added → output_text.delta(s) + → output_text.done → content_part.done → output_item.done + """ + serving = _make_serving_instance_no_parser() + + contexts = [ + _make_simple_context_with_output("Hello", [10]), + _make_simple_context_with_output(" world", [20]), + ] + + async def result_generator(): + for ctx in contexts: + yield ctx + + request = ResponsesRequest(input="hi", tools=[], stream=True) + _identity_increment._counter = 0 # type: ignore + + events = [] + async for event in serving._process_simple_streaming_events( + request=request, + sampling_params=SamplingParams(), + result_generator=result_generator(), + context=SimpleContext(), + model_name="test-model", + tokenizer=MagicMock(), + request_metadata=RequestResponseMetadata(request_id="req"), + created_time=0, + _increment_sequence_number_and_return=_identity_increment, + ): + events.append(event) + + type_names = [e.type for e in events] + + # Verify the full lifecycle is present + assert "response.output_item.added" in type_names + assert "response.content_part.added" in type_names + assert "response.output_text.delta" in type_names + assert "response.output_text.done" in type_names + assert "response.content_part.done" in type_names + assert "response.output_item.done" in type_names + + # Verify ordering: added events come first, done events come last + added_idx = type_names.index("response.output_item.added") + part_added_idx = type_names.index("response.content_part.added") + first_delta_idx = type_names.index("response.output_text.delta") + text_done_idx = type_names.index("response.output_text.done") + part_done_idx = type_names.index("response.content_part.done") + item_done_idx = type_names.index("response.output_item.done") + + assert added_idx < part_added_idx < first_delta_idx + assert first_delta_idx < text_done_idx < part_done_idx < item_done_idx + + @pytest.mark.asyncio + async def test_text_deltas_content(self): + """Delta events should carry the correct text content.""" + serving = _make_serving_instance_no_parser() + + contexts = [ + _make_simple_context_with_output("Hello", [10]), + _make_simple_context_with_output(" world", [20]), + ] + + async def result_generator(): + for ctx in contexts: + yield ctx + + request = ResponsesRequest(input="hi", tools=[], stream=True) + _identity_increment._counter = 0 # type: ignore + + events = [] + async for event in serving._process_simple_streaming_events( + request=request, + sampling_params=SamplingParams(), + result_generator=result_generator(), + context=SimpleContext(), + model_name="test-model", + tokenizer=MagicMock(), + request_metadata=RequestResponseMetadata(request_id="req"), + created_time=0, + _increment_sequence_number_and_return=_identity_increment, + ): + events.append(event) + + # Check delta text values + text_deltas = [e for e in events if isinstance(e, ResponseTextDeltaEvent)] + assert len(text_deltas) == 2 + assert text_deltas[0].delta == "Hello" + assert text_deltas[1].delta == " world" + + # Check done event has full accumulated text + text_done = [e for e in events if isinstance(e, ResponseTextDoneEvent)] + assert len(text_done) == 1 + assert text_done[0].text == "Hello world" + + # Check output_item.done has a completed message with the full text + item_done = [e for e in events if isinstance(e, ResponseOutputItemDoneEvent)] + assert len(item_done) == 1 + assert isinstance(item_done[0].item, ResponseOutputMessage) + assert item_done[0].item.status == "completed" + + @pytest.mark.asyncio + async def test_empty_output_no_lifecycle_events(self): + """When all outputs are empty, no lifecycle events should be emitted.""" + serving = _make_serving_instance_no_parser() + + contexts = [ + _make_simple_context_with_output("", [10]), + ] + + async def result_generator(): + for ctx in contexts: + yield ctx + + request = ResponsesRequest(input="hi", tools=[], stream=True) + _identity_increment._counter = 0 # type: ignore + + events = [] + async for event in serving._process_simple_streaming_events( + request=request, + sampling_params=SamplingParams(), + result_generator=result_generator(), + context=SimpleContext(), + model_name="test-model", + tokenizer=MagicMock(), + request_metadata=RequestResponseMetadata(request_id="req"), + created_time=0, + _increment_sequence_number_and_return=_identity_increment, + ): + events.append(event) + + # No events should be emitted for empty content + assert len(events) == 0 + + @pytest.mark.asyncio + async def test_item_ids_consistent_across_events(self): + """The item_id should be consistent across all lifecycle events.""" + serving = _make_serving_instance_no_parser() + + contexts = [ + _make_simple_context_with_output("test", [10]), + ] + + async def result_generator(): + for ctx in contexts: + yield ctx + + request = ResponsesRequest(input="hi", tools=[], stream=True) + _identity_increment._counter = 0 # type: ignore + + events = [] + async for event in serving._process_simple_streaming_events( + request=request, + sampling_params=SamplingParams(), + result_generator=result_generator(), + context=SimpleContext(), + model_name="test-model", + tokenizer=MagicMock(), + request_metadata=RequestResponseMetadata(request_id="req"), + created_time=0, + _increment_sequence_number_and_return=_identity_increment, + ): + events.append(event) + + # Extract item_id from the added event + added = [e for e in events if isinstance(e, ResponseOutputItemAddedEvent)] + assert len(added) == 1 + item_id = added[0].item.id + assert item_id # not empty + + # All events with item_id should share the same value + part_added = [e for e in events if isinstance(e, ResponseContentPartAddedEvent)] + assert part_added[0].item_id == item_id + + text_deltas = [e for e in events if isinstance(e, ResponseTextDeltaEvent)] + assert text_deltas[0].item_id == item_id + + text_done = [e for e in events if isinstance(e, ResponseTextDoneEvent)] + assert text_done[0].item_id == item_id diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py index dd42a6a56600..272d109f47d6 100644 --- a/vllm/entrypoints/openai/responses/serving.py +++ b/vllm/entrypoints/openai/responses/serving.py @@ -3,7 +3,6 @@ import asyncio import time -import uuid from collections import deque from collections.abc import AsyncGenerator, AsyncIterator, Callable, Mapping, Sequence from contextlib import AsyncExitStack @@ -13,29 +12,14 @@ from fastapi import Request from openai.types.responses import ( - ResponseContentPartAddedEvent, - ResponseContentPartDoneEvent, - ResponseFunctionCallArgumentsDeltaEvent, - ResponseFunctionCallArgumentsDoneEvent, ResponseFunctionToolCall, - ResponseFunctionToolCallItem, ResponseOutputItem, - ResponseOutputItemAddedEvent, - ResponseOutputItemDoneEvent, ResponseOutputMessage, ResponseOutputText, - ResponseReasoningItem, - ResponseReasoningTextDeltaEvent, - ResponseReasoningTextDoneEvent, ResponseStatus, - ResponseTextDeltaEvent, - ResponseTextDoneEvent, response_text_delta_event, ) from openai.types.responses.response_output_text import Logprob, LogprobTopLogprob -from openai.types.responses.response_reasoning_item import ( - Content as ResponseReasoningTextContent, -) from openai.types.responses.tool import Mcp, Tool from openai_harmony import Message as OpenAIHarmonyMessage from pydantic import TypeAdapter @@ -88,8 +72,6 @@ ResponseInProgressEvent, ResponseInputOutputItem, ResponseInputOutputMessage, - ResponseReasoningPartAddedEvent, - ResponseReasoningPartDoneEvent, ResponsesRequest, ResponsesResponse, ResponseUsage, @@ -99,6 +81,8 @@ StreamingState, emit_content_delta_events, emit_previous_item_done_events, + emit_text_delta_events, + emit_text_output_done_events, emit_tool_action_events, ) from vllm.entrypoints.openai.responses.utils import ( @@ -115,12 +99,11 @@ from vllm.logprobs import SampleLogprobs from vllm.lora.request import LoRARequest from vllm.outputs import CompletionOutput -from vllm.parser import ParserManager +from vllm.parser import DelegatingParser, ParserManager from vllm.sampling_params import SamplingParams, StructuredOutputsParams from vllm.tokenizers import TokenizerLike from vllm.tool_parsers import ToolParser from vllm.utils import random_uuid -from vllm.utils.collection_utils import as_list logger = init_logger(__name__) @@ -1343,563 +1326,47 @@ async def _process_simple_streaming_events( [StreamingResponsesResponse], StreamingResponsesResponse ], ) -> AsyncGenerator[StreamingResponsesResponse, None]: - current_content_index = 0 - current_output_index = 0 - current_item_id = "" - reasoning_parser = None - if self.parser and self.parser.reasoning_parser_cls: - reasoning_parser = self.parser.reasoning_parser_cls(tokenizer) - tool_parser = None - if self.parser and self.parser.tool_parser_cls: - tool_parser = self.parser.tool_parser_cls(tokenizer) - reasoning_ended = False - tool_call_text_started = False - previous_text = "" - previous_token_ids: list[int] = [] - prompt_is_reasoning_end = None - first_delta_sent = False - previous_delta_messages: list[DeltaMessage] = [] - async for ctx in result_generator: - assert isinstance(ctx, SimpleContext) - if ctx.last_output is None: - continue - if reasoning_parser and prompt_is_reasoning_end is None: - prompt_is_reasoning_end = reasoning_parser.is_reasoning_end( - ctx.last_output.prompt_token_ids - ) - if ctx.last_output.outputs: - output = ctx.last_output.outputs[0] - # finish_reason='error' indicates a retryable error - self._raise_if_error(output.finish_reason, request.request_id) - delta_text = output.text - delta_token_ids = as_list(output.token_ids) - current_text = previous_text + delta_text - current_token_ids = previous_token_ids + delta_token_ids - - if reasoning_parser and tool_parser: - if prompt_is_reasoning_end: - reasoning_ended = True - if not reasoning_ended: - delta_message = reasoning_parser.extract_reasoning_streaming( - previous_text=previous_text, - current_text=current_text, - delta_text=delta_text, - previous_token_ids=previous_token_ids, - current_token_ids=current_token_ids, - delta_token_ids=delta_token_ids, - ) - if reasoning_parser.is_reasoning_end(delta_token_ids): - reasoning_ended = True - current_token_ids = reasoning_parser.extract_content_ids( - delta_token_ids - ) - if delta_message and delta_message.content: - current_text = delta_message.content - delta_message.content = None - else: - current_text = "" - - if reasoning_ended: - if not tool_call_text_started: - tool_call_text_started = True - previous_text = "" - previous_token_ids = [] - delta_text = current_text - delta_token_ids = current_token_ids - - delta_message = tool_parser.extract_tool_calls_streaming( - previous_text=previous_text, - current_text=current_text, - delta_text=delta_text, - previous_token_ids=previous_token_ids, - current_token_ids=current_token_ids, - delta_token_ids=delta_token_ids, - request=request, # type: ignore[arg-type] - ) - elif reasoning_parser: - delta_message = reasoning_parser.extract_reasoning_streaming( - previous_text=previous_text, - current_text=current_text, - delta_text=delta_text, - previous_token_ids=previous_token_ids, - current_token_ids=current_token_ids, - delta_token_ids=delta_token_ids, - ) - elif tool_parser: - delta_message = tool_parser.extract_tool_calls_streaming( - previous_text=previous_text, - current_text=current_text, - delta_text=delta_text, - previous_token_ids=previous_token_ids, - current_token_ids=current_token_ids, - delta_token_ids=delta_token_ids, - request=request, # type: ignore[arg-type] - ) - else: - delta_message = DeltaMessage( - content=output.text, - ) - previous_text = current_text - previous_token_ids = current_token_ids - if not delta_message: + raw_parser = self.parser(tokenizer) if self.parser else None + parser = raw_parser if isinstance(raw_parser, DelegatingParser) else None + if parser is not None: + async for event in parser.process_streaming_events( + request=request, + result_generator=result_generator, + request_id=request.request_id, + raise_if_error=self._raise_if_error, + create_logprobs=( + self._create_stream_response_logprobs + if request.is_include_output_logprobs() + else None + ), + top_logprobs=request.top_logprobs, + increment_sequence_number=_increment_sequence_number_and_return, + ): + yield event + else: + # Fallback: no parser — emit the full event lifecycle for + # plain text output so clients see the same added/delta/done + # sequence they would with a parser. + state = StreamingState() + full_text = "" + async for ctx in result_generator: + assert isinstance(ctx, SimpleContext) + if ctx.last_output is None: continue - if not first_delta_sent: - current_item_id = random_uuid() - if delta_message.tool_calls: - current_tool_call_id = f"call_{random_uuid()}" - assert len(delta_message.tool_calls) == 1, ( - "Multiple tool calls in one delta is not supported" - ) - assert delta_message.tool_calls[0].function is not None, ( - "Tool call without function is not supported" - ) - assert delta_message.tool_calls[0].function.name is not None, ( - "Tool call without function name is not supported" - ) - current_tool_call_name = delta_message.tool_calls[ - 0 - ].function.name - yield _increment_sequence_number_and_return( - ResponseOutputItemAddedEvent( - type="response.output_item.added", - sequence_number=-1, - output_index=current_output_index, - item=ResponseFunctionToolCallItem( - type="function_call", - id=current_item_id, - call_id=current_tool_call_id, - name=current_tool_call_name, - arguments=delta_message.tool_calls[ - 0 - ].function.arguments, - status="in_progress", - ), - ) - ) - elif delta_message.reasoning: - yield _increment_sequence_number_and_return( - ResponseOutputItemAddedEvent( - type="response.output_item.added", - sequence_number=-1, - output_index=current_output_index, - item=ResponseReasoningItem( - type="reasoning", - id=current_item_id, - summary=[], - status="in_progress", - ), - ) - ) - yield _increment_sequence_number_and_return( - ResponseReasoningPartAddedEvent( - type="response.reasoning_part.added", - sequence_number=-1, - output_index=current_output_index, - item_id=current_item_id, - content_index=current_content_index, - part=ResponseReasoningTextContent( - text="", - type="reasoning_text", - ), - ) - ) - elif not delta_message.tool_calls: - yield _increment_sequence_number_and_return( - ResponseOutputItemAddedEvent( - type="response.output_item.added", - sequence_number=-1, - output_index=current_output_index, - item=ResponseOutputMessage( - id=current_item_id, - type="message", - role="assistant", - content=[], - status="in_progress", - ), - ) - ) - yield _increment_sequence_number_and_return( - ResponseContentPartAddedEvent( - type="response.content_part.added", - sequence_number=-1, - output_index=current_output_index, - item_id=current_item_id, - content_index=current_content_index, - part=ResponseOutputText( - type="output_text", - text="", - annotations=[], - logprobs=[], - ), - ) - ) - first_delta_sent = True - - # check delta message and previous delta message are - # same as content or reasoning content - if ( - previous_delta_messages - and previous_delta_messages[-1].reasoning is not None - and delta_message.content is not None - ): - # from reasoning to normal content, send done - # event for reasoning - reason_content = "".join( - pm.reasoning - for pm in previous_delta_messages - if pm.reasoning is not None - ) - - # delta message could have both reasoning and - # content. Include current delta's reasoning in the - # finalization since it may carry the tail end of - # reasoning text (e.g. when reasoning end and - # content start arrive in the same delta). - if delta_message.reasoning is not None: - yield _increment_sequence_number_and_return( - ResponseReasoningTextDeltaEvent( - type="response.reasoning_text.delta", - sequence_number=-1, - content_index=current_content_index, - output_index=current_output_index, - item_id=current_item_id, - delta=delta_message.reasoning, - ) - ) - reason_content += delta_message.reasoning - delta_message = DeltaMessage(content=delta_message.content) - - yield _increment_sequence_number_and_return( - ResponseReasoningTextDoneEvent( - type="response.reasoning_text.done", - item_id=current_item_id, - sequence_number=-1, - output_index=current_output_index, - content_index=current_content_index, - text=reason_content, - ) - ) - yield _increment_sequence_number_and_return( - ResponseReasoningPartDoneEvent( - type="response.reasoning_part.done", - sequence_number=-1, - item_id=current_item_id, - output_index=current_output_index, - content_index=current_content_index, - part=ResponseReasoningTextContent( - text=reason_content, - type="reasoning_text", - ), - ) - ) - current_content_index = 0 - reasoning_item = ResponseReasoningItem( - type="reasoning", - content=[ - ResponseReasoningTextContent( - text=reason_content, - type="reasoning_text", - ), - ], - status="completed", - id=current_item_id, - summary=[], - ) - yield _increment_sequence_number_and_return( - ResponseOutputItemDoneEvent( - type="response.output_item.done", - sequence_number=-1, - output_index=current_output_index, - item=reasoning_item, - ) - ) - current_output_index += 1 - current_item_id = str(uuid.uuid4()) - yield _increment_sequence_number_and_return( - ResponseOutputItemAddedEvent( - type="response.output_item.added", - sequence_number=-1, - output_index=current_output_index, - item=ResponseOutputMessage( - id=current_item_id, - type="message", - role="assistant", - content=[], - status="in_progress", - ), - ) - ) - yield _increment_sequence_number_and_return( - ResponseContentPartAddedEvent( - type="response.content_part.added", - sequence_number=-1, - output_index=current_output_index, - item_id=current_item_id, - content_index=current_content_index, - part=ResponseOutputText( - type="output_text", - text="", - annotations=[], - logprobs=[], - ), - ) - ) - # reset previous delta messages - previous_delta_messages = [] - if delta_message.tool_calls and delta_message.tool_calls[0].function: - if delta_message.tool_calls[0].function.arguments: - yield _increment_sequence_number_and_return( - ResponseFunctionCallArgumentsDeltaEvent( - type="response.function_call_arguments.delta", - sequence_number=-1, - output_index=current_output_index, - item_id=current_item_id, - delta=delta_message.tool_calls[0].function.arguments, - ) - ) - # tool call initiated with no arguments - elif delta_message.tool_calls[0].function.name: - # send done with current content part - # and add new function call item - yield _increment_sequence_number_and_return( - ResponseTextDoneEvent( - type="response.output_text.done", - sequence_number=-1, - output_index=current_output_index, - content_index=current_content_index, - text="", - logprobs=[], - item_id=current_item_id, - ) - ) - yield _increment_sequence_number_and_return( - ResponseContentPartDoneEvent( - type="response.content_part.done", - sequence_number=-1, - item_id=current_item_id, - output_index=current_output_index, - content_index=current_content_index, - part=ResponseOutputText( - type="output_text", - text="", - annotations=[], - logprobs=[], - ), - ) - ) - yield _increment_sequence_number_and_return( - ResponseOutputItemDoneEvent( - type="response.output_item.done", - sequence_number=-1, - output_index=current_output_index, - item=ResponseOutputMessage( - id=current_item_id, - type="message", - role="assistant", - content=[], - status="completed", - ), - ) - ) - current_output_index += 1 - current_item_id = random_uuid() - assert delta_message.tool_calls[0].function is not None - current_tool_call_name = delta_message.tool_calls[ - 0 - ].function.name - current_tool_call_id = f"call_{random_uuid()}" - yield _increment_sequence_number_and_return( - ResponseOutputItemAddedEvent( - type="response.output_item.added", - sequence_number=-1, - output_index=current_output_index, - item=ResponseFunctionToolCallItem( - type="function_call", - id=current_item_id, - call_id=current_tool_call_id, - name=current_tool_call_name, - arguments="", - status="in_progress", - ), - ) - ) - # skip content part for tool call - current_content_index = 1 - continue - elif delta_message.reasoning is not None: - yield _increment_sequence_number_and_return( - ResponseReasoningTextDeltaEvent( - type="response.reasoning_text.delta", - sequence_number=-1, - content_index=current_content_index, - output_index=current_output_index, - item_id=current_item_id, - delta=delta_message.reasoning, - ) - ) - elif delta_message.content: - yield _increment_sequence_number_and_return( - ResponseTextDeltaEvent( - type="response.output_text.delta", - sequence_number=-1, - content_index=current_content_index, - output_index=current_output_index, - item_id=current_item_id, - delta=delta_message.content, - logprobs=( - self._create_stream_response_logprobs( - token_ids=output.token_ids, - logprobs=output.logprobs, - tokenizer=tokenizer, - top_logprobs=request.top_logprobs, - ) - if request.is_include_output_logprobs() - else [] - ), - ) - ) - - previous_delta_messages.append(delta_message) - - if previous_delta_messages: - parts = [] - for pm in previous_delta_messages: - if pm.tool_calls: - assert len(pm.tool_calls) == 1, ( - "Multiple tool calls in one delta is not supported" - ) - assert pm.tool_calls[0].function is not None, ( - "Tool call without function is not supported" - ) - parts.append(pm.tool_calls[0].function.arguments or "") - - tool_call_arguments = "".join(parts) - if tool_call_arguments: - yield _increment_sequence_number_and_return( - ResponseFunctionCallArgumentsDoneEvent( - type="response.function_call_arguments.done", - sequence_number=-1, - output_index=current_output_index, - item_id=current_item_id, - arguments=tool_call_arguments, - name=current_tool_call_name, - ) - ) - current_content_index = 0 - function_call_item = ResponseFunctionToolCall( - type="function_call", - name=current_tool_call_name, - arguments=tool_call_arguments, - status="completed", - id=current_item_id, - call_id=current_tool_call_id, - ) - yield _increment_sequence_number_and_return( - ResponseOutputItemDoneEvent( - type="response.output_item.done", - sequence_number=-1, - output_index=current_output_index, - item=function_call_item, - ) - ) - - elif previous_delta_messages[-1].reasoning is not None: - reason_content = "".join( - pm.reasoning - for pm in previous_delta_messages - if pm.reasoning is not None - ) - yield _increment_sequence_number_and_return( - ResponseReasoningTextDoneEvent( - type="response.reasoning_text.done", - item_id=current_item_id, - sequence_number=-1, - output_index=current_output_index, - content_index=current_content_index, - text=reason_content, - ) - ) - yield _increment_sequence_number_and_return( - ResponseReasoningPartDoneEvent( - type="response.reasoning_part.done", - sequence_number=-1, - item_id=current_item_id, - output_index=current_output_index, - content_index=current_content_index, - part=ResponseReasoningTextContent( - text=reason_content, - type="reasoning_text", - ), - ) - ) - reasoning_item = ResponseReasoningItem( - type="reasoning", - content=[ - ResponseReasoningTextContent( - text=reason_content, - type="reasoning_text", - ), - ], - status="completed", - id=current_item_id, - summary=[], - ) - yield _increment_sequence_number_and_return( - ResponseOutputItemDoneEvent( - type="response.output_item.done", - sequence_number=-1, - output_index=current_output_index, - item=reasoning_item, - ) - ) - elif previous_delta_messages[-1].content: - final_content = "".join( - pm.content for pm in previous_delta_messages if pm.content - ) - yield _increment_sequence_number_and_return( - ResponseTextDoneEvent( - type="response.output_text.done", - sequence_number=-1, - output_index=current_output_index, - content_index=current_content_index, - text=final_content, - logprobs=[], - item_id=current_item_id, - ) - ) - part = ResponseOutputText( - text=final_content, - type="output_text", - annotations=[], - ) - yield _increment_sequence_number_and_return( - ResponseContentPartDoneEvent( - type="response.content_part.done", - sequence_number=-1, - item_id=current_item_id, - output_index=current_output_index, - content_index=current_content_index, - part=part, - ) - ) - item = ResponseOutputMessage( - type="message", - role="assistant", - content=[ - part, - ], - status="completed", - id=current_item_id, - summary=[], - ) - yield _increment_sequence_number_and_return( - ResponseOutputItemDoneEvent( - type="response.output_item.done", - sequence_number=-1, - output_index=current_output_index, - item=item, - ) - ) + if ctx.last_output.outputs: + output = ctx.last_output.outputs[0] + self._raise_if_error(output.finish_reason, request.request_id) + delta_message = DeltaMessage(content=output.text) + if delta_message.content: + full_text += delta_message.content + for event in emit_text_delta_events( + delta_message.content, state + ): + yield _increment_sequence_number_and_return(event) + # Emit done events only if we actually sent any text deltas + if state.sent_output_item_added: + for event in emit_text_output_done_events(full_text, state): + yield _increment_sequence_number_and_return(event) async def _process_harmony_streaming_events( self, diff --git a/vllm/parser/__init__.py b/vllm/parser/__init__.py index dc256daaa7e2..89d401fc2724 100644 --- a/vllm/parser/__init__.py +++ b/vllm/parser/__init__.py @@ -4,6 +4,7 @@ from vllm.parser.abstract_parser import ( DelegatingParser, Parser, + StreamingParseState, _WrappedParser, ) from vllm.parser.parser_manager import ParserManager @@ -12,6 +13,7 @@ "Parser", "DelegatingParser", "ParserManager", + "StreamingParseState", "_WrappedParser", ] diff --git a/vllm/parser/abstract_parser.py b/vllm/parser/abstract_parser.py index 0c1dda17b6a3..5c89b1afc3a3 100644 --- a/vllm/parser/abstract_parser.py +++ b/vllm/parser/abstract_parser.py @@ -3,16 +3,29 @@ import contextlib import json +import uuid from abc import abstractmethod -from collections.abc import Sequence +from collections.abc import AsyncGenerator, AsyncIterator, Callable, Sequence +from dataclasses import dataclass, field from functools import cached_property from openai.types.responses import ( + ResponseContentPartAddedEvent, + ResponseContentPartDoneEvent, + ResponseFunctionCallArgumentsDeltaEvent, + ResponseFunctionCallArgumentsDoneEvent, ResponseFunctionToolCall, + ResponseFunctionToolCallItem, ResponseOutputItem, + ResponseOutputItemAddedEvent, + ResponseOutputItemDoneEvent, ResponseOutputMessage, ResponseOutputText, ResponseReasoningItem, + ResponseReasoningTextDeltaEvent, + ResponseReasoningTextDoneEvent, + ResponseTextDeltaEvent, + ResponseTextDoneEvent, ToolChoiceFunction, ) from openai.types.responses.response_output_text import Logprob @@ -32,18 +45,38 @@ FunctionCall, FunctionDefinition, ) +from vllm.entrypoints.openai.responses.context import ConversationContext, SimpleContext from vllm.entrypoints.openai.responses.protocol import ( + ResponseReasoningPartAddedEvent, + ResponseReasoningPartDoneEvent, ResponsesRequest, + StreamingResponsesResponse, ) from vllm.logger import init_logger from vllm.reasoning.abs_reasoning_parsers import ReasoningParser from vllm.tokenizers import TokenizerLike from vllm.tool_parsers.abstract_tool_parser import ToolParser from vllm.utils import random_uuid +from vllm.utils.collection_utils import as_list logger = init_logger(__name__) +@dataclass +class StreamingParseState: + """Per-request state for streaming parse operations. + + Parser instances are shared across requests, so this holds the + mutable state that varies per streaming session. + """ + + reasoning_ended: bool = False + tool_call_text_started: bool = False + previous_text: str = "" + previous_token_ids: list[int] = field(default_factory=list) + prompt_is_reasoning_end: bool | None = None + + class Parser: """ Abstract Parser class that unifies ReasoningParser and ToolParser into @@ -224,6 +257,40 @@ def extract_reasoning_streaming( A DeltaMessage with reasoning and/or content fields, or None. """ + # ========== Unified Streaming Method ========== + + @abstractmethod + def extract_streaming_delta( + self, + state: StreamingParseState, + delta_text: str, + delta_token_ids: list[int], + prompt_token_ids: list[int] | None, + request: ChatCompletionRequest | ResponsesRequest, + ) -> DeltaMessage | None: + """ + Extract a streaming delta message, handling reasoning-to-tool + transitions and dispatching to the appropriate parser. + + This method consolidates the 4-branch dispatch logic for streaming: + - Both reasoning + tool parsers: reasoning first, then tool parsing + - Reasoning only: delegate to reasoning parser + - Tool only: delegate to tool parser + - Neither: return content directly + + Args: + state: Mutable per-request streaming state. + delta_text: The new text in this delta. + delta_token_ids: The new token IDs in this delta. + prompt_token_ids: The prompt token IDs (used on first call to + check if reasoning already ended in prompt). None after first + call. + request: The request object. + + Returns: + A DeltaMessage, or None if no output for this delta. + """ + # ========== Tool Parser Methods ========== def adjust_request(self, request: ChatCompletionRequest) -> ChatCompletionRequest: @@ -302,6 +369,16 @@ class DelegatingParser(Parser): values (no reasoning extraction, no tool calls). """ + def is_reasoning_end(self, input_ids: list[int]) -> bool: + if self._reasoning_parser is None: + return True # No reasoning parser = reasoning is always "ended" + return self._reasoning_parser.is_reasoning_end(input_ids) + + def extract_content_ids(self, input_ids: list[int]) -> list[int]: + if self._reasoning_parser is None: + return input_ids + return self._reasoning_parser.extract_content_ids(input_ids) + def extract_reasoning( self, model_output: str, @@ -518,6 +595,625 @@ def extract_tool_calls_streaming( request, ) + def extract_streaming_delta( + self, + state: StreamingParseState, + delta_text: str, + delta_token_ids: list[int], + prompt_token_ids: list[int] | None, + request: ChatCompletionRequest | ResponsesRequest, + ) -> DeltaMessage | None: + # Initialize prompt_is_reasoning_end on first call + if ( + self._reasoning_parser is not None + and state.prompt_is_reasoning_end is None + and prompt_token_ids is not None + ): + state.prompt_is_reasoning_end = self._reasoning_parser.is_reasoning_end( + prompt_token_ids + ) + + current_text = state.previous_text + delta_text + current_token_ids = state.previous_token_ids + delta_token_ids + + delta_message: DeltaMessage | None = None + + if self._reasoning_parser is not None and self._tool_parser is not None: + # Both parsers: reasoning first, then transition to tool parsing + if state.prompt_is_reasoning_end: + state.reasoning_ended = True + + if not state.reasoning_ended: + delta_message = self._reasoning_parser.extract_reasoning_streaming( + previous_text=state.previous_text, + current_text=current_text, + delta_text=delta_text, + previous_token_ids=state.previous_token_ids, + current_token_ids=current_token_ids, + delta_token_ids=delta_token_ids, + ) + if self._reasoning_parser.is_reasoning_end(delta_token_ids): + state.reasoning_ended = True + current_token_ids = self._reasoning_parser.extract_content_ids( + delta_token_ids + ) + if delta_message and delta_message.content: + current_text = delta_message.content + delta_message.content = None + else: + current_text = "" + + if state.reasoning_ended: + if not state.tool_call_text_started: + state.tool_call_text_started = True + state.previous_text = "" + state.previous_token_ids = [] + delta_text = current_text + delta_token_ids = current_token_ids + + delta_message = self._tool_parser.extract_tool_calls_streaming( + previous_text=state.previous_text, + current_text=current_text, + delta_text=delta_text, + previous_token_ids=state.previous_token_ids, + current_token_ids=current_token_ids, + delta_token_ids=delta_token_ids, + request=request, # type: ignore[arg-type] + ) + + elif self._reasoning_parser is not None: + delta_message = self._reasoning_parser.extract_reasoning_streaming( + previous_text=state.previous_text, + current_text=current_text, + delta_text=delta_text, + previous_token_ids=state.previous_token_ids, + current_token_ids=current_token_ids, + delta_token_ids=delta_token_ids, + ) + + elif self._tool_parser is not None: + delta_message = self._tool_parser.extract_tool_calls_streaming( + previous_text=state.previous_text, + current_text=current_text, + delta_text=delta_text, + previous_token_ids=state.previous_token_ids, + current_token_ids=current_token_ids, + delta_token_ids=delta_token_ids, + request=request, # type: ignore[arg-type] + ) + + else: + delta_message = DeltaMessage(content=delta_text) + + state.previous_text = current_text + state.previous_token_ids = current_token_ids + + return delta_message + + # ========== Streaming Event Generation ========== + + async def process_streaming_events( + self, + request: ResponsesRequest, + result_generator: AsyncIterator[ConversationContext | None], + request_id: str, + raise_if_error: Callable[[str | None, str], None], + create_logprobs: Callable[..., list] | None, + top_logprobs: int | None, + increment_sequence_number: Callable[ + [StreamingResponsesResponse], StreamingResponsesResponse + ], + ) -> AsyncGenerator[StreamingResponsesResponse, None]: + """Generate streaming Responses API events from a result generator. + + This is the default implementation that handles reasoning, content, + and tool call streaming transitions. Subclasses can override for + custom behavior (e.g., attaching metadata, custom event emission). + + Args: + request: The Responses API request. + result_generator: Async iterator yielding SimpleContext objects. + request_id: The request ID for error reporting. + raise_if_error: Callback to raise on error finish reasons. + create_logprobs: Callback to create logprob objects from + (token_ids, logprobs, tokenizer, top_logprobs). None when + logprobs not requested. + top_logprobs: Number of top logprobs to include. + increment_sequence_number: Callback to assign sequence numbers + to streaming events. + """ + current_content_index = 0 + current_output_index = 0 + current_item_id = "" + current_tool_call_id = "" + current_tool_call_name = "" + streaming_state = StreamingParseState() + first_delta_sent = False + previous_delta_messages: list[DeltaMessage] = [] + async for ctx in result_generator: + assert isinstance(ctx, SimpleContext) + if ctx.last_output is None: + continue + if ctx.last_output.outputs: + output = ctx.last_output.outputs[0] + # finish_reason='error' indicates a retryable error + raise_if_error(output.finish_reason, request_id) + delta_text = output.text + delta_token_ids = as_list(output.token_ids) + + # Pass prompt_token_ids on first call only + prompt_token_ids = ( + ctx.last_output.prompt_token_ids + if streaming_state.prompt_is_reasoning_end is None + else None + ) + delta_message = self.extract_streaming_delta( + state=streaming_state, + delta_text=delta_text, + delta_token_ids=delta_token_ids, + prompt_token_ids=prompt_token_ids, + request=request, + ) + if not delta_message: + continue + if not first_delta_sent: + current_item_id = random_uuid() + if delta_message.tool_calls: + current_tool_call_id = f"call_{random_uuid()}" + assert len(delta_message.tool_calls) == 1, ( + "Multiple tool calls in one delta is not supported" + ) + assert delta_message.tool_calls[0].function is not None, ( + "Tool call without function is not supported" + ) + assert delta_message.tool_calls[0].function.name is not None, ( + "Tool call without function name is not supported" + ) + current_tool_call_name = delta_message.tool_calls[ + 0 + ].function.name + yield increment_sequence_number( + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=ResponseFunctionToolCallItem( + type="function_call", + id=current_item_id, + call_id=current_tool_call_id, + name=current_tool_call_name, + arguments=delta_message.tool_calls[ + 0 + ].function.arguments, + status="in_progress", + ), + ) + ) + elif delta_message.reasoning: + yield increment_sequence_number( + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=ResponseReasoningItem( + type="reasoning", + id=current_item_id, + summary=[], + status="in_progress", + ), + ) + ) + yield increment_sequence_number( + ResponseReasoningPartAddedEvent( + type="response.reasoning_part.added", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + content_index=current_content_index, + part=ResponseReasoningTextContent( + text="", + type="reasoning_text", + ), + ) + ) + elif not delta_message.tool_calls: + yield increment_sequence_number( + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=ResponseOutputMessage( + id=current_item_id, + type="message", + role="assistant", + content=[], + status="in_progress", + ), + ) + ) + yield increment_sequence_number( + ResponseContentPartAddedEvent( + type="response.content_part.added", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + content_index=current_content_index, + part=ResponseOutputText( + type="output_text", + text="", + annotations=[], + logprobs=[], + ), + ) + ) + first_delta_sent = True + + # check delta message and previous delta message are + # same as content or reasoning content + if ( + previous_delta_messages + and previous_delta_messages[-1].reasoning is not None + and delta_message.content is not None + ): + # from reasoning to normal content, send done + # event for reasoning + reason_content = "".join( + pm.reasoning + for pm in previous_delta_messages + if pm.reasoning is not None + ) + + # delta message could have both reasoning and + # content. Include current delta's reasoning in the + # finalization since it may carry the tail end of + # reasoning text (e.g. when reasoning end and + # content start arrive in the same delta). + if delta_message.reasoning is not None: + yield increment_sequence_number( + ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + sequence_number=-1, + content_index=current_content_index, + output_index=current_output_index, + item_id=current_item_id, + delta=delta_message.reasoning, + ) + ) + reason_content += delta_message.reasoning + delta_message = DeltaMessage(content=delta_message.content) + + yield increment_sequence_number( + ResponseReasoningTextDoneEvent( + type="response.reasoning_text.done", + item_id=current_item_id, + sequence_number=-1, + output_index=current_output_index, + content_index=current_content_index, + text=reason_content, + ) + ) + yield increment_sequence_number( + ResponseReasoningPartDoneEvent( + type="response.reasoning_part.done", + sequence_number=-1, + item_id=current_item_id, + output_index=current_output_index, + content_index=current_content_index, + part=ResponseReasoningTextContent( + text=reason_content, + type="reasoning_text", + ), + ) + ) + current_content_index = 0 + reasoning_item = ResponseReasoningItem( + type="reasoning", + content=[ + ResponseReasoningTextContent( + text=reason_content, + type="reasoning_text", + ), + ], + status="completed", + id=current_item_id, + summary=[], + ) + yield increment_sequence_number( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=reasoning_item, + ) + ) + current_output_index += 1 + current_item_id = str(uuid.uuid4()) + yield increment_sequence_number( + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=ResponseOutputMessage( + id=current_item_id, + type="message", + role="assistant", + content=[], + status="in_progress", + ), + ) + ) + yield increment_sequence_number( + ResponseContentPartAddedEvent( + type="response.content_part.added", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + content_index=current_content_index, + part=ResponseOutputText( + type="output_text", + text="", + annotations=[], + logprobs=[], + ), + ) + ) + # reset previous delta messages + previous_delta_messages = [] + if delta_message.tool_calls and delta_message.tool_calls[0].function: + if delta_message.tool_calls[0].function.arguments: + yield increment_sequence_number( + ResponseFunctionCallArgumentsDeltaEvent( + type=("response.function_call_arguments.delta"), + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + delta=delta_message.tool_calls[0].function.arguments, + ) + ) + # tool call initiated with no arguments + elif delta_message.tool_calls[0].function.name: + # send done with current content part + # and add new function call item + yield increment_sequence_number( + ResponseTextDoneEvent( + type="response.output_text.done", + sequence_number=-1, + output_index=current_output_index, + content_index=current_content_index, + text="", + logprobs=[], + item_id=current_item_id, + ) + ) + yield increment_sequence_number( + ResponseContentPartDoneEvent( + type="response.content_part.done", + sequence_number=-1, + item_id=current_item_id, + output_index=current_output_index, + content_index=current_content_index, + part=ResponseOutputText( + type="output_text", + text="", + annotations=[], + logprobs=[], + ), + ) + ) + yield increment_sequence_number( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=ResponseOutputMessage( + id=current_item_id, + type="message", + role="assistant", + content=[], + status="completed", + ), + ) + ) + current_output_index += 1 + current_item_id = random_uuid() + assert delta_message.tool_calls[0].function is not None + current_tool_call_name = delta_message.tool_calls[ + 0 + ].function.name + current_tool_call_id = f"call_{random_uuid()}" + yield increment_sequence_number( + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=ResponseFunctionToolCallItem( + type="function_call", + id=current_item_id, + call_id=current_tool_call_id, + name=current_tool_call_name, + arguments="", + status="in_progress", + ), + ) + ) + # skip content part for tool call + current_content_index = 1 + continue + elif delta_message.reasoning is not None: + yield increment_sequence_number( + ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + sequence_number=-1, + content_index=current_content_index, + output_index=current_output_index, + item_id=current_item_id, + delta=delta_message.reasoning, + ) + ) + elif delta_message.content: + yield increment_sequence_number( + ResponseTextDeltaEvent( + type="response.output_text.delta", + sequence_number=-1, + content_index=current_content_index, + output_index=current_output_index, + item_id=current_item_id, + delta=delta_message.content, + logprobs=( + create_logprobs( + token_ids=output.token_ids, + logprobs=output.logprobs, + tokenizer=self.model_tokenizer, + top_logprobs=top_logprobs, + ) + if create_logprobs + else [] + ), + ) + ) + + previous_delta_messages.append(delta_message) + + if previous_delta_messages: + parts = [] + for pm in previous_delta_messages: + if pm.tool_calls: + assert len(pm.tool_calls) == 1, ( + "Multiple tool calls in one delta is not supported" + ) + assert pm.tool_calls[0].function is not None, ( + "Tool call without function is not supported" + ) + parts.append(pm.tool_calls[0].function.arguments or "") + + tool_call_arguments = "".join(parts) + if tool_call_arguments: + yield increment_sequence_number( + ResponseFunctionCallArgumentsDoneEvent( + type="response.function_call_arguments.done", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + arguments=tool_call_arguments, + name=current_tool_call_name, + ) + ) + current_content_index = 0 + function_call_item = ResponseFunctionToolCall( + type="function_call", + name=current_tool_call_name, + arguments=tool_call_arguments, + status="completed", + id=current_item_id, + call_id=current_tool_call_id, + ) + yield increment_sequence_number( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=function_call_item, + ) + ) + + elif previous_delta_messages[-1].reasoning is not None: + reason_content = "".join( + pm.reasoning + for pm in previous_delta_messages + if pm.reasoning is not None + ) + yield increment_sequence_number( + ResponseReasoningTextDoneEvent( + type="response.reasoning_text.done", + item_id=current_item_id, + sequence_number=-1, + output_index=current_output_index, + content_index=current_content_index, + text=reason_content, + ) + ) + yield increment_sequence_number( + ResponseReasoningPartDoneEvent( + type="response.reasoning_part.done", + sequence_number=-1, + item_id=current_item_id, + output_index=current_output_index, + content_index=current_content_index, + part=ResponseReasoningTextContent( + text=reason_content, + type="reasoning_text", + ), + ) + ) + reasoning_item = ResponseReasoningItem( + type="reasoning", + content=[ + ResponseReasoningTextContent( + text=reason_content, + type="reasoning_text", + ), + ], + status="completed", + id=current_item_id, + summary=[], + ) + yield increment_sequence_number( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=reasoning_item, + ) + ) + elif previous_delta_messages[-1].content: + final_content = "".join( + pm.content for pm in previous_delta_messages if pm.content + ) + yield increment_sequence_number( + ResponseTextDoneEvent( + type="response.output_text.done", + sequence_number=-1, + output_index=current_output_index, + content_index=current_content_index, + text=final_content, + logprobs=[], + item_id=current_item_id, + ) + ) + part = ResponseOutputText( + text=final_content, + type="output_text", + annotations=[], + ) + yield increment_sequence_number( + ResponseContentPartDoneEvent( + type="response.content_part.done", + sequence_number=-1, + item_id=current_item_id, + output_index=current_output_index, + content_index=current_content_index, + part=part, + ) + ) + item = ResponseOutputMessage( + type="message", + role="assistant", + content=[ + part, + ], + status="completed", + id=current_item_id, + summary=[], + ) + yield increment_sequence_number( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=item, + ) + ) + class _WrappedParser(DelegatingParser): """