Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 46 additions & 11 deletions python/packages/ag-ui/agent_framework_ag_ui/_message_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,32 @@ def _sanitize_tool_history(messages: list[ChatMessage]) -> list[ChatMessage]:
confirm_changes_call = content
break

sanitized.append(msg)
# Filter out confirm_changes from assistant messages before sending to LLM.
# confirm_changes is a synthetic tool for the approval UI flow - the LLM shouldn't
# see it because it may contain stale function_arguments that confuse the model
# (e.g., showing 5 steps when only 2 were approved).
# When we filter out confirm_changes, we also remove it from tool_ids and don't
# set pending_confirm_changes_id, so no synthetic result is injected for it.
# This is required because OpenAI validates that every tool result has a matching
# tool call in the previous assistant message.
if confirm_changes_call:
filtered_contents = [
c for c in (msg.contents or []) if not (c.type == "function_call" and c.name == "confirm_changes")
]
if filtered_contents:
# Update the existing message without confirm_changes, preserving metadata
msg.contents = filtered_contents
sanitized.append(msg)
# If no contents left after filtering, don't append anything

# Remove confirm_changes from tool_ids since we filtered it from the message
if confirm_changes_call.call_id:
tool_ids.discard(str(confirm_changes_call.call_id))
# Don't set pending_confirm_changes_id - we don't want a synthetic result
confirm_changes_call = None
else:
sanitized.append(msg)

pending_tool_call_ids = tool_ids if tool_ids else None
pending_confirm_changes_id = (
str(confirm_changes_call.call_id) if confirm_changes_call and confirm_changes_call.call_id else None
Expand All @@ -67,7 +92,7 @@ def _sanitize_tool_history(messages: list[ChatMessage]) -> list[ChatMessage]:
if approval_call_ids and pending_tool_call_ids:
pending_tool_call_ids -= approval_call_ids
logger.info(
f"FunctionApprovalResponseContent found for call_ids={sorted(approval_call_ids)} - "
f"function_approval_response content found for call_ids={sorted(approval_call_ids)} - "
"framework will handle execution"
)

Expand Down Expand Up @@ -150,6 +175,10 @@ def _sanitize_tool_history(messages: list[ChatMessage]) -> list[ChatMessage]:
call_id = str(content.call_id)
if call_id in pending_tool_call_ids:
keep = True
# Remove the call_id from pending since we now have its result.
# This prevents duplicate synthetic "skipped" results from being
# injected when a user message arrives later.
pending_tool_call_ids.discard(call_id)
if call_id == pending_confirm_changes_id:
pending_confirm_changes_id = None
break
Expand Down Expand Up @@ -338,7 +367,7 @@ def _filter_modified_args(
result: list[ChatMessage] = []
for msg in messages:
# Handle standard tool result messages early (role="tool") to preserve provider invariants
# This path maps AG‑UI tool messages to FunctionResultContent with the correct tool_call_id
# This path maps AG‑UI tool messages to function_result content with the correct tool_call_id
role_str = normalize_agui_role(msg.get("role", "user"))
if role_str == "tool":
# Prefer explicit tool_call_id fields; fall back to backend fields only if necessary
Expand Down Expand Up @@ -371,7 +400,7 @@ def _filter_modified_args(

if is_approval:
# Look for the matching function call in previous messages to create
# a proper FunctionApprovalResponseContent. This enables the agent framework
# proper function_approval_response content. This enables the agent framework
# to execute the approved tool (fix for GitHub issue #3034).
accepted = parsed.get("accepted", False) if parsed is not None else False
approval_payload_text = result_content if isinstance(result_content, str) else json.dumps(parsed)
Expand Down Expand Up @@ -448,11 +477,17 @@ def _filter_modified_args(
merged_args["steps"] = merged_steps
state_args = merged_args

# Keep the original tool call and AG-UI snapshot in sync with approved args.
updated_args = (
json.dumps(merged_args) if isinstance(matching_func_call.arguments, str) else merged_args
# Update the ChatMessage tool call with only enabled steps (for LLM context).
# The LLM should only see the steps that were actually approved/executed.
updated_args_for_llm = (
json.dumps(filtered_args)
if isinstance(matching_func_call.arguments, str)
else filtered_args
)
matching_func_call.arguments = updated_args
matching_func_call.arguments = updated_args_for_llm

# Update raw messages with all steps + status (for MESSAGES_SNAPSHOT display).
# This allows the UI to show which steps were enabled/disabled.
_update_tool_call_arguments(messages, str(approval_call_id), merged_args)
# Create a new FunctionCallContent with the modified arguments
func_call_for_approval = Content.from_function_call(
Expand All @@ -465,7 +500,7 @@ def _filter_modified_args(
# No modified arguments - use the original function call
func_call_for_approval = matching_func_call

# Create FunctionApprovalResponseContent for the agent framework
# Create function_approval_response content for the agent framework
approval_response = Content.from_function_approval_response(
approved=accepted,
id=str(approval_call_id),
Expand All @@ -489,7 +524,7 @@ def _filter_modified_args(
result.append(chat_msg)
continue

# Cast result_content to acceptable type for FunctionResultContent
# Cast result_content to acceptable type for function_result content
func_result: str | dict[str, Any] | list[Any]
if isinstance(result_content, str):
func_result = result_content
Expand Down Expand Up @@ -566,7 +601,7 @@ def _filter_modified_args(

# Check if this message contains function approvals
if "function_approvals" in msg and msg["function_approvals"]:
# Convert function approvals to FunctionApprovalResponseContent
# Convert function approvals to function_approval_response content
approval_contents: list[Any] = []
for approval in msg["function_approvals"]:
# Create FunctionCallContent with the modified arguments
Expand Down
147 changes: 135 additions & 12 deletions python/packages/ag-ui/agent_framework_ag_ui/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
convert_agui_tools_to_agent_framework,
generate_event_id,
get_conversation_id_from_update,
get_role_value,
make_json_safe,
)

Expand Down Expand Up @@ -344,7 +345,7 @@ def _emit_tool_result(
flow: FlowState,
predictive_handler: PredictiveStateHandler | None = None,
) -> list[BaseEvent]:
"""Emit ToolCallResult events for FunctionResultContent."""
"""Emit ToolCallResult events for function_result content."""
events: list[BaseEvent] = []

# Cannot emit tool result without a call_id to associate it with
Expand Down Expand Up @@ -385,6 +386,13 @@ def _emit_tool_result(
# After tool result, any subsequent text should start a new message
flow.tool_call_id = None
flow.tool_call_name = None

# Close any open text message before resetting message_id (issue #3568)
# This handles the case where a TextMessageStartEvent was emitted for tool-only
# messages (Feature #4) but needs to be closed before starting a new message
if flow.message_id:
logger.debug("Closing text message (issue #3568 fix): message_id=%s", flow.message_id)
events.append(TextMessageEndEvent(message_id=flow.message_id))
Comment thread
moonbox3 marked this conversation as resolved.
flow.message_id = None # Reset so next text content starts a new message

return events
Expand Down Expand Up @@ -454,9 +462,21 @@ def _emit_approval_request(
"function_arguments": make_json_safe(func_call.parse_arguments()) or {},
"steps": [{"description": f"Execute {func_name}", "status": "enabled"}],
}
events.append(ToolCallArgsEvent(tool_call_id=confirm_id, delta=json.dumps(args)))
args_json = json.dumps(args)
events.append(ToolCallArgsEvent(tool_call_id=confirm_id, delta=args_json))
events.append(ToolCallEndEvent(tool_call_id=confirm_id))

# Track confirm_changes in pending_tool_calls for MessagesSnapshotEvent
# The frontend needs to see this in the snapshot to render the confirmation dialog
confirm_entry = {
"id": confirm_id,
"type": "function",
"function": {"name": "confirm_changes", "arguments": args_json},
}
flow.pending_tool_calls.append(confirm_entry)
flow.tool_calls_by_id[confirm_id] = confirm_entry
flow.tool_calls_ended.add(confirm_id) # Mark as ended since we emit End event

flow.waiting_for_approval = True
return events

Expand Down Expand Up @@ -558,8 +578,8 @@ async def _resolve_approval_responses(
) -> None:
"""Execute approved function calls and replace approval content with results.

This modifies the messages list in place, replacing FunctionApprovalResponseContent
with FunctionResultContent containing the actual tool execution result.
This modifies the messages list in place, replacing function_approval_response
content with function_result content containing the actual tool execution result.

Args:
messages: List of messages (will be modified in place)
Expand Down Expand Up @@ -622,6 +642,76 @@ async def _resolve_approval_responses(

_replace_approval_contents_with_results(messages, fcc_todo, normalized_results) # type: ignore

# Post-process: Convert user messages with function_result content to proper tool messages.
# After _replace_approval_contents_with_results, approved tool calls have their results
# placed in user messages. OpenAI requires tool results to be in role="tool" messages.
# This transformation ensures the message history is valid for the LLM provider.
_convert_approval_results_to_tool_messages(messages)
Comment thread
moonbox3 marked this conversation as resolved.


def _convert_approval_results_to_tool_messages(messages: list[Any]) -> None:
"""Convert function_result content in user messages to proper tool messages.

After approval processing, tool results end up in user messages. OpenAI and other
providers require tool results to be in role="tool" messages. This function
extracts function_result content from user messages and creates proper tool messages.

This modifies the messages list in place.

Args:
messages: List of ChatMessage objects to process
"""
i = 0
while i < len(messages):
msg = messages[i]
role_value = get_role_value(msg)

if role_value != "user":
i += 1
continue

# Check if this user message has function_result content
function_results: list[Content] = []
other_contents: list[Any] = []

for content in msg.contents or []:
if getattr(content, "type", None) == "function_result":
function_results.append(content)
else:
other_contents.append(content)

if not function_results:
i += 1
continue

# We have function results in a user message - need to fix this
logger.info(
f"Converting {len(function_results)} function_result content(s) from user message to tool message(s)"
)

# Create tool messages for each function result
new_tool_messages = []
for func_result in function_results:
tool_msg = ChatMessage(
role="tool",
contents=[func_result],
)
new_tool_messages.append(tool_msg)

if other_contents:
# Keep the user message with remaining contents
msg.contents = other_contents
# Insert tool messages after this user message
for j, tool_msg in enumerate(new_tool_messages):
messages.insert(i + 1 + j, tool_msg)
i += 1 + len(new_tool_messages)
else:
# No other contents - replace user message with tool messages
messages.pop(i)
for j, tool_msg in enumerate(new_tool_messages):
messages.insert(i + j, tool_msg)
i += len(new_tool_messages)


def _build_messages_snapshot(
flow: FlowState,
Expand All @@ -630,25 +720,29 @@ def _build_messages_snapshot(
"""Build MessagesSnapshotEvent from current flow state."""
all_messages = list(snapshot_messages)

# Add assistant message with tool calls
# Add assistant message with tool calls only (no content)
if flow.pending_tool_calls:
tool_call_message = {
"id": flow.message_id or generate_event_id(),
"role": "assistant",
"tool_calls": flow.pending_tool_calls.copy(),
}
if flow.accumulated_text:
tool_call_message["content"] = flow.accumulated_text
all_messages.append(tool_call_message)

# Add tool results
all_messages.extend(flow.tool_results)

# Add text-only assistant message if no tool calls
if flow.accumulated_text and not flow.pending_tool_calls:
# Add text-only assistant message if there is accumulated text
# This is a separate message from the tool calls message to maintain
# the expected AG-UI protocol format (see issue #3619)
if flow.accumulated_text:
# Use a new ID for the content message if we had tool calls (separate message)
content_message_id = (
generate_event_id() if flow.pending_tool_calls else (flow.message_id or generate_event_id())
)
all_messages.append(
{
"id": flow.message_id or generate_event_id(),
"id": content_message_id,
"role": "assistant",
"content": flow.accumulated_text,
}
Expand Down Expand Up @@ -827,6 +921,8 @@ async def run_agent_stream(

# Emit events for each content item
for content in update.contents:
content_type = getattr(content, "type", None)
logger.debug(f"Processing content type={content_type}, message_id={flow.message_id}")
for event in _emit_content(
content,
flow,
Expand Down Expand Up @@ -922,6 +1018,20 @@ async def run_agent_stream(
tool_call_id,
)

# Parse function arguments - skip confirm_changes if we can't parse
# (we can't ask user to confirm something we can't properly display)
try:
function_arguments = json.loads(tool_call.get("function", {}).get("arguments", "{}"))
Comment thread
moonbox3 marked this conversation as resolved.
except json.JSONDecodeError:
Comment thread
moonbox3 marked this conversation as resolved.
logger.warning(
"Failed to decode JSON arguments for confirm_changes tool '%s' "
"(tool_call_id=%s). Skipping confirmation flow - cannot display "
"malformed arguments to user for approval.",
tool_name,
tool_call_id,
)
continue # Skip to next tool call without emitting confirm_changes

# Emit confirm_changes tool call
confirm_id = generate_event_id()
yield ToolCallStartEvent(
Expand All @@ -932,15 +1042,28 @@ async def run_agent_stream(
confirm_args = {
"function_name": tool_name,
"function_call_id": tool_call_id,
"function_arguments": json.loads(tool_call.get("function", {}).get("arguments", "{}")),
"function_arguments": function_arguments,
"steps": [{"description": f"Execute {tool_name}", "status": "enabled"}],
}
yield ToolCallArgsEvent(tool_call_id=confirm_id, delta=json.dumps(confirm_args))
confirm_args_json = json.dumps(confirm_args)
yield ToolCallArgsEvent(tool_call_id=confirm_id, delta=confirm_args_json)
yield ToolCallEndEvent(tool_call_id=confirm_id)

# Track confirm_changes in pending_tool_calls for MessagesSnapshotEvent
# The frontend needs to see this in the snapshot to render the confirmation dialog
confirm_entry = {
"id": confirm_id,
"type": "function",
"function": {"name": "confirm_changes", "arguments": confirm_args_json},
}
flow.pending_tool_calls.append(confirm_entry)
flow.tool_calls_by_id[confirm_id] = confirm_entry
flow.tool_calls_ended.add(confirm_id) # Mark as ended since we emit End event
flow.waiting_for_approval = True

# Close any open message
if flow.message_id:
logger.info(f"End of run: closing text message message_id={flow.message_id}")
yield TextMessageEndEvent(message_id=flow.message_id)

# Emit MessagesSnapshotEvent if we have tool calls or results
Expand Down
12 changes: 10 additions & 2 deletions python/packages/ag-ui/tests/test_message_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,14 @@ def test_agui_tool_result_to_agent_framework():


def test_agui_tool_approval_updates_tool_call_arguments():
"""Tool approval updates matching tool call arguments for snapshots and agent context."""
"""Tool approval updates matching tool call arguments for snapshots and agent context.

The LLM context (ChatMessage) should contain only enabled steps, so the LLM
generates responses based on what was actually approved/executed.

The raw messages (for MESSAGES_SNAPSHOT) should contain all steps with status,
so the UI can show which steps were enabled/disabled.
"""
messages_input = [
{
"role": "assistant",
Expand Down Expand Up @@ -142,13 +149,14 @@ def test_agui_tool_approval_updates_tool_call_arguments():
assert len(messages) == 2
assistant_msg = messages[0]
func_call = next(content for content in assistant_msg.contents if content.type == "function_call")
# LLM context should only have enabled steps (what was actually approved)
assert func_call.arguments == {
"steps": [
{"description": "Boil water", "status": "enabled"},
{"description": "Brew coffee", "status": "disabled"},
{"description": "Serve coffee", "status": "enabled"},
]
}
# Raw messages (for MESSAGES_SNAPSHOT) should have all steps with status
assert messages_input[0]["tool_calls"][0]["function"]["arguments"] == {
"steps": [
{"description": "Boil water", "status": "enabled"},
Expand Down
Loading
Loading