diff --git a/vllm/entrypoints/openai/chat_completion/serving.py b/vllm/entrypoints/openai/chat_completion/serving.py index bf8beb9b97ab..47ec3e414e62 100644 --- a/vllm/entrypoints/openai/chat_completion/serving.py +++ b/vllm/entrypoints/openai/chat_completion/serving.py @@ -708,24 +708,64 @@ async def chat_completion_stream_generator( harmony_parser = harmony_parsers[i] prev_recipient = harmony_parser.current_recipient - # Track accumulated content per token with their state + # Process each token and extract its delta immediately, + # before processing the next token. This is critical + # because extract_harmony_streaming_delta reads + # harmony_parser.messages to compute tool call indices. + # Processing all tokens first would advance the parser + # state past the point where early tokens' indices are + # correct, causing tool call argument fragments to be + # split across different tool call indices. token_states: list[TokenState] = [] + combined_delta: DeltaMessage | None = None for token_id in output.token_ids: harmony_parser.process(token_id) token_delta = harmony_parser.last_content_delta or "" - token_states.append( - TokenState( - harmony_parser.current_channel, - harmony_parser.current_recipient, - token_delta, + ts = TokenState( + harmony_parser.current_channel, + harmony_parser.current_recipient, + token_delta, + ) + token_states.append(ts) + + # Extract delta for THIS token immediately + per_tok_delta, per_tok_tools = ( + extract_harmony_streaming_delta( + harmony_parser=harmony_parser, + token_states=[ts], + prev_recipient=prev_recipient, + include_reasoning=request.include_reasoning, ) ) + if per_tok_tools: + harmony_tools_streamed[i] = True + if ts.recipient: + prev_recipient = ts.recipient + if per_tok_delta is not None: + if combined_delta is None: + combined_delta = per_tok_delta + else: + if per_tok_delta.content: + combined_delta.content = ( + combined_delta.content or "" + ) + per_tok_delta.content + if per_tok_delta.reasoning: + 
combined_delta.reasoning = ( + combined_delta.reasoning or "" + ) + per_tok_delta.reasoning + if per_tok_delta.tool_calls: + if combined_delta.tool_calls is None: + combined_delta.tool_calls = [] + combined_delta.tool_calls.extend( + per_tok_delta.tool_calls + ) + delta_text = "".join(delta for _, _, delta in token_states) cur_channel = harmony_parser.current_channel - # handle the case where several tokens where generated at once - # including the final token, leading to a delta in the text - # but the current channel to be empty (start state) + # handle the case where several tokens were generated + # at once including the final token, leading to a delta + # in the text but the current channel to be empty if not cur_channel and delta_text: cur_channel = "final" else: @@ -757,15 +797,8 @@ async def chat_completion_stream_generator( current_token_ids = as_list(output.token_ids) if self.use_harmony: - delta_message, tools_streamed_flag = ( - extract_harmony_streaming_delta( - harmony_parser=harmony_parser, - token_states=token_states, - prev_recipient=prev_recipient, - include_reasoning=request.include_reasoning, - ) - ) - harmony_tools_streamed[i] |= tools_streamed_flag + # Delta already computed in the per-token loop above + delta_message = combined_delta # handle streaming deltas for tools with named tool_choice elif tool_choice_function_name: # When encountering think end id in prompt_token_ids @@ -1108,6 +1141,7 @@ async def chat_completion_stream_generator( delta_message, output ) and tool_parser + and not self.use_harmony ): latest_delta_len = 0 if (