Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/packages/core/agent_framework/_workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
handler,
)
from ._function_executor import FunctionExecutor, executor
from ._handoff import HandoffBuilder, HandoffUserInputRequest
from ._magentic import (
MagenticAgentDeltaEvent,
MagenticAgentExecutor,
Expand Down Expand Up @@ -126,6 +127,8 @@
"FileCheckpointStorage",
"FunctionExecutor",
"GraphConnectivityError",
"HandoffBuilder",
"HandoffUserInputRequest",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"MagenticAgentDeltaEvent",
Expand Down
3 changes: 3 additions & 0 deletions python/packages/core/agent_framework/_workflows/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ from ._executor import (
handler,
)
from ._function_executor import FunctionExecutor, executor
from ._handoff import HandoffBuilder, HandoffUserInputRequest
from ._magentic import (
MagenticAgentDeltaEvent,
MagenticAgentExecutor,
Expand Down Expand Up @@ -124,6 +125,8 @@ __all__ = [
"FileCheckpointStorage",
"FunctionExecutor",
"GraphConnectivityError",
"HandoffBuilder",
"HandoffUserInputRequest",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"MagenticAgentDeltaEvent",
Expand Down
57 changes: 41 additions & 16 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
AgentThread,
BaseAgent,
ChatMessage,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
FunctionCallContent,
FunctionResultContent,
Role,
Expand Down Expand Up @@ -266,16 +268,20 @@ def _convert_workflow_event_to_agent_update(
# Store the pending request for later correlation
self.pending_requests[request_id] = event

# Convert to function call content
# TODO(ekzhu): update this to FunctionApprovalRequestContent
# monitor: https://github.com/microsoft/agent-framework/issues/285
args = self.RequestInfoFunctionArgs(request_id=request_id, data=event.data).to_dict()

function_call = FunctionCallContent(
call_id=request_id,
name=self.REQUEST_INFO_FUNCTION_NAME,
arguments=self.RequestInfoFunctionArgs(request_id=request_id, data=event.data).to_dict(),
arguments=args,
)
approval_request = FunctionApprovalRequestContent(
id=request_id,
function_call=function_call,
additional_properties={"request_id": request_id},
)
return AgentRunResponseUpdate(
contents=[function_call],
contents=[function_call, approval_request],
role=Role.ASSISTANT,
author_name=self.name,
response_id=response_id,
Expand All @@ -293,26 +299,45 @@ def _extract_function_responses(self, input_messages: list[ChatMessage]) -> dict
function_responses: dict[str, Any] = {}
for message in input_messages:
for content in message.contents:
# TODO(ekzhu): update this to FunctionApprovalResponseContent
# monitor: https://github.com/microsoft/agent-framework/issues/285
if isinstance(content, FunctionResultContent):
if isinstance(content, FunctionApprovalResponseContent):
# Parse the function arguments to recover request payload
arguments_payload = content.function_call.arguments
if isinstance(arguments_payload, str):
try:
parsed_args = self.RequestInfoFunctionArgs.from_json(arguments_payload)
except ValueError as exc:
raise AgentExecutionException(
"FunctionApprovalResponseContent arguments must decode to a mapping."
) from exc
elif isinstance(arguments_payload, dict):
parsed_args = self.RequestInfoFunctionArgs.from_dict(arguments_payload)
else:
raise AgentExecutionException(
"FunctionApprovalResponseContent arguments must be a mapping or JSON string."
)

request_id = parsed_args.request_id or content.id
if not content.approved:
raise AgentExecutionException(f"Request '{request_id}' was not approved by the caller.")

if request_id in self.pending_requests:
function_responses[request_id] = parsed_args.data
elif bool(self.pending_requests):
raise AgentExecutionException(
"Only responses for pending requests are allowed when there are outstanding approvals."
)
elif isinstance(content, FunctionResultContent):
request_id = content.call_id
# Check if we have a pending request for this call_id
if request_id in self.pending_requests:
response_data = content.result if hasattr(content, "result") else str(content)
function_responses[request_id] = response_data
elif bool(self.pending_requests):
# Function result for unknown request when we have pending requests - this is an error
raise AgentExecutionException(
"Only FunctionResultContent for pending requests is allowed in input messages "
"when there are pending requests."
"Only function responses for pending requests are allowed while requests are outstanding."
)
else:
if bool(self.pending_requests):
# Non-function content when we have pending requests - this is an error
raise AgentExecutionException(
"Only FunctionResultContent is allowed in input messages when there are pending requests."
)
raise AgentExecutionException("Unexpected content type while awaiting request info responses.")
return function_responses

class _ResponseState(TypedDict):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright (c) Microsoft. All rights reserved.

from collections.abc import Iterable
from typing import Any, cast

from agent_framework import ChatMessage, Role

from ._runner_context import _decode_checkpoint_value, _encode_checkpoint_value # type: ignore

"""Utilities for serializing and deserializing chat conversations for persistence.

These helpers convert rich `ChatMessage` instances to checkpoint-friendly payloads
using the same encoding primitives as the workflow runner. This preserves
`additional_properties` and other metadata without relying on unsafe mechanisms
such as pickling.
"""


def encode_chat_messages(messages: Iterable[ChatMessage]) -> list[dict[str, Any]]:
"""Serialize chat messages into checkpoint-safe payloads."""
encoded: list[dict[str, Any]] = []
for message in messages:
encoded.append({
"role": _encode_checkpoint_value(message.role),
"contents": [_encode_checkpoint_value(content) for content in message.contents],
"author_name": message.author_name,
"message_id": message.message_id,
"additional_properties": {
key: _encode_checkpoint_value(value) for key, value in message.additional_properties.items()
},
})
return encoded


def decode_chat_messages(payload: Iterable[dict[str, Any]]) -> list[ChatMessage]:
"""Restore chat messages from checkpoint-safe payloads."""
restored: list[ChatMessage] = []
for item in payload:
if not isinstance(item, dict):
continue

role_value = _decode_checkpoint_value(item.get("role"))
if isinstance(role_value, Role):
role = role_value
elif isinstance(role_value, dict):
role_dict = cast(dict[str, Any], role_value)
role = Role.from_dict(role_dict)
elif isinstance(role_value, str):
role = Role(value=role_value)
else:
role = Role.ASSISTANT

contents_field = item.get("contents", [])
contents: list[Any] = []
if isinstance(contents_field, list):
contents_iter: list[Any] = contents_field # type: ignore[assignment]
for entry in contents_iter:
decoded_entry: Any = _decode_checkpoint_value(entry)
contents.append(decoded_entry)

additional_field = item.get("additional_properties", {})
additional: dict[str, Any] = {}
if isinstance(additional_field, dict):
additional_dict = cast(dict[str, Any], additional_field)
for key, value in additional_dict.items():
additional[key] = _decode_checkpoint_value(value)

restored.append(
ChatMessage(
role=role,
contents=contents,
author_name=item.get("author_name"),
message_id=item.get("message_id"),
additional_properties=additional,
)
)
return restored
Loading