From 858df9c09d40115553633df7374665ba0451ab27 Mon Sep 17 00:00:00 2001
From: Pradyun Ramadorai
Date: Sat, 14 Mar 2026 20:45:53 +0000
Subject: [PATCH] [Bugfix] Fix Responses API harmony streaming: token
 splitting, missing done events, nested sequence_number

Three fixes for Responses API streaming with harmony models:

1. Multi-token RequestOutput splitting for speculative decoding:
   With Eagle, RequestOutputs can contain multiple tokens that span
   channel boundaries. StreamingHarmonyContext.append_output()
   processes all tokens in a loop but only yields once, losing
   intermediate channel transitions (e.g., reasoning to function
   call). Fix: Split multi-token RequestOutputs into single-token
   ones before append_output() so the harmony parser processes
   tokens one at a time.

2. Final message done events not emitted: During streaming, done
   events (output_text.done, content_part.done, output_item.done)
   are only emitted when a new message starts (is_expecting_start).
   The last message never triggers this because the generator ends.
   Fix: After the async for loop, emit done events for the final
   message.

3. Nested item sequence_number stuck at -1: Events are created with
   placeholder sequence_number=-1. _increment_sequence_number_and_return
   fixes the top-level event but not nested items (e.g.,
   ResponseFunctionToolCall inside ResponseOutputItemDoneEvent.item).
   Fix: Also set sequence_number on the nested item if present.

Signed-off-by: Pradyun Ramadorai
Co-authored-by: Claude
---
 vllm/entrypoints/openai/engine/serving.py    | 48 ++++++++++++++++++--
 vllm/entrypoints/openai/responses/serving.py | 16 +++++++
 2 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/vllm/entrypoints/openai/engine/serving.py b/vllm/entrypoints/openai/engine/serving.py
index 2049b3adfd3c..4ea38819a3b5 100644
--- a/vllm/entrypoints/openai/engine/serving.py
+++ b/vllm/entrypoints/openai/engine/serving.py
@@ -994,9 +994,51 @@ async def _generate_with_builtin_tools(
         )
 
         async for res in generator:
-            context.append_output(res)
-            # NOTE(woosuk): The stop condition is handled by the engine.
-            yield context
+            # For streaming harmony models with speculative decoding
+            # (Eagle), multi-token RequestOutputs can span channel
+            # boundaries. Split them into single-token outputs so the
+            # harmony parser processes tokens one at a time, preventing
+            # intermediate channel transitions from being lost.
+            if (
+                isinstance(context, StreamingHarmonyContext)
+                and res.outputs
+                and len(res.outputs[0].token_ids) > 1
+            ):
+                token_ids = res.outputs[0].token_ids
+                for tok_i, token_id in enumerate(token_ids):
+                    is_last = tok_i == len(token_ids) - 1
+                    single_output = CompletionOutput(
+                        index=res.outputs[0].index,
+                        text="",
+                        token_ids=[token_id],
+                        cumulative_logprob=res.outputs[0].cumulative_logprob,
+                        logprobs=(
+                            res.outputs[0].logprobs[tok_i : tok_i + 1]
+                            if res.outputs[0].logprobs
+                            else None
+                        ),
+                        finish_reason=(
+                            res.outputs[0].finish_reason if is_last else None
+                        ),
+                        stop_reason=(
+                            res.outputs[0].stop_reason if is_last else None
+                        ),
+                    )
+                    single_res = RequestOutput(
+                        request_id=res.request_id,
+                        prompt=res.prompt,
+                        prompt_token_ids=res.prompt_token_ids,
+                        prompt_logprobs=res.prompt_logprobs,
+                        outputs=[single_output],
+                        finished=res.finished if is_last else False,
+                    )
+                    context.append_output(single_res)
+                    yield context
+            else:
+                context.append_output(res)
+                # NOTE(woosuk): The stop condition is handled by
+                # the engine.
+                yield context
 
         if not context.need_builtin_tool_call():
             # The model did not ask for a tool call, so we're done.
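
A standalone sketch of the splitting in fix 1, runnable on its own.
The Output and Result dataclasses are simplified stand-ins for vLLM's
CompletionOutput and RequestOutput (most fields omitted), and
split_into_single_token_results is a hypothetical helper named for this
illustration; only the splitting logic mirrors the hunk above.

from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Output:
    # Stand-in for vLLM's CompletionOutput; the real class has more fields.
    token_ids: list[int]
    finish_reason: str | None = None


@dataclass
class Result:
    # Stand-in for vLLM's RequestOutput.
    request_id: str
    outputs: list[Output]
    finished: bool = False


def split_into_single_token_results(res: Result) -> list[Result]:
    # Split a multi-token result (as produced by Eagle speculative
    # decoding) into single-token results so a streaming parser sees
    # one token at a time and never skips a channel transition.
    out = res.outputs[0]
    singles = []
    for i, tok in enumerate(out.token_ids):
        is_last = i == len(out.token_ids) - 1
        singles.append(
            Result(
                request_id=res.request_id,
                outputs=[
                    Output(
                        token_ids=[tok],
                        # End-of-stream metadata belongs to the last token
                        # only; earlier splits must look unfinished.
                        finish_reason=out.finish_reason if is_last else None,
                    )
                ],
                finished=res.finished if is_last else False,
            )
        )
    return singles


res = Result("req-0", [Output([11, 22, 33], finish_reason="stop")], finished=True)
for r in split_into_single_token_results(res):
    print(r.outputs[0].token_ids, r.outputs[0].finish_reason, r.finished)
# Prints: [11] None False, then [22] None False, then [33] stop True

Carrying finish_reason and finished only on the last split token matters
because downstream consumers treat them as end-of-stream markers;
stamping them on every split would end the stream after the first token.
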
diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py
index 6d0041813e35..f159452847eb 100644
--- a/vllm/entrypoints/openai/responses/serving.py
+++ b/vllm/entrypoints/openai/responses/serving.py
@@ -1807,6 +1807,7 @@ async def _process_harmony_streaming_events(
         ],
     ) -> AsyncGenerator[StreamingResponsesResponse, None]:
         state = StreamingState()
+        ctx = None
 
         async for ctx in result_generator:
             assert isinstance(ctx, StreamingHarmonyContext)
@@ -1829,6 +1830,15 @@
                 for event in emit_tool_action_events(ctx, state, self.tool_server):
                     yield _increment_sequence_number_and_return(event)
 
+        # After the loop ends, emit done events for the last message
+        # that was being streamed. During the loop, done events are only
+        # emitted when a *new* message starts (is_expecting_start), so
+        # the final message never gets its done events.
+        if ctx is not None and hasattr(ctx, "parser") and len(ctx.parser.messages) > 0:
+            last_item = ctx.parser.messages[-1]
+            for event in emit_previous_item_done_events(last_item, state):
+                yield _increment_sequence_number_and_return(event)
+
     async def responses_stream_generator(
         self,
         request: ResponsesRequest,
@@ -1854,6 +1864,12 @@ def _increment_sequence_number_and_return(
             # Set sequence_number if the event has this attribute
             if hasattr(event, "sequence_number"):
                 event.sequence_number = sequence_number
+            # Also fix sequence_number on nested items (e.g.
+            # ResponseFunctionToolCall inside ResponseOutputItemDoneEvent)
+            # which are created with placeholder sequence_number=-1.
+            item = getattr(event, "item", None)
+            if item is not None and hasattr(item, "sequence_number"):
+                item.sequence_number = sequence_number
             sequence_number += 1
             return event
 
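
A standalone sketch of fix 3, runnable on its own. ToolCall and
ItemDoneEvent are toy stand-ins for ResponseFunctionToolCall and
ResponseOutputItemDoneEvent, and make_sequencer is a hypothetical
factory; the stamping closure mirrors the
_increment_sequence_number_and_return hunk above.

from dataclasses import dataclass


@dataclass
class ToolCall:
    # Stand-in for ResponseFunctionToolCall; created with the same
    # placeholder sequence_number=-1 the commit message describes.
    sequence_number: int = -1


@dataclass
class ItemDoneEvent:
    # Stand-in for ResponseOutputItemDoneEvent, which nests an item.
    item: ToolCall
    sequence_number: int = -1


def make_sequencer():
    sequence_number = 0

    def stamp(event):
        nonlocal sequence_number
        if hasattr(event, "sequence_number"):
            event.sequence_number = sequence_number
        # The fix: without these three lines, event.item would keep its
        # placeholder -1 even though the outer event gets stamped.
        item = getattr(event, "item", None)
        if item is not None and hasattr(item, "sequence_number"):
            item.sequence_number = sequence_number
        sequence_number += 1
        return event

    return stamp


stamp = make_sequencer()
event = stamp(ItemDoneEvent(item=ToolCall()))
print(event.sequence_number, event.item.sequence_number)  # 0 0
event = stamp(ItemDoneEvent(item=ToolCall()))
print(event.sequence_number, event.item.sequence_number)  # 1 1

Using getattr with a None default keeps the stamping safe for event
types that have no nested item, so a single helper can serve every
event class in the stream.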