Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 52 additions & 18 deletions vllm/entrypoints/openai/chat_completion/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,24 +708,64 @@ async def chat_completion_stream_generator(
harmony_parser = harmony_parsers[i]
prev_recipient = harmony_parser.current_recipient

# Track accumulated content per token with their state
# Process each token and extract its delta immediately,
# before processing the next token. This is critical
# because extract_harmony_streaming_delta reads
# harmony_parser.messages to compute tool call indices.
# Processing all tokens first would advance the parser
# state past the point where early tokens' indices are
# correct, causing tool call argument fragments to be
# split across different tool call indices.
token_states: list[TokenState] = []
combined_delta: DeltaMessage | None = None
for token_id in output.token_ids:
harmony_parser.process(token_id)
token_delta = harmony_parser.last_content_delta or ""
token_states.append(
TokenState(
harmony_parser.current_channel,
harmony_parser.current_recipient,
token_delta,
ts = TokenState(
harmony_parser.current_channel,
harmony_parser.current_recipient,
token_delta,
)
token_states.append(ts)

# Extract delta for THIS token immediately
per_tok_delta, per_tok_tools = (
extract_harmony_streaming_delta(
harmony_parser=harmony_parser,
token_states=[ts],
prev_recipient=prev_recipient,
include_reasoning=request.include_reasoning,
)
)
if per_tok_tools:
harmony_tools_streamed[i] = True
if ts.recipient:
prev_recipient = ts.recipient
if per_tok_delta is not None:
if combined_delta is None:
combined_delta = per_tok_delta
else:
if per_tok_delta.content:
combined_delta.content = (
combined_delta.content or ""
) + per_tok_delta.content
if per_tok_delta.reasoning:
combined_delta.reasoning = (
combined_delta.reasoning or ""
) + per_tok_delta.reasoning
if per_tok_delta.tool_calls:
if combined_delta.tool_calls is None:
combined_delta.tool_calls = []
combined_delta.tool_calls.extend(
per_tok_delta.tool_calls
)

delta_text = "".join(delta for _, _, delta in token_states)
cur_channel = harmony_parser.current_channel

# handle the case where several tokens were generated at once,
# including the final token, leading to a delta in the text
# while the current channel is empty (start state)
# handle the case where several tokens were generated
# at once, including the final token, leading to a delta
# in the text while the current channel is empty
if not cur_channel and delta_text:
cur_channel = "final"
else:
Expand Down Expand Up @@ -757,15 +797,8 @@ async def chat_completion_stream_generator(
current_token_ids = as_list(output.token_ids)

if self.use_harmony:
delta_message, tools_streamed_flag = (
extract_harmony_streaming_delta(
harmony_parser=harmony_parser,
token_states=token_states,
prev_recipient=prev_recipient,
include_reasoning=request.include_reasoning,
)
)
harmony_tools_streamed[i] |= tools_streamed_flag
# Delta already computed in the per-token loop above
delta_message = combined_delta
# handle streaming deltas for tools with named tool_choice
elif tool_choice_function_name:
# When encountering think end id in prompt_token_ids
Expand Down Expand Up @@ -1108,6 +1141,7 @@ async def chat_completion_stream_generator(
delta_message, output
)
and tool_parser
and not self.use_harmony
):
latest_delta_len = 0
if (
Expand Down
Loading