Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/packages/core/agent_framework/_workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
handler,
)
from ._function_executor import FunctionExecutor, executor
from ._handoff import HandoffBuilder, HandoffUserInputRequest
from ._magentic import (
MagenticAgentDeltaEvent,
MagenticAgentExecutor,
Expand Down Expand Up @@ -127,6 +128,8 @@
"FileCheckpointStorage",
"FunctionExecutor",
"GraphConnectivityError",
"HandoffBuilder",
"HandoffUserInputRequest",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"MagenticAgentDeltaEvent",
Expand Down
3 changes: 3 additions & 0 deletions python/packages/core/agent_framework/_workflows/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ from ._executor import (
handler,
)
from ._function_executor import FunctionExecutor, executor
from ._handoff import HandoffBuilder, HandoffUserInputRequest
from ._magentic import (
MagenticAgentDeltaEvent,
MagenticAgentExecutor,
Expand Down Expand Up @@ -125,6 +126,8 @@ __all__ = [
"FileCheckpointStorage",
"FunctionExecutor",
"GraphConnectivityError",
"HandoffBuilder",
"HandoffUserInputRequest",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"MagenticAgentDeltaEvent",
Expand Down
57 changes: 41 additions & 16 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
AgentThread,
BaseAgent,
ChatMessage,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
FunctionCallContent,
FunctionResultContent,
Role,
Expand Down Expand Up @@ -266,16 +268,20 @@ def _convert_workflow_event_to_agent_update(
# Store the pending request for later correlation
self.pending_requests[request_id] = event

# Convert to function call content
# TODO(ekzhu): update this to FunctionApprovalRequestContent
# monitor: https://github.com/microsoft/agent-framework/issues/285
args = self.RequestInfoFunctionArgs(request_id=request_id, data=event.data).to_dict()

function_call = FunctionCallContent(
call_id=request_id,
name=self.REQUEST_INFO_FUNCTION_NAME,
arguments=self.RequestInfoFunctionArgs(request_id=request_id, data=event.data).to_dict(),
arguments=args,
)
approval_request = FunctionApprovalRequestContent(
id=request_id,
function_call=function_call,
additional_properties={"request_id": request_id},
)
return AgentRunResponseUpdate(
contents=[function_call],
contents=[function_call, approval_request],
role=Role.ASSISTANT,
author_name=self.name,
response_id=response_id,
Expand All @@ -293,26 +299,45 @@ def _extract_function_responses(self, input_messages: list[ChatMessage]) -> dict
function_responses: dict[str, Any] = {}
for message in input_messages:
for content in message.contents:
# TODO(ekzhu): update this to FunctionApprovalResponseContent
# monitor: https://github.com/microsoft/agent-framework/issues/285
if isinstance(content, FunctionResultContent):
if isinstance(content, FunctionApprovalResponseContent):
# Parse the function arguments to recover request payload
arguments_payload = content.function_call.arguments
if isinstance(arguments_payload, str):
try:
parsed_args = self.RequestInfoFunctionArgs.from_json(arguments_payload)
except ValueError as exc:
raise AgentExecutionException(
"FunctionApprovalResponseContent arguments must decode to a mapping."
) from exc
elif isinstance(arguments_payload, dict):
parsed_args = self.RequestInfoFunctionArgs.from_dict(arguments_payload)
else:
raise AgentExecutionException(
"FunctionApprovalResponseContent arguments must be a mapping or JSON string."
)

request_id = parsed_args.request_id or content.id
if not content.approved:
raise AgentExecutionException(f"Request '{request_id}' was not approved by the caller.")

if request_id in self.pending_requests:
function_responses[request_id] = parsed_args.data
elif bool(self.pending_requests):
raise AgentExecutionException(
"Only responses for pending requests are allowed when there are outstanding approvals."
)
elif isinstance(content, FunctionResultContent):
request_id = content.call_id
# Check if we have a pending request for this call_id
if request_id in self.pending_requests:
response_data = content.result if hasattr(content, "result") else str(content)
function_responses[request_id] = response_data
elif bool(self.pending_requests):
# Function result for unknown request when we have pending requests - this is an error
raise AgentExecutionException(
"Only FunctionResultContent for pending requests is allowed in input messages "
"when there are pending requests."
"Only function responses for pending requests are allowed while requests are outstanding."
)
else:
if bool(self.pending_requests):
# Non-function content when we have pending requests - this is an error
raise AgentExecutionException(
"Only FunctionResultContent is allowed in input messages when there are pending requests."
)
raise AgentExecutionException("Unexpected content type while awaiting request info responses.")
return function_responses

class _ResponseState(TypedDict):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright (c) Microsoft. All rights reserved.

from collections.abc import Iterable
from typing import Any, cast

from agent_framework import ChatMessage, Role

from ._checkpoint_encoding import decode_checkpoint_value, encode_checkpoint_value # type: ignore

"""Utilities for serializing and deserializing chat conversations for persistence.

These helpers convert rich `ChatMessage` instances to checkpoint-friendly payloads
using the same encoding primitives as the workflow runner. This preserves
`additional_properties` and other metadata without relying on unsafe mechanisms
such as pickling.
"""


def encode_chat_messages(messages: Iterable[ChatMessage]) -> list[dict[str, Any]]:
"""Serialize chat messages into checkpoint-safe payloads."""
encoded: list[dict[str, Any]] = []
for message in messages:
encoded.append({
"role": encode_checkpoint_value(message.role),
"contents": [encode_checkpoint_value(content) for content in message.contents],
"author_name": message.author_name,
"message_id": message.message_id,
"additional_properties": {
key: encode_checkpoint_value(value) for key, value in message.additional_properties.items()
},
})
return encoded


def decode_chat_messages(payload: Iterable[dict[str, Any]]) -> list[ChatMessage]:
"""Restore chat messages from checkpoint-safe payloads."""
restored: list[ChatMessage] = []
for item in payload:
if not isinstance(item, dict):
continue

role_value = decode_checkpoint_value(item.get("role"))
if isinstance(role_value, Role):
role = role_value
elif isinstance(role_value, dict):
role_dict = cast(dict[str, Any], role_value)
role = Role.from_dict(role_dict)
elif isinstance(role_value, str):
role = Role(value=role_value)
else:
role = Role.ASSISTANT

contents_field = item.get("contents", [])
contents: list[Any] = []
if isinstance(contents_field, list):
contents_iter: list[Any] = contents_field # type: ignore[assignment]
for entry in contents_iter:
decoded_entry: Any = decode_checkpoint_value(entry)
contents.append(decoded_entry)

additional_field = item.get("additional_properties", {})
additional: dict[str, Any] = {}
if isinstance(additional_field, dict):
additional_dict = cast(dict[str, Any], additional_field)
for key, value in additional_dict.items():
additional[key] = decode_checkpoint_value(value)

restored.append(
ChatMessage(
role=role,
contents=contents,
author_name=item.get("author_name"),
message_id=item.get("message_id"),
additional_properties=additional,
)
)
return restored
Loading
Loading