From 2e1d598352df0a92a2e362e319b159801c8846d3 Mon Sep 17 00:00:00 2001 From: Alex Bilichenko Date: Sun, 1 Feb 2026 20:20:14 +0000 Subject: [PATCH] [Bugfix] Fix tool call streaming for gpt-oss/Harmony models This PR fixes several issues with tool call handling for gpt-oss models using the Harmony streaming parser: 1. **IndexError in streaming generator**: Added `auto_tools_called` check before accessing `prev_tool_call_arr` to prevent IndexError when the array is empty. 2. **Missing tool call IDs in non-streaming responses**: Added proper ID generation for named tool choice and auto tool choice cases that were missing the required `id` field. 3. **Split tool calls in streaming**: Fixed an issue where a single tool call was being split into multiple entries because: - Tool call IDs are now stored by recipient name (e.g., "functions.glob") instead of index number, since `base_index` changes between streaming calls as messages complete. - Continuation chunks now include the same ID as the opening chunk, allowing clients to properly merge them. - DeltaToolCalls with the same index are merged before sending to avoid multiple entries in one SSE chunk. Tested with opencode client against gpt-oss-120b model. --- .../openai/chat_completion/serving.py | 5 ++ .../openai/chat_completion/stream_harmony.py | 66 ++++++++++++++----- 2 files changed, 56 insertions(+), 15 deletions(-) diff --git a/vllm/entrypoints/openai/chat_completion/serving.py b/vllm/entrypoints/openai/chat_completion/serving.py index bf8beb9b97ab..5bbc185536c8 100644 --- a/vllm/entrypoints/openai/chat_completion/serving.py +++ b/vllm/entrypoints/openai/chat_completion/serving.py @@ -524,6 +524,9 @@ async def chat_completion_stream_generator( get_streamable_parser_for_assistant() for _ in range(num_choices) ] harmony_tools_streamed = [False] * num_choices + harmony_tool_call_ids: list[dict[str, tuple[int, str]]] = [ + {} for _ in range(num_choices) + ] tools_streamed = [False] * num_choices if isinstance(request.tool_choice, ChatCompletionNamedToolChoiceParam): @@ -763,6 +766,7 @@ async def chat_completion_stream_generator( token_states=token_states, prev_recipient=prev_recipient, include_reasoning=request.include_reasoning, + tool_call_ids=harmony_tool_call_ids[i], ) ) harmony_tools_streamed[i] |= tools_streamed_flag @@ -1108,6 +1112,7 @@ async def chat_completion_stream_generator( delta_message, output ) and tool_parser + and auto_tools_called ): latest_delta_len = 0 if ( diff --git a/vllm/entrypoints/openai/chat_completion/stream_harmony.py b/vllm/entrypoints/openai/chat_completion/stream_harmony.py index 87f2f9b92275..dec617b41b1f 100644 --- a/vllm/entrypoints/openai/chat_completion/stream_harmony.py +++ b/vllm/entrypoints/openai/chat_completion/stream_harmony.py @@ -17,6 +17,9 @@ DeltaMessage, DeltaToolCall, ) +from vllm.logger import init_logger + +logger = init_logger(__name__) class TokenState(NamedTuple): @@ -30,6 +33,7 @@ def extract_harmony_streaming_delta( token_states: list[TokenState], prev_recipient: str | None, include_reasoning: bool, + tool_call_ids: dict[str, tuple[int, str]] | None = None, ) -> tuple[DeltaMessage | None, bool]: """ Extract a DeltaMessage from harmony parser state during streaming. @@ -39,10 +43,14 @@ def extract_harmony_streaming_delta( token_states: List of TokenState tuples for each token prev_recipient: Previous recipient for detecting tool call transitions include_reasoning: Whether to include reasoning content + tool_call_ids: Optional dict mapping tool call index to its ID, + used to include ID in continuation chunks Returns: A tuple of (DeltaMessage or None, tools_streamed_flag) """ + if tool_call_ids is None: + tool_call_ids = {} if not token_states: return None, False @@ -106,43 +114,51 @@ def extract_harmony_streaming_delta( and group.recipient and group.recipient.startswith("functions.") ): - opened_new_call = False if prev_recipient != group.recipient: - # New tool call - emit the opening message + # New tool call - emit the opening message with any arguments tool_name = group.recipient.split("functions.", 1)[1] + tool_id = make_tool_call_id() + # Store by recipient so we can look it up later even if base_index changes + tool_call_ids[group.recipient] = (next_tool_index, tool_id) + logger.debug( + "New tool call: index=%d, id=%s, recipient=%s", + next_tool_index, tool_id, group.recipient + ) tool_messages.append( DeltaToolCall( - id=make_tool_call_id(), + id=tool_id, type="function", function=DeltaFunctionCall( name=tool_name, - arguments="", + arguments=group.text or "", ), index=next_tool_index, ) ) - opened_new_call = True prev_recipient = group.recipient # Increment for subsequent new tool calls next_tool_index += 1 - - if group.text: - # Stream arguments for the ongoing tool call - if opened_new_call: - # Just opened in this group - tool_call_index = next_tool_index - 1 + elif group.text: + # Continuing arguments for an existing tool call + # Look up the index and ID by recipient + stored = tool_call_ids.get(group.recipient) + if stored: + tool_call_index, tool_id = stored else: - # Continuing from previous chunk - # If ongoing_tool_index is None here, it means - # we're continuing a call but prev_recipient - # wasn't a function. Use base_index. + # Fallback if not found (shouldn't happen) tool_call_index = ( ongoing_tool_index if ongoing_tool_index is not None else base_index ) + tool_id = None + logger.debug( + "Continue tool call: index=%d, id=%s, recipient=%s", + tool_call_index, tool_id, group.recipient + ) tool_messages.append( DeltaToolCall( + id=tool_id, index=tool_call_index, function=DeltaFunctionCall(arguments=group.text), ) @@ -154,6 +170,26 @@ def extract_harmony_streaming_delta( elif group.channel == "analysis" and include_reasoning: combined_reasoning += group.text + # Merge tool messages with the same index to avoid sending multiple + # entries for the same tool call in one SSE chunk + if tool_messages: + merged_tools: dict[int, DeltaToolCall] = {} + for tc in tool_messages: + if tc.index in merged_tools: + # Merge arguments into existing entry + existing = merged_tools[tc.index] + if tc.function and tc.function.arguments: + if existing.function: + existing.function.arguments = ( + (existing.function.arguments or "") + + tc.function.arguments + ) + else: + existing.function = tc.function + else: + merged_tools[tc.index] = tc + tool_messages = list(merged_tools.values()) + # Combine all non-empty fields into a single message if content_encountered or combined_reasoning or tool_messages: delta_kwargs: dict[str, str | list[DeltaToolCall]] = {}