From e13436a262b9deb80be0c276e4337536e04be841 Mon Sep 17 00:00:00 2001 From: madskildegaard Date: Thu, 25 Sep 2025 16:39:50 +0200 Subject: [PATCH 1/4] func call streaming Signed-off-by: madskildegaard --- vllm/entrypoints/openai/serving_responses.py | 204 ++++++++++++++++-- .../tool_parsers/abstract_tool_parser.py | 2 +- 2 files changed, 185 insertions(+), 21 deletions(-) diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py index 0675ad7d5582..aad37b590452 100644 --- a/vllm/entrypoints/openai/serving_responses.py +++ b/vllm/entrypoints/openai/serving_responses.py @@ -19,6 +19,8 @@ # yapf conflicts with isort for this block # yapf: disable from openai.types.responses import (ResponseCreatedEvent, + ResponseFunctionCallArgumentsDeltaEvent, + ResponseFunctionCallArgumentsDoneEvent, ResponseFunctionToolCall, ResponseInProgressEvent, ResponseOutputItem, @@ -1016,6 +1018,11 @@ async def _process_simple_streaming_events( current_content_index = 0 current_output_index = 0 current_item_id = "" + current_tool_call_id = "" + current_tool_call_name = "" + tool_parser = None + if self.tool_parser: + tool_parser = self.tool_parser(tokenizer) reasoning_parser = None if self.reasoning_parser: reasoning_parser = self.reasoning_parser(tokenizer) @@ -1029,6 +1036,18 @@ async def _process_simple_streaming_events( continue if ctx.last_output.outputs: output = ctx.last_output.outputs[0] + delta_message = DeltaMessage(content=output.text) + if tool_parser: + delta_message = tool_parser.extract_tool_calls_streaming( + previous_text=previous_text, + current_text=previous_text + output.text, + delta_text=output.text, + previous_token_ids=previous_token_ids, + current_token_ids=previous_token_ids + + output.token_ids, + delta_token_ids=output.token_ids, + request=request, + ) if reasoning_parser: delta_message = \ reasoning_parser.extract_reasoning_content_streaming( @@ -1040,15 +1059,36 @@ async def _process_simple_streaming_events( output.token_ids, delta_token_ids=output.token_ids, ) - else: - delta_message = DeltaMessage(content=output.text, ) previous_text += output.text previous_token_ids += output.token_ids if not delta_message: continue if not first_delta_sent: current_item_id = str(uuid.uuid4()) - if delta_message.reasoning_content: + if delta_message.tool_calls: + # remove previous delta messages + previous_delta_messages = [] + current_tool_call_id = f"call_{random_uuid()}" + current_tool_call_name = delta_message.tool_calls[ + 0].function.name + yield _send_event( + openai_responses_types. + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=openai_responses_types. + ResponseFunctionToolCallItem( + type="function_call", + id=current_item_id, + call_id=current_tool_call_id, + name=current_tool_call_name, + arguments=delta_message.tool_calls[0]. + function.arguments, + status="in_progress", + ), + )) + elif delta_message.reasoning_content: yield _send_event( openai_responses_types. ResponseOutputItemAddedEvent( @@ -1079,23 +1119,105 @@ async def _process_simple_streaming_events( status="in_progress", ), )) + if not delta_message.tool_calls: + yield _send_event( + openai_responses_types. + ResponseContentPartAddedEvent( + type="response.content_part.added", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + content_index=current_content_index, + part=openai_responses_types.ResponseOutputText( + type="output_text", + text="", + annotations=[], + logprobs=[], + ), + )) + for pm in previous_delta_messages: + if pm.content: + current_content_index += 1 + yield _send_event( + openai_responses_types. + ResponseTextDeltaEvent( + type="response.output_text.delta", + sequence_number=-1, + content_index=current_content_index, + output_index=current_output_index, + item_id=current_item_id, + delta=pm.content, + logprobs=self. + _create_stream_response_logprobs( + token_ids=output.token_ids, + logprobs=output.logprobs, + tokenizer=tokenizer, + top_logprobs=request.top_logprobs, + ) if + request.is_include_output_logprobs() + else [], + )) + current_content_index += 1 + first_delta_sent = True + + if previous_delta_messages and previous_delta_messages[ + -1].tool_calls and ( + delta_message.content is not None or + (delta_message.tool_calls + and delta_message.tool_calls[0].function.name)): + tool_call_arguments = ''.join( + pm.tool_calls[0].function.arguments + for pm in previous_delta_messages if pm.tool_calls) yield _send_event( - openai_responses_types.ResponseContentPartAddedEvent( - type="response.content_part.added", + ResponseFunctionCallArgumentsDoneEvent( + type="response.function_call_arguments.done", sequence_number=-1, output_index=current_output_index, item_id=current_item_id, - content_index=current_content_index, - part=openai_responses_types.ResponseOutputText( - type="output_text", - text="", - annotations=[], - logprobs=[], - ), + arguments=tool_call_arguments, + )) + current_content_index = 0 + function_call_item = ResponseFunctionToolCall( + type="function_call", + name=current_tool_call_name, + arguments=tool_call_arguments, + status="completed", + id=current_item_id, + call_id=current_tool_call_id, + ) + yield _send_event( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=function_call_item, )) + current_tool_call_name = "" + current_tool_call_id = f"call_{random_uuid()}" + current_item_id = str(uuid.uuid4()) + current_output_index += 1 + if delta_message.tool_calls: + # new tool call output started + yield _send_event( + openai_responses_types. + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=openai_responses_types. + ResponseFunctionToolCallItem( + type="function_call", + id=current_item_id, + call_id=current_tool_call_id, + name=delta_message.tool_calls[0].function. + name, + arguments=delta_message.tool_calls[0]. + function.arguments, + status="in_progress", + ), + )) current_content_index += 1 - first_delta_sent = True - # todo(kebe7jun) tool call support + previous_delta_messages = [] # check delta message and previous delta message are # same as content or reasoning content @@ -1169,7 +1291,17 @@ async def _process_simple_streaming_events( # reset previous delta messages previous_delta_messages = [] - if delta_message.reasoning_content is not None: + if delta_message.tool_calls: + yield _send_event( + ResponseFunctionCallArgumentsDeltaEvent( + type="response.function_call_arguments.delta", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + delta=delta_message.tool_calls[0].function. + arguments, + )) + elif delta_message.reasoning_content is not None: yield _send_event( ResponseReasoningTextDeltaEvent( type="response.reasoning_text.delta", @@ -1179,7 +1311,7 @@ async def _process_simple_streaming_events( item_id=current_item_id, delta=delta_message.reasoning_content, )) - elif delta_message.content is not None: + elif delta_message.content: yield _send_event( openai_responses_types.ResponseTextDeltaEvent( type="response.output_text.delta", @@ -1195,11 +1327,43 @@ async def _process_simple_streaming_events( top_logprobs=request.top_logprobs, ) if request.is_include_output_logprobs() else [], )) - current_content_index += 1 + else: + continue + current_content_index += 1 previous_delta_messages.append(delta_message) + if previous_delta_messages: - if previous_delta_messages[-1].reasoning_content is not None: + tool_call_arguments = ''.join(pm.tool_calls[0].function.arguments + for pm in previous_delta_messages + if pm.tool_calls) + if tool_call_arguments: + yield _send_event( + ResponseFunctionCallArgumentsDoneEvent( + type="response.function_call_arguments.done", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + arguments=tool_call_arguments, + )) + current_content_index = 0 + function_call_item = ResponseFunctionToolCall( + type="function_call", + name=current_tool_call_name, + arguments=tool_call_arguments, + status="completed", + id=current_item_id, + call_id=current_tool_call_id, + ) + yield _send_event( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=function_call_item, + )) + + elif previous_delta_messages[-1].reasoning_content is not None: reason_content = ''.join(pm.reasoning_content for pm in previous_delta_messages if pm.reasoning_content is not None) @@ -1232,10 +1396,10 @@ async def _process_simple_streaming_events( output_index=current_output_index, item=reasoning_item, )) - elif previous_delta_messages[-1].content is not None: + elif previous_delta_messages[-1].content: final_content = ''.join(pm.content for pm in previous_delta_messages - if pm.content is not None) + if pm.content) yield _send_event( openai_responses_types.ResponseTextDoneEvent( type="response.output_text.done", diff --git a/vllm/entrypoints/openai/tool_parsers/abstract_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/abstract_tool_parser.py index bb1f67b54069..e891cf7877cc 100644 --- a/vllm/entrypoints/openai/tool_parsers/abstract_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/abstract_tool_parser.py @@ -69,7 +69,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: """ Instance method that should be implemented for extracting tool calls From bbda789f8c1ff8acdd3dbfaf0c5e726a9cce377b Mon Sep 17 00:00:00 2001 From: madskildegaard Date: Thu, 25 Sep 2025 16:52:23 +0200 Subject: [PATCH 2/4] add ResponsesRequest type hint for extract_tool_call_streaming Signed-off-by: madskildegaard --- vllm/entrypoints/openai/serving_responses.py | 22 ------------------- .../tool_parsers/deepseekv3_tool_parser.py | 2 +- .../tool_parsers/glm4_moe_tool_parser.py | 2 +- .../granite_20b_fc_tool_parser.py | 2 +- .../tool_parsers/granite_tool_parser.py | 2 +- .../openai/tool_parsers/hermes_tool_parser.py | 2 +- .../tool_parsers/hunyuan_a13b_tool_parser.py | 2 +- .../tool_parsers/internlm2_tool_parser.py | 2 +- .../openai/tool_parsers/jamba_tool_parser.py | 2 +- .../tool_parsers/kimi_k2_tool_parser.py | 2 +- .../llama4_pythonic_tool_parser.py | 2 +- .../openai/tool_parsers/llama_tool_parser.py | 2 +- .../tool_parsers/minimax_tool_parser.py | 2 +- .../tool_parsers/mistral_tool_parser.py | 2 +- .../tool_parsers/phi4mini_tool_parser.py | 2 +- .../tool_parsers/pythonic_tool_parser.py | 2 +- .../tool_parsers/qwen3coder_tool_parser.py | 2 +- .../openai/tool_parsers/step3_tool_parser.py | 2 +- .../openai/tool_parsers/xlam_tool_parser.py | 2 +- 19 files changed, 18 insertions(+), 40 deletions(-) diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py index aad37b590452..d0f012765d5f 100644 --- a/vllm/entrypoints/openai/serving_responses.py +++ b/vllm/entrypoints/openai/serving_responses.py @@ -1135,28 +1135,6 @@ async def _process_simple_streaming_events( logprobs=[], ), )) - for pm in previous_delta_messages: - if pm.content: - current_content_index += 1 - yield _send_event( - openai_responses_types. - ResponseTextDeltaEvent( - type="response.output_text.delta", - sequence_number=-1, - content_index=current_content_index, - output_index=current_output_index, - item_id=current_item_id, - delta=pm.content, - logprobs=self. - _create_stream_response_logprobs( - token_ids=output.token_ids, - logprobs=output.logprobs, - tokenizer=tokenizer, - top_logprobs=request.top_logprobs, - ) if - request.is_include_output_logprobs() - else [], - )) current_content_index += 1 first_delta_sent = True diff --git a/vllm/entrypoints/openai/tool_parsers/deepseekv3_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/deepseekv3_tool_parser.py index ad46b5ede080..6980001b4949 100644 --- a/vllm/entrypoints/openai/tool_parsers/deepseekv3_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/deepseekv3_tool_parser.py @@ -126,7 +126,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: logger.debug("delta_text: %s", delta_text) diff --git a/vllm/entrypoints/openai/tool_parsers/glm4_moe_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/glm4_moe_tool_parser.py index 626fe48f5444..c458063243dd 100644 --- a/vllm/entrypoints/openai/tool_parsers/glm4_moe_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/glm4_moe_tool_parser.py @@ -136,7 +136,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: self._buffer += delta_text cur_text = self._buffer diff --git a/vllm/entrypoints/openai/tool_parsers/granite_20b_fc_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/granite_20b_fc_tool_parser.py index 71686cd58685..f4fa7043a455 100644 --- a/vllm/entrypoints/openai/tool_parsers/granite_20b_fc_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/granite_20b_fc_tool_parser.py @@ -114,7 +114,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: if len(current_text) < len( diff --git a/vllm/entrypoints/openai/tool_parsers/granite_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/granite_tool_parser.py index 7ca4f637059a..2c4034a45087 100644 --- a/vllm/entrypoints/openai/tool_parsers/granite_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/granite_tool_parser.py @@ -100,7 +100,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: start_idx = consume_space(0, current_text) diff --git a/vllm/entrypoints/openai/tool_parsers/hermes_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/hermes_tool_parser.py index 3626b96ea9ee..1c76470fd685 100644 --- a/vllm/entrypoints/openai/tool_parsers/hermes_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/hermes_tool_parser.py @@ -163,7 +163,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: # 1. All tokens are parsed based on _text, not token_ids. # 2. All incoming text data is processed by the tool_call_delta_buffer diff --git a/vllm/entrypoints/openai/tool_parsers/hunyuan_a13b_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/hunyuan_a13b_tool_parser.py index 2145e62d33c7..5e0f1d146470 100644 --- a/vllm/entrypoints/openai/tool_parsers/hunyuan_a13b_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/hunyuan_a13b_tool_parser.py @@ -164,7 +164,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: """ Extract tool calls for streaming mode. diff --git a/vllm/entrypoints/openai/tool_parsers/internlm2_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/internlm2_tool_parser.py index 6e7c80ea5758..e1ff737433de 100644 --- a/vllm/entrypoints/openai/tool_parsers/internlm2_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/internlm2_tool_parser.py @@ -59,7 +59,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: if '<|action_start|>' not in current_text: self.position = len(current_text) diff --git a/vllm/entrypoints/openai/tool_parsers/jamba_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/jamba_tool_parser.py index b0a1f6196648..f88f22109b66 100644 --- a/vllm/entrypoints/openai/tool_parsers/jamba_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/jamba_tool_parser.py @@ -131,7 +131,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: # if the tool call token is not in the tokens generated so far, append diff --git a/vllm/entrypoints/openai/tool_parsers/kimi_k2_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/kimi_k2_tool_parser.py index 4611364a236b..38adb3ffa54d 100644 --- a/vllm/entrypoints/openai/tool_parsers/kimi_k2_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/kimi_k2_tool_parser.py @@ -130,7 +130,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: logger.debug("delta_text: %s", delta_text) diff --git a/vllm/entrypoints/openai/tool_parsers/llama4_pythonic_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/llama4_pythonic_tool_parser.py index 784bbe213fea..0f887bd4933e 100644 --- a/vllm/entrypoints/openai/tool_parsers/llama4_pythonic_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/llama4_pythonic_tool_parser.py @@ -119,7 +119,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: if not current_text.startswith("[") and not current_text.startswith( diff --git a/vllm/entrypoints/openai/tool_parsers/llama_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/llama_tool_parser.py index 1e414437eb8b..8485f333146c 100644 --- a/vllm/entrypoints/openai/tool_parsers/llama_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/llama_tool_parser.py @@ -123,7 +123,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: if not (current_text.startswith(self.bot_token) diff --git a/vllm/entrypoints/openai/tool_parsers/minimax_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/minimax_tool_parser.py index 24cd1b18f417..00a99e660ec7 100644 --- a/vllm/entrypoints/openai/tool_parsers/minimax_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/minimax_tool_parser.py @@ -677,7 +677,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: self._update_thinking_state(current_text) diff --git a/vllm/entrypoints/openai/tool_parsers/mistral_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/mistral_tool_parser.py index 2ecf62499c0d..0514b84b10ee 100644 --- a/vllm/entrypoints/openai/tool_parsers/mistral_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/mistral_tool_parser.py @@ -186,7 +186,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: # if the tool call token is not in the tokens generated so far, append diff --git a/vllm/entrypoints/openai/tool_parsers/phi4mini_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/phi4mini_tool_parser.py index 0cfa9c7f6975..b15afa63fbd3 100644 --- a/vllm/entrypoints/openai/tool_parsers/phi4mini_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/phi4mini_tool_parser.py @@ -109,7 +109,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Optional[DeltaMessage]: return None diff --git a/vllm/entrypoints/openai/tool_parsers/pythonic_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/pythonic_tool_parser.py index 6dc2e96ee1f6..2b71980d6e77 100644 --- a/vllm/entrypoints/openai/tool_parsers/pythonic_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/pythonic_tool_parser.py @@ -115,7 +115,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: if not current_text.startswith("["): diff --git a/vllm/entrypoints/openai/tool_parsers/qwen3coder_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/qwen3coder_tool_parser.py index ac5fa54253d4..06a3d088f999 100644 --- a/vllm/entrypoints/openai/tool_parsers/qwen3coder_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/qwen3coder_tool_parser.py @@ -314,7 +314,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: # Store request for type conversion if not previous_text: diff --git a/vllm/entrypoints/openai/tool_parsers/step3_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/step3_tool_parser.py index e073ef30dc0f..e9676d71b291 100644 --- a/vllm/entrypoints/openai/tool_parsers/step3_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/step3_tool_parser.py @@ -115,7 +115,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: # The main loop processes the stream from the last known position. diff --git a/vllm/entrypoints/openai/tool_parsers/xlam_tool_parser.py b/vllm/entrypoints/openai/tool_parsers/xlam_tool_parser.py index 08d7e0cb891e..7a6fe90f908f 100644 --- a/vllm/entrypoints/openai/tool_parsers/xlam_tool_parser.py +++ b/vllm/entrypoints/openai/tool_parsers/xlam_tool_parser.py @@ -187,7 +187,7 @@ def extract_tool_calls_streaming( previous_token_ids: Sequence[int], current_token_ids: Sequence[int], delta_token_ids: Sequence[int], - request: ChatCompletionRequest, + request: Union[ChatCompletionRequest, ResponsesRequest], ) -> Union[DeltaMessage, None]: """ Extract tool calls for streaming mode. From 09b62a64a0c0d87d6fb241529c239c640fe8dbe4 Mon Sep 17 00:00:00 2001 From: madskildegaard Date: Fri, 26 Sep 2025 10:06:35 +0200 Subject: [PATCH 3/4] added expected arguments test Signed-off-by: madskildegaard --- .../openai/responses/test_function_call.py | 104 ++++++++++++ vllm/entrypoints/openai/serving_responses.py | 156 ++++++++++++++++-- 2 files changed, 246 insertions(+), 14 deletions(-) diff --git a/tests/v1/entrypoints/openai/responses/test_function_call.py b/tests/v1/entrypoints/openai/responses/test_function_call.py index 9cea8c56d489..0536adfe5e26 100644 --- a/tests/v1/entrypoints/openai/responses/test_function_call.py +++ b/tests/v1/entrypoints/openai/responses/test_function_call.py @@ -194,3 +194,107 @@ def get_weather(latitude: float, longitude: float) -> str: input=input_messages) # check the output assert len(response_2.output_text) > 0 + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_function_calling_with_streaming_expected_arguments( + client: openai.AsyncOpenAI, model_name: str): + + tools = [{ + "type": "function", + "name": "get_weather", + "description": + "Get current temperature for provided location in celsius.", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string" + }, + }, + "required": ["location"], + "additionalProperties": False + }, + "strict": True + }] + + stream_response = await client.responses.create( + model=model_name, + input="Can you tell me what the current weather is in Berlin?", + tools=tools, + stream=True, + ) + + tool_call_item = None + async for event in stream_response: + if (event.type == 'response.output_item.added' + and event.item.type == "function_call"): + tool_call_item = event.item + elif (event.type == 'response.function_call_arguments.delta' + and tool_call_item): + tool_call_item.arguments += event.delta + + assert tool_call_item is not None + assert tool_call_item.type == "function_call" + assert tool_call_item.name == "get_weather" + args = json.loads(tool_call_item.arguments) + assert "location" in args + assert args["location"] is not None + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_function_calling_with_streaming_types( + client: openai.AsyncOpenAI, model_name: str): + + # this links the "done" type with the "start" type + # so every "done" type should have a corresponding "start" type + # and every open block should be closed by the end of the stream + pairs_of_event_types = { + "response.completed": + "response.created", + "response.output_item.done": + "response.output_item.added", + "response.output_text.done": + "response.output_text.delta", + "response.content_part.done": + "response.content_part.added", + "response.reasoning_text.done": + "response.reasoning_text.delta", + "response.reasoning_part.done": + "response.reasoning_part.added", + "response.function_call_arguments.done": + "response.function_call_arguments.delta", # noqa + } + + input_list = [{ + "role": "user", + "content": + "Can you tell me what the current weather is in Berlin and the "\ + "forecast for the next 5 days, in fahrenheit?" + }] + stream_response = await client.responses.create( + model=model_name, + input=input_list, + tools=tools, + stream=True, + ) + + stack_of_event_types = [] + async for event in stream_response: + if event.type == 'response.created': + stack_of_event_types.append(event.type) + elif event.type == 'response.completed': + assert stack_of_event_types[-1] == pairs_of_event_types[event.type] + stack_of_event_types.pop() + if event.type.endswith("added"): + stack_of_event_types.append(event.type) + elif event.type.endswith("delta"): + if stack_of_event_types[-1] == event.type: + continue + stack_of_event_types.append(event.type) + elif event.type.endswith("done"): + assert stack_of_event_types[-1] == pairs_of_event_types[event.type] + stack_of_event_types.pop() + assert len(stack_of_event_types) == 0 diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py index d0f012765d5f..f44f1cd9c2a6 100644 --- a/vllm/entrypoints/openai/serving_responses.py +++ b/vllm/entrypoints/openai/serving_responses.py @@ -1048,7 +1048,8 @@ async def _process_simple_streaming_events( delta_token_ids=output.token_ids, request=request, ) - if reasoning_parser: + if (reasoning_parser and delta_message + and not delta_message.tool_calls): delta_message = \ reasoning_parser.extract_reasoning_content_streaming( previous_text=previous_text, @@ -1066,8 +1067,6 @@ async def _process_simple_streaming_events( if not first_delta_sent: current_item_id = str(uuid.uuid4()) if delta_message.tool_calls: - # remove previous delta messages - previous_delta_messages = [] current_tool_call_id = f"call_{random_uuid()}" current_tool_call_name = delta_message.tool_calls[ 0].function.name @@ -1119,6 +1118,7 @@ async def _process_simple_streaming_events( status="in_progress", ), )) + # skip content_part for tool calls if not delta_message.tool_calls: yield _send_event( openai_responses_types. @@ -1194,6 +1194,38 @@ async def _process_simple_streaming_events( status="in_progress", ), )) + elif delta_message.content: + # back to normal message output + yield _send_event( + openai_responses_types. + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=openai_responses_types. + ResponseOutputMessage( + id=current_item_id, + type="message", + role="assistant", + content=[], + status="in_progress", + ), + )) + yield _send_event( + openai_responses_types. + ResponseContentPartAddedEvent( + type="response.content_part.added", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + content_index=current_content_index, + part=openai_responses_types.ResponseOutputText( + type="output_text", + text="", + annotations=[], + logprobs=[], + ), + )) current_content_index += 1 previous_delta_messages = [] @@ -1216,6 +1248,20 @@ async def _process_simple_streaming_events( content_index=current_content_index, text=reason_content, )) + yield _send_event( + openai_responses_types.ResponseContentPartDoneEvent( + type="response.content_part.done", + sequence_number=-1, + item_id=current_item_id, + output_index=current_output_index, + content_index=current_content_index, + part=openai_responses_types.ResponseOutputText( + type="output_text", + text=reason_content, + annotations=[], + logprobs=[], + ), + )) current_content_index = 0 reasoning_item = ResponseReasoningItem( type="reasoning", @@ -1236,6 +1282,8 @@ async def _process_simple_streaming_events( output_index=current_output_index, item=reasoning_item, )) + current_output_index += 1 + current_item_id = str(uuid.uuid4()) yield _send_event( openai_responses_types.ResponseOutputItemAddedEvent( type="response.output_item.added", @@ -1249,8 +1297,6 @@ async def _process_simple_streaming_events( status="in_progress", ), )) - current_output_index += 1 - current_item_id = str(uuid.uuid4()) yield _send_event( openai_responses_types.ResponseContentPartAddedEvent( type="response.content_part.added", @@ -1270,15 +1316,83 @@ async def _process_simple_streaming_events( previous_delta_messages = [] if delta_message.tool_calls: - yield _send_event( - ResponseFunctionCallArgumentsDeltaEvent( - type="response.function_call_arguments.delta", - sequence_number=-1, - output_index=current_output_index, - item_id=current_item_id, - delta=delta_message.tool_calls[0].function. - arguments, - )) + if delta_message.tool_calls[0].function.arguments: + yield _send_event( + ResponseFunctionCallArgumentsDeltaEvent( + type="response.function_call_arguments.delta", + sequence_number=-1, + output_index=current_output_index, + item_id=current_item_id, + delta=delta_message.tool_calls[0].function. + arguments, + )) + # tool call initiated with no arguments + # send done with current content part + # and add new function call item + elif delta_message.tool_calls[0].function.name: + yield _send_event( + openai_responses_types.ResponseTextDoneEvent( + type="response.output_text.done", + sequence_number=-1, + output_index=current_output_index, + content_index=current_content_index, + text="", + logprobs=[], + item_id=current_item_id, + )) + yield _send_event( + openai_responses_types. + ResponseContentPartDoneEvent( + type="response.content_part.done", + sequence_number=-1, + item_id=current_item_id, + output_index=current_output_index, + content_index=current_content_index, + part=openai_responses_types.ResponseOutputText( + type="output_text", + text="", + annotations=[], + logprobs=[], + ), + )) + yield _send_event( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + sequence_number=-1, + output_index=current_output_index, + item=openai_responses_types. + ResponseOutputMessage( + id=current_item_id, + type="message", + role="assistant", + content=[], + status="completed", + ), + )) + current_output_index += 1 + current_item_id = str(uuid.uuid4()) + current_tool_call_id = f"call_{random_uuid()}" + current_tool_call_name = delta_message.tool_calls[ + 0].function.name + yield _send_event( + openai_responses_types. + ResponseOutputItemAddedEvent( + type="response.output_item.added", + sequence_number=-1, + output_index=current_output_index, + item=openai_responses_types. + ResponseFunctionToolCallItem( + type="function_call", + id=current_item_id, + call_id=current_tool_call_id, + name=current_tool_call_name, + arguments="", + status="in_progress", + ), + )) + # skip content part for tool call + current_content_index = 1 + continue elif delta_message.reasoning_content is not None: yield _send_event( ResponseReasoningTextDeltaEvent( @@ -1354,6 +1468,20 @@ async def _process_simple_streaming_events( content_index=current_content_index, text=reason_content, )) + yield _send_event( + openai_responses_types.ResponseContentPartDoneEvent( + type="response.content_part.done", + sequence_number=-1, + item_id=current_item_id, + output_index=current_output_index, + content_index=current_content_index, + part=openai_responses_types.ResponseOutputText( + type="output_text", + text=reason_content, + annotations=[], + logprobs=[], + ), + )) current_content_index += 1 reasoning_item = ResponseReasoningItem( type="reasoning", From 410433b882765129621cbfa33e9440aa1021a08b Mon Sep 17 00:00:00 2001 From: madskildegaard Date: Fri, 26 Sep 2025 11:32:35 +0200 Subject: [PATCH 4/4] cleanup and added some comments Signed-off-by: madskildegaard --- vllm/entrypoints/openai/serving_responses.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py index f44f1cd9c2a6..be1d61e59249 100644 --- a/vllm/entrypoints/openai/serving_responses.py +++ b/vllm/entrypoints/openai/serving_responses.py @@ -1138,6 +1138,8 @@ async def _process_simple_streaming_events( current_content_index += 1 first_delta_sent = True + # tool call is done when current delta message has + # content or name is defined on tool_calls if previous_delta_messages and previous_delta_messages[ -1].tool_calls and ( delta_message.content is not None or @@ -1170,12 +1172,13 @@ async def _process_simple_streaming_events( output_index=current_output_index, item=function_call_item, )) - current_tool_call_name = "" - current_tool_call_id = f"call_{random_uuid()}" current_item_id = str(uuid.uuid4()) current_output_index += 1 if delta_message.tool_calls: # new tool call output started + current_tool_call_name = delta_message.tool_calls[ + 0].function.name + current_tool_call_id = f"call_{random_uuid()}" yield _send_event( openai_responses_types. ResponseOutputItemAddedEvent( @@ -1187,8 +1190,7 @@ async def _process_simple_streaming_events( type="function_call", id=current_item_id, call_id=current_tool_call_id, - name=delta_message.tool_calls[0].function. - name, + name=current_tool_call_name, arguments=delta_message.tool_calls[0]. function.arguments, status="in_progress", @@ -1327,9 +1329,9 @@ async def _process_simple_streaming_events( arguments, )) # tool call initiated with no arguments - # send done with current content part - # and add new function call item elif delta_message.tool_calls[0].function.name: + # send done with current content part + # and add new function call item yield _send_event( openai_responses_types.ResponseTextDoneEvent( type="response.output_text.done", @@ -1371,9 +1373,9 @@ async def _process_simple_streaming_events( )) current_output_index += 1 current_item_id = str(uuid.uuid4()) - current_tool_call_id = f"call_{random_uuid()}" current_tool_call_name = delta_message.tool_calls[ 0].function.name + current_tool_call_id = f"call_{random_uuid()}" yield _send_event( openai_responses_types. ResponseOutputItemAddedEvent(