[Frontend] Add MCP tool streaming support to Responses API #31761
chaunceyjiang merged 1 commit into vllm-project:main
Code Review
This pull request adds streaming support for MCP tools to the Responses API, which is a significant feature enhancement. The changes include extending the Harmony utilities and refactoring the response serving infrastructure to handle tool streaming events. Several new tests have been added to cover MCP tool streaming, multi-turn conversations, and to ensure event types are correctly dispatched.
The refactoring of the streaming event logic in serving_responses.py into smaller, more manageable functions is a good improvement for maintainability. However, I've identified a few critical logical errors in the new event dispatching code that could lead to incorrect behavior, such as dead code paths and duplicate event emissions for certain tool types. I've provided suggestions to fix these issues. I also found a minor issue in one of the new tests that could make it brittle.
Overall, this is a great contribution, but the identified issues should be addressed to ensure the new streaming logic is robust and correct.
elif (
    ctx.parser.current_channel == "commentary"
    or ctx.parser.current_channel == "analysis"
) and ctx.parser.current_recipient is not None:
    recipient = ctx.parser.current_recipient
    # Check for function calls first - they have their own event handling
    if recipient.startswith("functions."):
        return self._emit_function_call_delta_events(ctx, state)
    is_mcp_tool = self._is_mcp_tool_by_namespace(recipient)
    if is_mcp_tool:
        return self._emit_mcp_tool_delta_events(ctx, state, recipient)
    else:
        return self._emit_code_interpreter_delta_events(ctx, state)
elif (
    (
        ctx.parser.current_channel == "commentary"
        or ctx.parser.current_channel == "analysis"
    )
    and ctx.parser.current_recipient is not None
    and ctx.parser.current_recipient.startswith("mcp.")
):
    return self._emit_mcp_prefix_delta_events(ctx, state)
The logic for dispatching delta events has unreachable code. The elif block checking for ctx.parser.current_recipient.startswith("mcp.") (lines 2042-2050) will never be reached because the preceding elif block (lines 2029-2041) already handles all cases where ctx.parser.current_recipient is not None.
Specifically, if recipient starts with "mcp.", _is_mcp_tool_by_namespace(recipient) will return True, causing _emit_mcp_tool_delta_events to be called and the function to return, thus never reaching the intended _emit_mcp_prefix_delta_events. Additionally, the else branch calling _emit_code_interpreter_delta_events is also unreachable because _is_mcp_tool_by_namespace will always be true if the recipient.startswith("functions.") check fails.
The logic should be restructured to correctly prioritize the "mcp." prefix and remove the dead code.
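The shadowing can be reproduced outside vLLM with a minimal sketch. Everything in it is a hypothetical stand-in: `is_mcp_tool_by_namespace` is assumed to return True for any recipient that is not a `functions.` call, as the comment above claims, and the returned strings only name the handler that would run.

```python
from typing import Optional


def is_mcp_tool_by_namespace(recipient: str) -> bool:
    # Assumption taken from the review comment, not from vLLM itself:
    # every non-function recipient is treated as an MCP-style tool.
    return not recipient.startswith("functions.")


def dispatch_original(recipient: Optional[str]) -> str:
    """Mirror the branch order of the code under review."""
    if recipient is not None:
        if recipient.startswith("functions."):
            return "function_call"
        if is_mcp_tool_by_namespace(recipient):
            return "mcp_tool"
        return "code_interpreter"
    elif recipient is not None and recipient.startswith("mcp."):
        # Unreachable: the first branch already matched every non-None recipient.
        return "mcp_prefix"
    return "no_event"


def dispatch_reordered(recipient: Optional[str]) -> str:
    """Check the more specific "mcp." prefix before the general namespace test."""
    if recipient is not None:
        if recipient.startswith("functions."):
            return "function_call"
        if recipient.startswith("mcp."):
            return "mcp_prefix"
        if is_mcp_tool_by_namespace(recipient):
            return "mcp_tool"
    return "no_event"


print(dispatch_original("mcp.weather"))   # handled by the general MCP branch
print(dispatch_reordered("mcp.weather"))  # handled by the prefix branch
```

With the original ordering the `mcp.`-prefixed recipient never reaches its dedicated handler; checking the prefix first restores it.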
elif (
    ctx.parser.current_channel == "commentary"
    or ctx.parser.current_channel == "analysis"
) and ctx.parser.current_recipient is not None:
    recipient = ctx.parser.current_recipient
    # Check for function calls first
    if recipient.startswith("functions."):
        return self._emit_function_call_delta_events(ctx, state)
    # Handle mcp. prefixed tools
    if recipient.startswith("mcp."):
        return self._emit_mcp_prefix_delta_events(ctx, state)
    # Handle other MCP-style tools (including python/code_interpreter)
    is_mcp_tool = self._is_mcp_tool_by_namespace(recipient)
    if is_mcp_tool:
        return self._emit_mcp_tool_delta_events(ctx, state, recipient)

if (
    self.tool_server is not None
    and previous_item.recipient is not None
    and state.current_item_id is not None
    and state.sent_output_item_added
):
    recipient = previous_item.recipient
    # Handle MCP tool completion
    is_mcp_tool = self._is_mcp_tool_by_namespace(
        recipient
    ) and state.current_item_id.startswith("mcp_")
    if is_mcp_tool:
        events.extend(
            self._emit_mcp_tool_completion_events(previous_item, state)
        )
    else:
        events.extend(
            self._emit_code_interpreter_completion_events(previous_item, state)
        )

# Handle MCP prefix tool completion
if previous_item.recipient is not None and previous_item.recipient.startswith(
    "mcp."
):
    events.extend(self._emit_mcp_prefix_completion_events(previous_item, state))

return events
There is a logical flaw in how tool completion events are emitted. The code has two separate if blocks for handling tool completions: one for general MCP/code-interpreter tools, and another specifically for tools with an mcp. prefix. If a tool recipient starts with mcp., it will match the conditions for both blocks, leading to _emit_mcp_tool_completion_events and _emit_mcp_prefix_completion_events both being called. This will result in duplicate or conflicting events being sent to the client.
The logic should be restructured into a single if/elif/else chain to ensure that only one completion event handler is called for any given tool.
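A small sketch makes the double emission concrete. The function names, the event labels, and the `mcp_123` item id are hypothetical; `is_mcp_tool_by_namespace` is again assumed to accept every non-function recipient.

```python
from typing import List


def is_mcp_tool_by_namespace(recipient: str) -> bool:
    # Assumed behavior for illustration only.
    return not recipient.startswith("functions.")


def completion_events_original(recipient: str, item_id: str) -> List[str]:
    """Two independent if blocks: both can fire for the same recipient."""
    events: List[str] = []
    if is_mcp_tool_by_namespace(recipient) and item_id.startswith("mcp_"):
        events.append("mcp_tool_done")
    else:
        events.append("code_interpreter_done")
    if recipient.startswith("mcp."):
        events.append("mcp_prefix_done")
    return events


def completion_events_chained(recipient: str, item_id: str) -> List[str]:
    """One if/elif/else chain: exactly one handler runs per recipient."""
    if recipient.startswith("mcp."):
        return ["mcp_prefix_done"]
    elif is_mcp_tool_by_namespace(recipient) and item_id.startswith("mcp_"):
        return ["mcp_tool_done"]
    else:
        return ["code_interpreter_done"]


print(completion_events_original("mcp.weather", "mcp_123"))
print(completion_events_chained("mcp.weather", "mcp_123"))
```

The first call reports two completion events for a single tool call, while the chained version reports one.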
# Handle tool completion
if (
    self.tool_server is not None
    and previous_item.recipient is not None
    and state.current_item_id is not None
    and state.sent_output_item_added
):
    recipient = previous_item.recipient
    # Handle MCP prefix tool completion first
    if recipient.startswith("mcp."):
        events.extend(self._emit_mcp_prefix_completion_events(previous_item, state))
    else:
        # Handle other MCP tool and code interpreter completion
        is_mcp_tool = self._is_mcp_tool_by_namespace(
            recipient
        ) and state.current_item_id.startswith("mcp_")
        if is_mcp_tool:
            events.extend(
                self._emit_mcp_tool_completion_events(previous_item, state)
            )
        else:
            events.extend(
                self._emit_code_interpreter_completion_events(previous_item, state)
            )
return events

if (
    event.type.endswith("added")
    or event.type == "response.mcp_call.in_progress"
):
    stack_of_event_types.append(event.type)
The if statement at line 252 should be an elif. The current structure with two separate if/elif chains is confusing and less robust. A single if/elif chain for event type dispatching would make the logic clearer and prevent potential bugs if new event types are added in the future.
- if (
-     event.type.endswith("added")
-     or event.type == "response.mcp_call.in_progress"
- ):
-     stack_of_event_types.append(event.type)
+ elif (
+     event.type.endswith("added")
+     or event.type == "response.mcp_call.in_progress"
+ ):
+     stack_of_event_types.append(event.type)
/cc @qandrew PTAL.
Hi @daniel-salib, the pre-commit checks have failed. Please run:
    uv pip install pre-commit
    pre-commit install
    pre-commit run --all-files
Then, commit the changes and push to your branch.
@chaunceyjiang could you "mark as ready" first? Daniel is taking over again from my work in #30301 and there might be some UT issues before I review
ok
Signed-off-by: Daniel Salib <danielsalib@meta.com>
for event in self._emit_previous_item_done_events(
    previous_item, state
):
    yield _increment_sequence_number_and_return(event)
thanks for the refactor, this looks a lot cleaner. Ideally we may want to move all the harmony streaming to another file, but I think it's fine for now.
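For readers unfamiliar with the pattern in the snippet above, here is a minimal sketch of how a helper like `_increment_sequence_number_and_return` might stamp events as they are yielded. The helper's real body is not shown in this conversation, so the `Event` class, the `number_events` wrapper, and the counter logic are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class Event:
    # Hypothetical stand-in for a streaming response event.
    type: str
    sequence_number: int = -1


def number_events(events: Iterable[Event]) -> Iterator[Event]:
    """Yield events in order, assigning each a monotonically increasing number."""
    sequence_number = 0

    def _increment_sequence_number_and_return(event: Event) -> Event:
        # Assumed behavior: stamp the event, then advance the shared counter.
        nonlocal sequence_number
        event.sequence_number = sequence_number
        sequence_number += 1
        return event

    for event in events:
        yield _increment_sequence_number_and_return(event)


numbered = list(number_events([Event("added"), Event("delta"), Event("done")]))
print([(e.type, e.sequence_number) for e in numbered])
```

Keeping the counter in one closure means every emitter can return plain events while ordering is applied in a single place.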
chaunceyjiang left a comment
Thanks~ @qandrew @daniel-salib
https://buildkite.com/vllm/ci/builds/46594#019baee4-4801-4332-98f5-e9fcafc09c6f @daniel-salib PTAL.
thanks for flagging - I think the test is flaky because the math question is too simple to always trigger the MCP tool call with the python tool. I find a slightly larger math problem like "123 * 456" triggers it consistently. Opened a new PR with a fix: https://github.com/vllm-project/vllm/pull/32153/files
Purpose
This change enables streaming support for MCP tools when using GPT OSS. It extends the harmony utilities and response serving infrastructure to handle tool streaming, allowing tool calls and their results to be incrementally streamed back to clients rather than returned as a single batch.
This PR is taken over from #30301
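To illustrate what incremental streaming means for a client, the sketch below accumulates argument deltas per tool call as events arrive. It is not vLLM or client library code: the `StreamEvent` class and `collect_arguments` function are hypothetical, and apart from `response.mcp_call.in_progress`, which appears in the test code discussed in this PR, the event type names are assumed for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass
class StreamEvent:
    # Hypothetical, simplified event shape.
    type: str
    item_id: str
    delta: str = ""


def collect_arguments(events: Iterable[StreamEvent]) -> Dict[str, str]:
    """Concatenate delta payloads per item id in arrival order."""
    buffers: Dict[str, str] = {}
    for event in events:
        if event.type.endswith(".delta"):
            buffers[event.item_id] = buffers.get(event.item_id, "") + event.delta
    return buffers


stream = [
    StreamEvent("response.mcp_call.in_progress", "mcp_1"),
    StreamEvent("response.mcp_call_arguments.delta", "mcp_1", '{"code": '),
    StreamEvent("response.mcp_call_arguments.delta", "mcp_1", '"123 * 456"}'),
    StreamEvent("response.mcp_call_arguments.done", "mcp_1"),
]
print(collect_arguments(stream))
```

A client can render each partial buffer as it grows instead of waiting for the whole tool call to finish.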
Test Plan
Test Result