Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions vllm/entrypoints/openai/engine/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,9 +994,51 @@ async def _generate_with_builtin_tools(
)

async for res in generator:
context.append_output(res)
# NOTE(woosuk): The stop condition is handled by the engine.
yield context
# For streaming harmony models with speculative decoding
# (Eagle), multi-token RequestOutputs can span channel
# boundaries. Split them into single-token outputs so the
# harmony parser processes tokens one at a time, preventing
# intermediate channel transitions from being lost.
if (
isinstance(context, StreamingHarmonyContext)
and res.outputs
and len(res.outputs[0].token_ids) > 1
):
token_ids = res.outputs[0].token_ids
for tok_i, token_id in enumerate(token_ids):
is_last = tok_i == len(token_ids) - 1
single_output = CompletionOutput(
index=res.outputs[0].index,
text="",
token_ids=[token_id],
cumulative_logprob=res.outputs[0].cumulative_logprob,
logprobs=(
res.outputs[0].logprobs[tok_i : tok_i + 1]
if res.outputs[0].logprobs
else None
),
finish_reason=(
res.outputs[0].finish_reason if is_last else None
),
stop_reason=(
res.outputs[0].stop_reason if is_last else None
),
)
single_res = RequestOutput(
request_id=res.request_id,
prompt=res.prompt,
prompt_token_ids=res.prompt_token_ids,
prompt_logprobs=res.prompt_logprobs,
outputs=[single_output],
finished=res.finished if is_last else False,
)
context.append_output(single_res)
yield context
else:
context.append_output(res)
# NOTE(woosuk): The stop condition is handled by
# the engine.
yield context

if not context.need_builtin_tool_call():
# The model did not ask for a tool call, so we're done.
Expand Down
16 changes: 16 additions & 0 deletions vllm/entrypoints/openai/responses/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -1807,6 +1807,7 @@ async def _process_harmony_streaming_events(
],
) -> AsyncGenerator[StreamingResponsesResponse, None]:
state = StreamingState()
ctx = None

async for ctx in result_generator:
assert isinstance(ctx, StreamingHarmonyContext)
Expand All @@ -1829,6 +1830,15 @@ async def _process_harmony_streaming_events(
for event in emit_tool_action_events(ctx, state, self.tool_server):
yield _increment_sequence_number_and_return(event)

# After the loop ends, emit done events for the last message
# that was being streamed. During the loop, done events are only
# emitted when a *new* message starts (is_expecting_start), so
# the final message never gets its done events.
if ctx is not None and hasattr(ctx, "parser") and len(ctx.parser.messages) > 0:
last_item = ctx.parser.messages[-1]
for event in emit_previous_item_done_events(last_item, state):
yield _increment_sequence_number_and_return(event)

async def responses_stream_generator(
self,
request: ResponsesRequest,
Expand All @@ -1854,6 +1864,12 @@ def _increment_sequence_number_and_return(
# Set sequence_number if the event has this attribute
if hasattr(event, "sequence_number"):
event.sequence_number = sequence_number
# Also fix sequence_number on nested items (e.g.
# ResponseFunctionToolCall inside ResponseOutputItemDoneEvent)
# which are created with placeholder sequence_number=-1.
item = getattr(event, "item", None)
if item is not None and hasattr(item, "sequence_number"):
item.sequence_number = sequence_number
sequence_number += 1
return event

Expand Down
Loading