diff --git a/python/packages/core/agent_framework/_workflows/__init__.py b/python/packages/core/agent_framework/_workflows/__init__.py index fe4ca2123b..7c8b43dcc6 100644 --- a/python/packages/core/agent_framework/_workflows/__init__.py +++ b/python/packages/core/agent_framework/_workflows/__init__.py @@ -52,6 +52,7 @@ handler, ) from ._function_executor import FunctionExecutor, executor +from ._handoff import HandoffBuilder, HandoffUserInputRequest from ._magentic import ( MagenticAgentDeltaEvent, MagenticAgentExecutor, @@ -127,6 +128,8 @@ "FileCheckpointStorage", "FunctionExecutor", "GraphConnectivityError", + "HandoffBuilder", + "HandoffUserInputRequest", "InMemoryCheckpointStorage", "InProcRunnerContext", "MagenticAgentDeltaEvent", diff --git a/python/packages/core/agent_framework/_workflows/__init__.pyi b/python/packages/core/agent_framework/_workflows/__init__.pyi index 7ad32e115b..8055e625a2 100644 --- a/python/packages/core/agent_framework/_workflows/__init__.pyi +++ b/python/packages/core/agent_framework/_workflows/__init__.pyi @@ -50,6 +50,7 @@ from ._executor import ( handler, ) from ._function_executor import FunctionExecutor, executor +from ._handoff import HandoffBuilder, HandoffUserInputRequest from ._magentic import ( MagenticAgentDeltaEvent, MagenticAgentExecutor, @@ -125,6 +126,8 @@ __all__ = [ "FileCheckpointStorage", "FunctionExecutor", "GraphConnectivityError", + "HandoffBuilder", + "HandoffUserInputRequest", "InMemoryCheckpointStorage", "InProcRunnerContext", "MagenticAgentDeltaEvent", diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 9e766d354c..cf50f192c5 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -14,6 +14,8 @@ AgentThread, BaseAgent, ChatMessage, + FunctionApprovalRequestContent, + FunctionApprovalResponseContent, FunctionCallContent, FunctionResultContent, 
Role, @@ -266,16 +268,20 @@ def _convert_workflow_event_to_agent_update( # Store the pending request for later correlation self.pending_requests[request_id] = event - # Convert to function call content - # TODO(ekzhu): update this to FunctionApprovalRequestContent - # monitor: https://github.com/microsoft/agent-framework/issues/285 + args = self.RequestInfoFunctionArgs(request_id=request_id, data=event.data).to_dict() + function_call = FunctionCallContent( call_id=request_id, name=self.REQUEST_INFO_FUNCTION_NAME, - arguments=self.RequestInfoFunctionArgs(request_id=request_id, data=event.data).to_dict(), + arguments=args, + ) + approval_request = FunctionApprovalRequestContent( + id=request_id, + function_call=function_call, + additional_properties={"request_id": request_id}, ) return AgentRunResponseUpdate( - contents=[function_call], + contents=[function_call, approval_request], role=Role.ASSISTANT, author_name=self.name, response_id=response_id, @@ -293,26 +299,45 @@ def _extract_function_responses(self, input_messages: list[ChatMessage]) -> dict function_responses: dict[str, Any] = {} for message in input_messages: for content in message.contents: - # TODO(ekzhu): update this to FunctionApprovalResponseContent - # monitor: https://github.com/microsoft/agent-framework/issues/285 - if isinstance(content, FunctionResultContent): + if isinstance(content, FunctionApprovalResponseContent): + # Parse the function arguments to recover request payload + arguments_payload = content.function_call.arguments + if isinstance(arguments_payload, str): + try: + parsed_args = self.RequestInfoFunctionArgs.from_json(arguments_payload) + except ValueError as exc: + raise AgentExecutionException( + "FunctionApprovalResponseContent arguments must decode to a mapping." 
+ ) from exc + elif isinstance(arguments_payload, dict): + parsed_args = self.RequestInfoFunctionArgs.from_dict(arguments_payload) + else: + raise AgentExecutionException( + "FunctionApprovalResponseContent arguments must be a mapping or JSON string." + ) + + request_id = parsed_args.request_id or content.id + if not content.approved: + raise AgentExecutionException(f"Request '{request_id}' was not approved by the caller.") + + if request_id in self.pending_requests: + function_responses[request_id] = parsed_args.data + elif bool(self.pending_requests): + raise AgentExecutionException( + "Only responses for pending requests are allowed when there are outstanding approvals." + ) + elif isinstance(content, FunctionResultContent): request_id = content.call_id - # Check if we have a pending request for this call_id if request_id in self.pending_requests: response_data = content.result if hasattr(content, "result") else str(content) function_responses[request_id] = response_data elif bool(self.pending_requests): - # Function result for unknown request when we have pending requests - this is an error raise AgentExecutionException( - "Only FunctionResultContent for pending requests is allowed in input messages " - "when there are pending requests." + "Only function responses for pending requests are allowed while requests are outstanding." ) else: if bool(self.pending_requests): - # Non-function content when we have pending requests - this is an error - raise AgentExecutionException( - "Only FunctionResultContent is allowed in input messages when there are pending requests." 
- ) + raise AgentExecutionException("Unexpected content type while awaiting request info responses.") return function_responses class _ResponseState(TypedDict): diff --git a/python/packages/core/agent_framework/_workflows/_conversation_state.py b/python/packages/core/agent_framework/_workflows/_conversation_state.py new file mode 100644 index 0000000000..8c21513f6c --- /dev/null +++ b/python/packages/core/agent_framework/_workflows/_conversation_state.py @@ -0,0 +1,77 @@ +# Copyright (c) Microsoft. All rights reserved. + +from collections.abc import Iterable +from typing import Any, cast + +from agent_framework import ChatMessage, Role + +from ._checkpoint_encoding import decode_checkpoint_value, encode_checkpoint_value + +"""Utilities for serializing and deserializing chat conversations for persistence. + +These helpers convert rich `ChatMessage` instances to checkpoint-friendly payloads +using the same encoding primitives as the workflow runner. This preserves +`additional_properties` and other metadata without relying on unsafe mechanisms +such as pickling. 
+""" + + +def encode_chat_messages(messages: Iterable[ChatMessage]) -> list[dict[str, Any]]: + """Serialize chat messages into checkpoint-safe payloads.""" + encoded: list[dict[str, Any]] = [] + for message in messages: + encoded.append({ + "role": encode_checkpoint_value(message.role), + "contents": [encode_checkpoint_value(content) for content in message.contents], + "author_name": message.author_name, + "message_id": message.message_id, + "additional_properties": { + key: encode_checkpoint_value(value) for key, value in message.additional_properties.items() + }, + }) + return encoded + + +def decode_chat_messages(payload: Iterable[dict[str, Any]]) -> list[ChatMessage]: + """Restore chat messages from checkpoint-safe payloads.""" + restored: list[ChatMessage] = [] + for item in payload: + if not isinstance(item, dict): + continue + + role_value = decode_checkpoint_value(item.get("role")) + if isinstance(role_value, Role): + role = role_value + elif isinstance(role_value, dict): + role_dict = cast(dict[str, Any], role_value) + role = Role.from_dict(role_dict) + elif isinstance(role_value, str): + role = Role(value=role_value) + else: + role = Role.ASSISTANT + + contents_field = item.get("contents", []) + contents: list[Any] = [] + if isinstance(contents_field, list): + contents_iter: list[Any] = contents_field # type: ignore[assignment] + for entry in contents_iter: + decoded_entry: Any = decode_checkpoint_value(entry) + contents.append(decoded_entry) + + additional_field = item.get("additional_properties", {}) + additional: dict[str, Any] = {} + if isinstance(additional_field, dict): + additional_dict = cast(dict[str, Any], additional_field) + for key, value in additional_dict.items(): + additional[key] = decode_checkpoint_value(value) + + restored.append( + ChatMessage( + role=role, + contents=contents, + author_name=item.get("author_name"), + message_id=item.get("message_id"), + additional_properties=additional, + ) + ) + return restored diff --git 
a/python/packages/core/agent_framework/_workflows/_handoff.py b/python/packages/core/agent_framework/_workflows/_handoff.py new file mode 100644 index 0000000000..725e50cb25 --- /dev/null +++ b/python/packages/core/agent_framework/_workflows/_handoff.py @@ -0,0 +1,1374 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""High-level builder for conversational handoff workflows. + +The handoff pattern models a coordinator agent that optionally routes +control to specialist agents before handing the conversation back to the user. +The flow is intentionally cyclical: + + user input -> coordinator -> optional specialist -> request user input -> ... + +Key properties: +- The entire conversation is maintained and reused on every hop +- The coordinator signals a handoff by invoking a tool call that names the specialist +- After a specialist responds, the workflow immediately requests new user input +""" + +import logging +import re +from collections.abc import Awaitable, Callable, Mapping, Sequence +from dataclasses import dataclass, field +from typing import Any + +from agent_framework import ( + AgentProtocol, + AgentRunResponse, + AIFunction, + ChatMessage, + FunctionApprovalRequestContent, + FunctionCallContent, + FunctionResultContent, + Role, + ai_function, +) + +from .._agents import ChatAgent +from .._middleware import FunctionInvocationContext, FunctionMiddleware +from ._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse +from ._checkpoint import CheckpointStorage +from ._conversation_state import decode_chat_messages, encode_chat_messages +from ._executor import Executor, handler +from ._request_info_executor import RequestInfoExecutor, RequestInfoMessage, RequestResponse +from ._workflow import Workflow +from ._workflow_builder import WorkflowBuilder +from ._workflow_context import WorkflowContext + +logger = logging.getLogger(__name__) + + +_HANDOFF_TOOL_PATTERN = re.compile(r"(?:handoff|transfer)[_\s-]*to[_\s-]*(?P<target>[\w-]+)",
re.IGNORECASE) + + +def _sanitize_alias(value: str) -> str: + """Normalise an agent alias into a lowercase identifier-safe string.""" + cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", value).strip("_") + if not cleaned: + cleaned = "agent" + if cleaned[0].isdigit(): + cleaned = f"agent_{cleaned}" + return cleaned.lower() + + +def _create_handoff_tool(alias: str, description: str | None = None) -> AIFunction[Any, Any]: + """Construct the synthetic handoff tool that signals routing to `alias`.""" + sanitized = _sanitize_alias(alias) + tool_name = f"handoff_to_{sanitized}" + doc = description or f"Handoff to the {alias} agent." + + # Note: approval_mode is intentionally NOT set for handoff tools. + # Handoff tools are framework-internal signals that trigger routing logic, + # not actual function executions. They are automatically intercepted and + # never actually execute, so approval is unnecessary and causes issues + # with tool_calls/responses pairing when cleaning conversations. + @ai_function(name=tool_name, description=doc) + def _handoff_tool(context: str | None = None) -> str: + """Return a deterministic acknowledgement that encodes the target alias.""" + return f"Handoff to {alias}" + + return _handoff_tool + + +def _clone_chat_agent(agent: ChatAgent) -> ChatAgent: + """Produce a deep copy of the ChatAgent while preserving runtime configuration.""" + options = agent.chat_options + middleware = list(agent.middleware or []) + + return ChatAgent( + chat_client=agent.chat_client, + instructions=options.instructions, + id=agent.id, + name=agent.name, + description=agent.description, + chat_message_store_factory=agent.chat_message_store_factory, + context_providers=agent.context_provider, + middleware=middleware, + frequency_penalty=options.frequency_penalty, + logit_bias=dict(options.logit_bias) if options.logit_bias else None, + max_tokens=options.max_tokens, + metadata=dict(options.metadata) if options.metadata else None, + model_id=options.model_id, + 
presence_penalty=options.presence_penalty, + response_format=options.response_format, + seed=options.seed, + stop=options.stop, + store=options.store, + temperature=options.temperature, + tool_choice=options.tool_choice, # type: ignore[arg-type] + tools=list(options.tools) if options.tools else None, + top_p=options.top_p, + user=options.user, + additional_chat_options=dict(options.additional_properties), + ) + + +@dataclass +class HandoffUserInputRequest(RequestInfoMessage): + """Request message emitted when the workflow needs fresh user input.""" + + conversation: list[ChatMessage] = field(default_factory=lambda: []) # type: ignore[misc] + awaiting_agent_id: str | None = None + prompt: str | None = None + + +@dataclass +class _ConversationWithUserInput: + """Internal message carrying full conversation + new user messages from gateway to coordinator.""" + + full_conversation: list[ChatMessage] = field(default_factory=lambda: []) # type: ignore[misc] + + +class _AutoHandoffMiddleware(FunctionMiddleware): + """Intercept handoff tool invocations and short-circuit execution with synthetic results.""" + + def __init__(self, handoff_targets: Mapping[str, str]) -> None: + """Initialise middleware with the mapping from tool name to specialist id.""" + self._targets = {name.lower(): target for name, target in handoff_targets.items()} + + async def process( + self, + context: FunctionInvocationContext, + next: Callable[[FunctionInvocationContext], Awaitable[None]], + ) -> None: + """Intercept matching handoff tool calls and inject synthetic results.""" + name = getattr(context.function, "name", "") + normalized = name.lower() if name else "" + target = self._targets.get(normalized) + if target is None: + await next(context) + return + + # Short-circuit execution and provide deterministic response payload for the tool call. 
+ context.result = {"handoff_to": target} + context.terminate = True + + +class _InputToConversation(Executor): + """Normalises initial workflow input into a list[ChatMessage].""" + + @handler + async def from_str(self, prompt: str, ctx: WorkflowContext[list[ChatMessage]]) -> None: + """Convert a raw user prompt into a conversation containing a single user message.""" + await ctx.send_message([ChatMessage(Role.USER, text=prompt)]) + + @handler + async def from_message(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None: # type: ignore[name-defined] + """Pass through an existing chat message as the initial conversation.""" + await ctx.send_message([message]) + + @handler + async def from_messages( + self, + messages: list[ChatMessage], + ctx: WorkflowContext[list[ChatMessage]], + ) -> None: # type: ignore[name-defined] + """Forward a list of chat messages as the starting conversation history.""" + await ctx.send_message(list(messages)) + + +@dataclass +class _HandoffResolution: + """Result of handoff detection containing the target alias and originating call.""" + + target: str + function_call: FunctionCallContent | None = None + + +def _resolve_handoff_target(agent_response: AgentRunResponse) -> _HandoffResolution | None: + """Detect handoff intent from tool call metadata.""" + for message in agent_response.messages: + resolution = _resolution_from_message(message) + if resolution: + return resolution + + for request in agent_response.user_input_requests: + if isinstance(request, FunctionApprovalRequestContent): + resolution = _resolution_from_function_call(request.function_call) + if resolution: + return resolution + + return None + + +def _resolution_from_message(message: ChatMessage) -> _HandoffResolution | None: + """Inspect an assistant message for embedded handoff tool metadata.""" + for content in getattr(message, "contents", ()): + if isinstance(content, FunctionApprovalRequestContent): + resolution = 
_resolution_from_function_call(content.function_call) + if resolution: + return resolution + elif isinstance(content, FunctionCallContent): + resolution = _resolution_from_function_call(content) + if resolution: + return resolution + return None + + +def _resolution_from_function_call(function_call: FunctionCallContent | None) -> _HandoffResolution | None: + """Wrap the target resolved from a function call in a `_HandoffResolution`.""" + if function_call is None: + return None + target = _target_from_function_call(function_call) + if not target: + return None + return _HandoffResolution(target=target, function_call=function_call) + + +def _target_from_function_call(function_call: FunctionCallContent) -> str | None: + """Extract the handoff target from the tool name or structured arguments.""" + name_candidate = _target_from_tool_name(function_call.name) + if name_candidate: + return name_candidate + + arguments = function_call.parse_arguments() + if isinstance(arguments, Mapping): + value = arguments.get("handoff_to") + if isinstance(value, str) and value.strip(): + return value.strip() + elif isinstance(arguments, str): + stripped = arguments.strip() + if stripped: + name_candidate = _target_from_tool_name(stripped) + if name_candidate: + return name_candidate + return stripped + + return None + + +def _target_from_tool_name(name: str | None) -> str | None: + """Parse the specialist alias encoded in a handoff tool's name.""" + if not name: + return None + match = _HANDOFF_TOOL_PATTERN.search(name) + if match: + parsed = match.group("target").strip() + if parsed: + return parsed + return None + + +class _HandoffCoordinator(Executor): + """Coordinates agent-to-agent transfers and user turn requests.""" + + def __init__( + self, + *, + starting_agent_id: str, + specialist_ids: Mapping[str, str], + input_gateway_id: str, + termination_condition: Callable[[list[ChatMessage]], bool], + id: str, + handoff_tool_targets: Mapping[str, str] | None = None, + ) -> None: + 
"""Create a coordinator that manages routing between specialists and the user.""" + super().__init__(id) + self._starting_agent_id = starting_agent_id + self._specialist_by_alias = dict(specialist_ids) + self._specialist_ids = set(specialist_ids.values()) + self._input_gateway_id = input_gateway_id + self._termination_condition = termination_condition + self._full_conversation: list[ChatMessage] = [] + self._handoff_tool_targets = {k.lower(): v for k, v in (handoff_tool_targets or {}).items()} + + @handler + async def handle_agent_response( + self, + response: AgentExecutorResponse, + ctx: WorkflowContext[AgentExecutorRequest | list[ChatMessage], list[ChatMessage]], + ) -> None: + """Process an agent's response and determine whether to route, request input, or terminate.""" + # Hydrate coordinator state (and detect new run) using checkpointable executor state + state = await ctx.get_executor_state() + if not state: + self._full_conversation = [] + elif not self._full_conversation: + restored = self._restore_conversation_from_state(state) + if restored: + self._full_conversation = restored + + source = ctx.get_source_executor_id() + is_starting_agent = source == self._starting_agent_id + + # On first turn of a run, full_conversation is empty + # Track new messages only, build authoritative history incrementally + if not self._full_conversation: + # First response from starting agent - initialize with authoritative conversation snapshot + # Keep the FULL conversation including tool calls (OpenAI SDK default behavior) + full_conv = self._conversation_from_response(response) + self._full_conversation = list(full_conv) + else: + # Subsequent responses - append only new messages from this agent + # Keep ALL messages including tool calls to maintain complete history + new_messages = list(response.agent_run_response.messages) + self._full_conversation.extend(new_messages) + + self._apply_response_metadata(self._full_conversation, response.agent_run_response) + + 
conversation = list(self._full_conversation) + + # Check for handoff from ANY agent (starting agent or specialist) + target = self._resolve_specialist(response.agent_run_response, conversation) + if target is not None: + await self._persist_state(ctx) + # Clean tool-related content before sending to next agent + cleaned = self._get_cleaned_conversation(conversation) + request = AgentExecutorRequest(messages=cleaned, should_respond=True) + await ctx.send_message(request, target_id=target) + return + + # No handoff detected - response must come from starting agent or known specialist + if not is_starting_agent and source not in self._specialist_ids: + raise RuntimeError(f"HandoffCoordinator received response from unknown executor '{source}'.") + + await self._persist_state(ctx) + + if self._termination_condition(conversation): + logger.info("Handoff workflow termination condition met. Ending conversation.") + await ctx.yield_output(list(conversation)) + return + + await ctx.send_message(list(conversation), target_id=self._input_gateway_id) + + @handler + async def handle_user_input( + self, + message: _ConversationWithUserInput, + ctx: WorkflowContext[AgentExecutorRequest, list[ChatMessage]], + ) -> None: + """Receive full conversation with new user input from gateway, update history, trim for agent.""" + # Update authoritative full conversation + self._full_conversation = list(message.full_conversation) + await self._persist_state(ctx) + + # Check termination before sending to agent + if self._termination_condition(self._full_conversation): + logger.info("Handoff workflow termination condition met. 
Ending conversation.") + await ctx.yield_output(list(self._full_conversation)) + return + + # Clean before sending to starting agent + cleaned = self._get_cleaned_conversation(self._full_conversation) + request = AgentExecutorRequest(messages=cleaned, should_respond=True) + await ctx.send_message(request, target_id=self._starting_agent_id) + + def _resolve_specialist(self, agent_response: AgentRunResponse, conversation: list[ChatMessage]) -> str | None: + """Resolve the specialist executor id requested by the agent response, if any.""" + resolution = _resolve_handoff_target(agent_response) + if not resolution: + return None + + candidate = resolution.target + normalized = candidate.lower() + resolved_id: str | None + if normalized in self._handoff_tool_targets: + resolved_id = self._handoff_tool_targets[normalized] + else: + resolved_id = self._specialist_by_alias.get(candidate) + + if resolved_id: + if resolution.function_call: + self._append_tool_acknowledgement(conversation, resolution.function_call, resolved_id) + return resolved_id + + lowered = candidate.lower() + for alias, exec_id in self._specialist_by_alias.items(): + if alias.lower() == lowered: + if resolution.function_call: + self._append_tool_acknowledgement(conversation, resolution.function_call, exec_id) + return exec_id + + logger.warning("Handoff requested unknown specialist '%s'.", candidate) + return None + + def _append_tool_acknowledgement( + self, + conversation: list[ChatMessage], + function_call: FunctionCallContent, + resolved_id: str, + ) -> None: + """Append a synthetic tool result acknowledging the resolved specialist id.""" + call_id = getattr(function_call, "call_id", None) + if not call_id: + return + + result_payload: Any = {"handoff_to": resolved_id} + result_content = FunctionResultContent(call_id=call_id, result=result_payload) + tool_message = ChatMessage( + role=Role.TOOL, + contents=[result_content], + author_name=function_call.name, + ) + # Add tool acknowledgement to both 
the conversation being sent and the full history + conversation.append(tool_message) + self._full_conversation.append(tool_message) + + def _conversation_from_response(self, response: AgentExecutorResponse) -> list[ChatMessage]: + """Return the authoritative conversation snapshot from an executor response.""" + conversation = response.full_conversation + if conversation is None: + raise RuntimeError( + "AgentExecutorResponse.full_conversation missing; AgentExecutor must populate it in handoff workflows." + ) + return list(conversation) + + def _get_cleaned_conversation(self, conversation: list[ChatMessage]) -> list[ChatMessage]: + """Create a cleaned copy of conversation with tool-related content removed. + + This method creates a copy of the conversation and removes tool-related content + before passing it to agents. The original conversation is preserved for handoff + detection and state management. + + During handoffs, tool calls (including handoff tools) cause OpenAI API errors. The OpenAI + API requires that: + 1. Assistant messages with tool_calls must be followed by corresponding tool responses + 2. Tool response messages must follow an assistant message with tool_calls + + To avoid these errors, we remove ALL tool-related content from the conversation: + - FunctionApprovalRequestContent and FunctionCallContent from assistant messages + - Tool response messages (Role.TOOL) + + This follows the pattern from OpenAI Agents SDK's `remove_all_tools` filter, which strips + all tool-related content from conversation history during handoffs. 
+ + Removes: + - FunctionApprovalRequestContent: Approval requests for tools + - FunctionCallContent: Tool calls made by the agent + - Tool response messages (Role.TOOL with FunctionResultContent) + - Messages with only tool calls and no text content + + Preserves: + - User messages + - Assistant messages with text content (tool calls are stripped out) + """ + # Create a copy to avoid modifying the original + cleaned: list[ChatMessage] = [] + for msg in conversation: + # Skip tool response messages - they must be paired with tool calls which we're removing + if msg.role == Role.TOOL: + continue + + # Check if message has tool-related content + has_tool_content = False + if msg.contents: + has_tool_content = any( + isinstance(content, (FunctionApprovalRequestContent, FunctionCallContent)) + for content in msg.contents + ) + + # If no tool content, keep the original message + if not has_tool_content: + cleaned.append(msg) + continue + + # Message has tool content - only keep if it also has text + if msg.text and msg.text.strip(): + # Create fresh text-only message to avoid tool_calls being regenerated + msg_copy = ChatMessage( + role=msg.role, + text=msg.text, + author_name=msg.author_name, + ) + cleaned.append(msg_copy) + + return cleaned + + async def _persist_state(self, ctx: WorkflowContext[Any, Any]) -> None: + """Store authoritative conversation snapshot without losing rich metadata.""" + state_payload = {"full_conversation": encode_chat_messages(self._full_conversation)} + await ctx.set_executor_state(state_payload) + + def _restore_conversation_from_state(self, state: Mapping[str, Any]) -> list[ChatMessage]: + """Rehydrate the coordinator's conversation history from checkpointed state.""" + raw_conv = state.get("full_conversation") + if not isinstance(raw_conv, list): + return [] + return decode_chat_messages(raw_conv) # type: ignore[arg-type] + + def _apply_response_metadata(self, conversation: list[ChatMessage], agent_response: AgentRunResponse) -> None: + 
"""Merge top-level response metadata into the latest assistant message.""" + if not agent_response.additional_properties: + return + + # Find the most recent assistant message contributed by this response + for message in reversed(conversation): + if message.role == Role.ASSISTANT: + metadata = agent_response.additional_properties or {} + if not metadata: + return + # Merge metadata without mutating shared dict from agent response + merged = dict(message.additional_properties or {}) + for key, value in metadata.items(): + merged.setdefault(key, value) + message.additional_properties = merged + break + + +class _UserInputGateway(Executor): + """Bridges conversation context with RequestInfoExecutor and re-enters the loop.""" + + def __init__( + self, + *, + request_executor_id: str, + starting_agent_id: str, + prompt: str | None, + id: str, + ) -> None: + """Initialise the gateway that requests user input and forwards responses.""" + super().__init__(id) + self._request_executor_id = request_executor_id + self._starting_agent_id = starting_agent_id + self._prompt = prompt or "Provide your next input for the conversation." 
+ + @handler + async def request_input( + self, + conversation: list[ChatMessage], + ctx: WorkflowContext[HandoffUserInputRequest], + ) -> None: + """Emit a `HandoffUserInputRequest` capturing the conversation snapshot.""" + if not conversation: + raise ValueError("Handoff workflow requires non-empty conversation before requesting user input.") + request = HandoffUserInputRequest( + conversation=list(conversation), + awaiting_agent_id=self._starting_agent_id, + prompt=self._prompt, + ) + request.source_executor_id = self.id + await ctx.send_message(request, target_id=self._request_executor_id) + + @handler + async def resume_from_user( + self, + response: RequestResponse[HandoffUserInputRequest, Any], + ctx: WorkflowContext[_ConversationWithUserInput], + ) -> None: + """Convert user input responses back into chat messages and resume the workflow.""" + # Reconstruct full conversation with new user input + conversation = list(response.original_request.conversation) + user_messages = _as_user_messages(response.data) + conversation.extend(user_messages) + + # Send full conversation back to coordinator (not trimmed) + # Coordinator will update its authoritative history and trim for agent + message = _ConversationWithUserInput(full_conversation=conversation) + # CRITICAL: Must specify target to avoid broadcasting to all connected executors + # Gateway is connected to both request_info and coordinator, we want coordinator only + await ctx.send_message(message, target_id="handoff-coordinator") + + +def _as_user_messages(payload: Any) -> list[ChatMessage]: + """Normalise arbitrary payloads into user-authored chat messages.""" + if isinstance(payload, ChatMessage): + if payload.role == Role.USER: + return [payload] + return [ChatMessage(Role.USER, text=payload.text)] + if isinstance(payload, list): + # Check if all items are ChatMessage instances + all_chat_messages = all(isinstance(msg, ChatMessage) for msg in payload) # type: ignore[arg-type] + if all_chat_messages: + 
messages: list[ChatMessage] = payload # type: ignore[assignment] + return [msg if msg.role == Role.USER else ChatMessage(Role.USER, text=msg.text) for msg in messages] + if isinstance(payload, Mapping): # User supplied structured data + text = payload.get("text") or payload.get("content") # type: ignore[union-attr] + if isinstance(text, str) and text.strip(): + return [ChatMessage(Role.USER, text=text.strip())] + return [ChatMessage(Role.USER, text=str(payload))] # type: ignore[arg-type] + + +def _default_termination_condition(conversation: list[ChatMessage]) -> bool: + """Default termination: stop after 10 user messages to prevent infinite loops.""" + user_message_count = sum(1 for msg in conversation if msg.role == Role.USER) + return user_message_count >= 10 + + +class HandoffBuilder: + r"""Fluent builder for conversational handoff workflows with coordinator and specialist agents. + + The handoff pattern enables a coordinator agent to route requests to specialist agents. + A termination condition determines when the workflow should stop requesting input and complete. + + Routing Patterns: + + **Single-Tier (Default):** Only the coordinator can hand off to specialists. After any specialist + responds, control returns to the user for more input. This creates a cyclical flow: + user -> coordinator -> [optional specialist] -> user -> coordinator -> ... + + **Multi-Tier (Advanced):** Specialists can hand off to other specialists using `.add_handoff()`. + This provides more flexibility for complex workflows but is less controllable than the single-tier + pattern. Users lose real-time visibility into intermediate steps during specialist-to-specialist + handoffs (though the full conversation history including all handoffs is preserved and can be + inspected afterward). 
+ + + Key Features: + - **Automatic handoff detection**: The coordinator invokes a handoff tool whose + arguments (for example ``{"handoff_to": "shipping_agent"}``) identify the specialist to receive control. + - **Auto-generated tools**: By default the builder synthesizes `handoff_to_` tools for the coordinator, + so you don't manually define placeholder functions. + - **Full conversation history**: The entire conversation (including any + `ChatMessage.additional_properties`) is preserved and passed to each agent. + - **Termination control**: By default, terminates after 10 user messages. Override with + `.with_termination_condition(lambda conv: ...)` for custom logic (e.g., detect "goodbye"). + - **Checkpointing**: Optional persistence for resumable workflows. + + Usage (Single-Tier): + + .. code-block:: python + + from agent_framework import HandoffBuilder + from agent_framework.openai import OpenAIChatClient + + chat_client = OpenAIChatClient() + + # Create coordinator and specialist agents + coordinator = chat_client.create_agent( + instructions=( + "You are a frontline support agent. Assess the user's issue and decide " + "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is " + "required, call the matching handoff tool (for example `handoff_to_refund_agent`)." + ), + name="coordinator_agent", + ) + + refund = chat_client.create_agent( + instructions="You handle refund requests. Ask for order details and process refunds.", + name="refund_agent", + ) + + shipping = chat_client.create_agent( + instructions="You resolve shipping issues. 
Track packages and update delivery status.", + name="shipping_agent", + ) + + # Build the handoff workflow - default single-tier routing + workflow = ( + HandoffBuilder( + name="customer_support", + participants=[coordinator, refund, shipping], + ) + .set_coordinator("coordinator_agent") + .build() + ) + + # Run the workflow + events = await workflow.run_stream("My package hasn't arrived yet") + async for event in events: + if isinstance(event, RequestInfoEvent): + # Request user input + user_response = input("You: ") + await workflow.send_response(event.data.request_id, user_response) + + **Multi-Tier Routing with .add_handoff():** + + .. code-block:: python + + # Enable specialist-to-specialist handoffs with fluent API + workflow = ( + HandoffBuilder(participants=[coordinator, replacement, delivery, billing]) + .set_coordinator("coordinator_agent") + .add_handoff(coordinator, [replacement, delivery, billing]) # Coordinator routes to all + .add_handoff(replacement, [delivery, billing]) # Replacement delegates to delivery/billing + .add_handoff(delivery, billing) # Delivery escalates to billing + .build() + ) + + # Flow: User → Coordinator → Replacement → Delivery → Back to User + # (Replacement hands off to Delivery without returning to user) + + **Custom Termination Condition:** + + .. code-block:: python + + # Terminate when user says goodbye or after 5 exchanges + workflow = ( + HandoffBuilder(participants=[coordinator, refund, shipping]) + .set_coordinator("coordinator_agent") + .with_termination_condition( + lambda conv: sum(1 for msg in conv if msg.role.value == "user") >= 5 + or any("goodbye" in msg.text.lower() for msg in conv[-2:]) + ) + .build() + ) + + **Checkpointing:** + + .. 
code-block:: python + + from agent_framework import InMemoryCheckpointStorage + + storage = InMemoryCheckpointStorage() + workflow = ( + HandoffBuilder(participants=[coordinator, refund, shipping]) + .set_coordinator("coordinator_agent") + .with_checkpointing(storage) + .build() + ) + + Args: + name: Optional workflow name for identification and logging. + participants: List of agents (AgentProtocol) or executors to participate in the handoff. + The first agent you specify as coordinator becomes the orchestrating agent. + description: Optional human-readable description of the workflow. + + Raises: + ValueError: If participants list is empty, contains duplicates, or coordinator not specified. + TypeError: If participants are not AgentProtocol or Executor instances. + """ + + def __init__( + self, + *, + name: str | None = None, + participants: Sequence[AgentProtocol | Executor] | None = None, + description: str | None = None, + ) -> None: + r"""Initialize a HandoffBuilder for creating conversational handoff workflows. + + The builder starts in an unconfigured state and requires you to call: + 1. `.participants([...])` - Register agents + 2. `.set_coordinator(...)` - Designate which agent receives initial user input + 3. `.build()` - Construct the final Workflow + + Optional configuration methods allow you to customize context management, + termination logic, and persistence. + + Args: + name: Optional workflow identifier used in logging and debugging. + If not provided, a default name will be generated. + participants: Optional list of agents (AgentProtocol) or executors that will + participate in the handoff workflow. You can also call + `.participants([...])` later. Each participant must have a + unique identifier (name for agents, id for executors). + description: Optional human-readable description explaining the workflow's + purpose. Useful for documentation and observability. 
+ + Note: + Participants must have stable names/ids because the workflow maps the + handoff tool arguments to these identifiers. Agent names should match + the strings emitted by the coordinator's handoff tool (e.g., a tool that + outputs ``{\"handoff_to\": \"billing\"}`` requires an agent named ``billing``). + """ + self._name = name + self._description = description + self._executors: dict[str, Executor] = {} + self._aliases: dict[str, str] = {} + self._starting_agent_id: str | None = None + self._checkpoint_storage: CheckpointStorage | None = None + self._request_prompt: str | None = None + self._termination_condition: Callable[[list[ChatMessage]], bool] = _default_termination_condition + self._auto_register_handoff_tools: bool = True + self._handoff_config: dict[str, list[str]] = {} # Maps agent_id -> [target_agent_ids] + + if participants: + self.participants(participants) + + def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "HandoffBuilder": + """Register the agents or executors that will participate in the handoff workflow. + + Each participant must have a unique identifier (name for agents, id for executors). + The workflow will automatically create an alias map so agents can be referenced by + their name, display_name, or executor id when routing. + + Args: + participants: Sequence of AgentProtocol or Executor instances. Each must have + a unique identifier. For agents, the name attribute is used as the + primary identifier and must match handoff target strings. + + Returns: + Self for method chaining. + + Raises: + ValueError: If participants is empty or contains duplicates. + TypeError: If participants are not AgentProtocol or Executor instances. + + Example: + + .. 
code-block:: python + + from agent_framework import HandoffBuilder + from agent_framework.openai import OpenAIChatClient + + client = OpenAIChatClient() + coordinator = client.create_agent(instructions="...", name="coordinator") + refund = client.create_agent(instructions="...", name="refund_agent") + billing = client.create_agent(instructions="...", name="billing_agent") + + builder = HandoffBuilder().participants([coordinator, refund, billing]) + # Now you can call .set_coordinator() to designate the entry point + + Note: + This method resets any previously configured coordinator, so you must call + `.set_coordinator(...)` again after changing participants. + """ + if not participants: + raise ValueError("participants cannot be empty") + + wrapped: list[Executor] = [] + seen_ids: set[str] = set() + alias_map: dict[str, str] = {} + + def _register_alias(alias: str | None, exec_id: str) -> None: + """Record canonical and sanitised aliases that resolve to the executor id.""" + if not alias: + return + alias_map[alias] = exec_id + sanitized = _sanitize_alias(alias) + if sanitized and sanitized not in alias_map: + alias_map[sanitized] = exec_id + + for p in participants: + executor = self._wrap_participant(p) + if executor.id in seen_ids: + raise ValueError(f"Duplicate participant with id '{executor.id}' detected") + seen_ids.add(executor.id) + wrapped.append(executor) + + _register_alias(executor.id, executor.id) + if isinstance(p, AgentProtocol): + name = getattr(p, "name", None) + _register_alias(name, executor.id) + display = getattr(p, "display_name", None) + if isinstance(display, str) and display: + _register_alias(display, executor.id) + + self._executors = {executor.id: executor for executor in wrapped} + self._aliases = alias_map + self._starting_agent_id = None + return self + + def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuilder": + r"""Designate which agent receives initial user input and orchestrates specialist routing. 
+ + The coordinator agent is responsible for analyzing user requests and deciding whether to: + 1. Handle the request directly and respond to the user, OR + 2. Hand off to a specialist agent by including handoff metadata in the response + + After a specialist responds, the workflow automatically returns control to the user, + creating a cyclical flow: user -> coordinator -> [specialist] -> user -> ... + + Args: + agent: The agent to use as the coordinator. Can be: + - Agent name (str): e.g., "coordinator_agent" + - AgentProtocol instance: The actual agent object + - Executor instance: A custom executor wrapping an agent + + Returns: + Self for method chaining. + + Raises: + ValueError: If participants(...) hasn't been called yet, or if the specified + agent is not in the participants list. + + Example: + + .. code-block:: python + + # Use agent name + builder = HandoffBuilder().participants([coordinator, refund, billing]).set_coordinator("coordinator") + + # Or pass the agent object directly + builder = HandoffBuilder().participants([coordinator, refund, billing]).set_coordinator(coordinator) + + Note: + The coordinator determines routing by invoking a handoff tool call whose + arguments identify the target specialist (for example ``{\"handoff_to\": \"billing\"}``). + Decorate the tool with ``approval_mode="always_require"`` to ensure the workflow + intercepts the call before execution and can make the transition. + """ + if not self._executors: + raise ValueError("Call participants(...) 
before coordinator(...)") + resolved = self._resolve_to_id(agent) + if resolved not in self._executors: + raise ValueError(f"coordinator '{resolved}' is not part of the participants list") + self._starting_agent_id = resolved + return self + + def add_handoff( + self, + source: str | AgentProtocol | Executor, + targets: str | AgentProtocol | Executor | Sequence[str | AgentProtocol | Executor], + *, + tool_name: str | None = None, + tool_description: str | None = None, + ) -> "HandoffBuilder": + """Add handoff routing from a source agent to one or more target agents. + + This method enables specialist-to-specialist handoffs by configuring which agents + can hand off to which others. Call this method multiple times to build a complete + routing graph. By default, only the starting agent can hand off to all other participants; + use this method to enable additional routing paths. + + Args: + source: The agent that can initiate the handoff. Can be: + - Agent name (str): e.g., "triage_agent" + - AgentProtocol instance: The actual agent object + - Executor instance: A custom executor wrapping an agent + targets: One or more target agents that the source can hand off to. Can be: + - Single agent: "billing_agent" or agent_instance + - Multiple agents: ["billing_agent", "support_agent"] or [agent1, agent2] + tool_name: Optional custom name for the handoff tool. If not provided, generates + "handoff_to_" for single targets or "handoff_to__agent" + for multiple targets based on target names. + tool_description: Optional custom description for the handoff tool. If not provided, + generates "Handoff to the agent." + + Returns: + Self for method chaining. + + Raises: + ValueError: If source or targets are not in the participants list, or if + participants(...) hasn't been called yet. + + Examples: + Single target: + + .. code-block:: python + + builder.add_handoff("triage_agent", "billing_agent") + + Multiple targets (using agent names): + + .. 
code-block:: python + + builder.add_handoff("triage_agent", ["billing_agent", "support_agent", "escalation_agent"]) + + Multiple targets (using agent instances): + + .. code-block:: python + + builder.add_handoff(triage, [billing, support, escalation]) + + Chain multiple configurations: + + .. code-block:: python + + workflow = ( + HandoffBuilder(participants=[triage, replacement, delivery, billing]) + .set_coordinator(triage) + .add_handoff(triage, [replacement, delivery, billing]) + .add_handoff(replacement, [delivery, billing]) + .add_handoff(delivery, billing) + .build() + ) + + Custom tool names and descriptions: + + .. code-block:: python + + builder.add_handoff( + "support_agent", + "escalation_agent", + tool_name="escalate_to_l2", + tool_description="Escalate this issue to Level 2 support", + ) + + Note: + - Handoff tools are automatically registered for each source agent + - If a source agent is configured multiple times via add_handoff, targets are merged + """ + if not self._executors: + raise ValueError("Call participants(...) 
before add_handoff(...)") + + # Resolve source agent ID + source_id = self._resolve_to_id(source) + if source_id not in self._executors: + raise ValueError(f"Source agent '{source}' is not in the participants list") + + # Normalize targets to list + target_list = [targets] if isinstance(targets, (str, AgentProtocol, Executor)) else list(targets) + + # Resolve all target IDs + target_ids: list[str] = [] + for target in target_list: + target_id = self._resolve_to_id(target) + if target_id not in self._executors: + raise ValueError(f"Target agent '{target}' is not in the participants list") + target_ids.append(target_id) + + # Merge with existing handoff configuration for this source + if source_id in self._handoff_config: + # Add new targets to existing list, avoiding duplicates + existing = self._handoff_config[source_id] + for target_id in target_ids: + if target_id not in existing: + existing.append(target_id) + else: + self._handoff_config[source_id] = target_ids + + return self + + def auto_register_handoff_tools(self, enabled: bool) -> "HandoffBuilder": + """Configure whether the builder should synthesize handoff tools for the starting agent.""" + self._auto_register_handoff_tools = enabled + return self + + def _apply_auto_tools(self, agent: ChatAgent, specialists: Mapping[str, Executor]) -> dict[str, str]: + """Attach synthetic handoff tools to a chat agent and return the target lookup table.""" + chat_options = agent.chat_options + existing_tools = list(chat_options.tools or []) + existing_names = {getattr(tool, "name", "") for tool in existing_tools if hasattr(tool, "name")} + + tool_targets: dict[str, str] = {} + new_tools: list[Any] = [] + for exec_id in specialists: + alias = exec_id + sanitized = _sanitize_alias(alias) + tool = _create_handoff_tool(alias) + if tool.name not in existing_names: + new_tools.append(tool) + tool_targets[tool.name.lower()] = exec_id + tool_targets[sanitized] = exec_id + tool_targets[alias.lower()] = exec_id + + if new_tools: 
+ chat_options.tools = existing_tools + new_tools + else: + chat_options.tools = existing_tools + + return tool_targets + + def _resolve_agent_id(self, agent_identifier: str) -> str: + """Resolve an agent identifier to an executor ID. + + Args: + agent_identifier: Can be agent name, display name, or executor ID + + Returns: + The executor ID + + Raises: + ValueError: If the identifier cannot be resolved + """ + # Check if it's already an executor ID + if agent_identifier in self._executors: + return agent_identifier + + # Check if it's an alias + if agent_identifier in self._aliases: + return self._aliases[agent_identifier] + + # Not found + raise ValueError(f"Agent identifier '{agent_identifier}' not found in participants") + + def _prepare_agent_with_handoffs( + self, + executor: AgentExecutor, + target_agents: Mapping[str, Executor], + ) -> tuple[AgentExecutor, dict[str, str]]: + """Prepare an agent by adding handoff tools for the specified target agents. + + Args: + executor: The agent executor to prepare + target_agents: Map of executor IDs to target executors this agent can hand off to + + Returns: + Tuple of (updated executor, tool_targets map) + """ + agent = getattr(executor, "_agent", None) + if not isinstance(agent, ChatAgent): + return executor, {} + + cloned_agent = _clone_chat_agent(agent) + tool_targets = self._apply_auto_tools(cloned_agent, target_agents) + if tool_targets: + middleware = _AutoHandoffMiddleware(tool_targets) + existing_middleware = list(cloned_agent.middleware or []) + existing_middleware.append(middleware) + cloned_agent.middleware = existing_middleware + + new_executor = AgentExecutor( + cloned_agent, + agent_thread=getattr(executor, "_agent_thread", None), + output_response=getattr(executor, "_output_response", False), + id=executor.id, + ) + return new_executor, tool_targets + + def request_prompt(self, prompt: str | None) -> "HandoffBuilder": + """Set a custom prompt message displayed when requesting user input. 
+ + By default, the workflow uses a generic prompt: "Provide your next input for the + conversation." Use this method to customize the message shown to users when the + workflow needs their response. + + Args: + prompt: Custom prompt text to display, or None to use the default prompt. + + Returns: + Self for method chaining. + + Example: + + .. code-block:: python + + workflow = ( + HandoffBuilder(participants=[triage, refund, billing]) + .set_coordinator("triage") + .request_prompt("How can we help you today?") + .build() + ) + + # For more context-aware prompts, you can access the prompt via + # RequestInfoEvent.data.prompt in your event handling loop + + Note: + The prompt is static and set once during workflow construction. If you need + dynamic prompts based on conversation state, you'll need to handle that in + your application's event processing logic. + """ + self._request_prompt = prompt + return self + + def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "HandoffBuilder": + """Enable workflow state persistence for resumable conversations. + + Checkpointing allows the workflow to save its state at key points, enabling you to: + - Resume conversations after application restarts + - Implement long-running support tickets that span multiple sessions + - Recover from failures without losing conversation context + - Audit and replay conversation history + + Args: + checkpoint_storage: Storage backend implementing CheckpointStorage interface. + Common implementations: InMemoryCheckpointStorage (testing), + database-backed storage (production). + + Returns: + Self for method chaining. + + Example (In-Memory): + + .. 
code-block:: python + + from agent_framework import InMemoryCheckpointStorage + + storage = InMemoryCheckpointStorage() + workflow = ( + HandoffBuilder(participants=[triage, refund, billing]) + .set_coordinator("triage") + .with_checkpointing(storage) + .build() + ) + + # Run workflow with a session ID for resumption + async for event in workflow.run_stream("Help me", session_id="user_123"): + # Process events... + pass + + # Later, resume the same conversation + async for event in workflow.run_stream("I need a refund", session_id="user_123"): + # Conversation continues from where it left off + pass + + Use Cases: + - Customer support systems with persistent ticket history + - Multi-day conversations that need to survive server restarts + - Compliance requirements for conversation auditing + - A/B testing different agent configurations on same conversation + + Note: + Checkpointing adds overhead for serialization and storage I/O. Use it when + persistence is required, not for simple stateless request-response patterns. + """ + self._checkpoint_storage = checkpoint_storage + return self + + def with_termination_condition(self, condition: Callable[[list[ChatMessage]], bool]) -> "HandoffBuilder": + """Set a custom termination condition for the handoff workflow. + + Args: + condition: Function that receives the full conversation and returns True + if the workflow should terminate (not request further user input). + + Returns: + Self for chaining. + + Example: + + .. code-block:: python + + builder.with_termination_condition( + lambda conv: len(conv) > 20 or any("goodbye" in msg.text.lower() for msg in conv[-2:]) + ) + """ + self._termination_condition = condition + return self + + def build(self) -> Workflow: + """Construct the final Workflow instance from the configured builder. 
+ + This method validates the configuration and assembles all internal components: + - Input normalization executor + - Starting agent executor + - Handoff coordinator + - Specialist agent executors + - User input gateway + - Request/response handling + + Returns: + A fully configured Workflow ready to execute via `.run()` or `.run_stream()`. + + Raises: + ValueError: If participants or coordinator were not configured, or if + required configuration is invalid. + + Example (Minimal): + + .. code-block:: python + + workflow = ( + HandoffBuilder(participants=[coordinator, refund, billing]).set_coordinator("coordinator").build() + ) + + # Run the workflow + async for event in workflow.run_stream("I need help"): + # Handle events... + pass + + Example (Full Configuration): + + .. code-block:: python + + from agent_framework import InMemoryCheckpointStorage + + storage = InMemoryCheckpointStorage() + workflow = ( + HandoffBuilder( + name="support_workflow", + participants=[coordinator, refund, billing], + description="Customer support with specialist routing", + ) + .set_coordinator("coordinator") + .with_termination_condition(lambda conv: len(conv) > 20) + .request_prompt("How can we help?") + .with_checkpointing(storage) + .build() + ) + + Note: + After calling build(), the builder instance should not be reused. Create a + new builder if you need to construct another workflow with different configuration. + """ + if not self._executors: + raise ValueError("No participants provided. 
Call participants([...]) first.") + if self._starting_agent_id is None: + raise ValueError("coordinator must be defined before build().") + + starting_executor = self._executors[self._starting_agent_id] + specialists = { + exec_id: executor for exec_id, executor in self._executors.items() if exec_id != self._starting_agent_id + } + + # Build handoff tool registry for all agents that need them + handoff_tool_targets: dict[str, str] = {} + if self._auto_register_handoff_tools: + # Determine which agents should have handoff tools + if self._handoff_config: + # Use explicit handoff configuration from add_handoff() calls + for source_exec_id, target_exec_ids in self._handoff_config.items(): + executor = self._executors.get(source_exec_id) + if not executor: + raise ValueError(f"Handoff source agent '{source_exec_id}' not found in participants") + + if isinstance(executor, AgentExecutor): + # Build targets map for this source agent + targets_map: dict[str, Executor] = {} + for target_exec_id in target_exec_ids: + target_executor = self._executors.get(target_exec_id) + if not target_executor: + raise ValueError(f"Handoff target agent '{target_exec_id}' not found in participants") + targets_map[target_exec_id] = target_executor + + # Register handoff tools for this agent + updated_executor, tool_targets = self._prepare_agent_with_handoffs(executor, targets_map) + self._executors[source_exec_id] = updated_executor + handoff_tool_targets.update(tool_targets) + else: + # Default behavior: only coordinator gets handoff tools to all specialists + if isinstance(starting_executor, AgentExecutor) and specialists: + starting_executor, tool_targets = self._prepare_agent_with_handoffs(starting_executor, specialists) + self._executors[self._starting_agent_id] = starting_executor + handoff_tool_targets.update(tool_targets) # Update references after potential agent modifications + starting_executor = self._executors[self._starting_agent_id] + specialists = { + exec_id: executor for 
exec_id, executor in self._executors.items() if exec_id != self._starting_agent_id + } + + if not specialists: + logger.warning("Handoff workflow has no specialist agents; the coordinator will loop with the user.") + + input_node = _InputToConversation(id="input-conversation") + request_info = RequestInfoExecutor(id=f"{starting_executor.id}_handoff_requests") + user_gateway = _UserInputGateway( + request_executor_id=request_info.id, + starting_agent_id=starting_executor.id, + prompt=self._request_prompt, + id="handoff-user-input", + ) + coordinator = _HandoffCoordinator( + starting_agent_id=starting_executor.id, + specialist_ids={alias: exec_id for alias, exec_id in self._aliases.items() if exec_id in specialists}, + input_gateway_id=user_gateway.id, + termination_condition=self._termination_condition, + id="handoff-coordinator", + handoff_tool_targets=handoff_tool_targets, + ) + + builder = WorkflowBuilder(name=self._name, description=self._description) + builder.set_start_executor(input_node) + builder.add_edge(input_node, starting_executor) + builder.add_edge(starting_executor, coordinator) + + for specialist in specialists.values(): + builder.add_edge(coordinator, specialist) + builder.add_edge(specialist, coordinator) + + builder.add_edge(coordinator, user_gateway) + builder.add_edge(user_gateway, request_info) + builder.add_edge(request_info, user_gateway) + builder.add_edge(user_gateway, coordinator) # Route back to coordinator, not directly to agent + builder.add_edge(coordinator, starting_executor) # Coordinator sends trimmed request to agent + + if self._checkpoint_storage is not None: + builder = builder.with_checkpointing(self._checkpoint_storage) + + return builder.build() + + def _wrap_participant(self, participant: AgentProtocol | Executor) -> Executor: + """Ensure every participant is represented as an Executor instance.""" + if isinstance(participant, Executor): + return participant + if isinstance(participant, AgentProtocol): + name = 
getattr(participant, "name", None) + if not name: + raise ValueError( + "Agents used in handoff workflows must have a stable name so they can be addressed during routing." + ) + return AgentExecutor(participant, id=name) + raise TypeError(f"Participants must be AgentProtocol or Executor instances. Got {type(participant).__name__}.") + + def _resolve_to_id(self, candidate: str | AgentProtocol | Executor) -> str: + """Resolve a participant reference into a concrete executor identifier.""" + if isinstance(candidate, Executor): + return candidate.id + if isinstance(candidate, AgentProtocol): + name: str | None = getattr(candidate, "name", None) + if not name: + raise ValueError("AgentProtocol without a name cannot be resolved to an executor id.") + return self._aliases.get(name, name) + if isinstance(candidate, str): + if candidate in self._aliases: + return self._aliases[candidate] + return candidate + raise TypeError(f"Invalid starting agent reference: {type(candidate).__name__}") diff --git a/python/packages/core/tests/workflow/test_handoff.py b/python/packages/core/tests/workflow/test_handoff.py new file mode 100644 index 0000000000..ea8d7faead --- /dev/null +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -0,0 +1,359 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +from collections.abc import AsyncIterable, AsyncIterator +from dataclasses import dataclass +from typing import Any, cast + +import pytest + +from agent_framework import ( + AgentRunResponse, + AgentRunResponseUpdate, + BaseAgent, + ChatMessage, + FunctionCallContent, + HandoffBuilder, + HandoffUserInputRequest, + RequestInfoEvent, + Role, + TextContent, + WorkflowEvent, + WorkflowOutputEvent, +) + + +@dataclass +class _ComplexMetadata: + reason: str + payload: dict[str, str] + + +@pytest.fixture +def complex_metadata() -> _ComplexMetadata: + return _ComplexMetadata(reason="route", payload={"code": "X1"}) + + +def _metadata_from_conversation(conversation: list[ChatMessage], key: str) -> list[object]: + return [msg.additional_properties[key] for msg in conversation if key in msg.additional_properties] + + +def _conversation_debug(conversation: list[ChatMessage]) -> list[tuple[str, str | None, str]]: + return [ + (msg.role.value if hasattr(msg.role, "value") else str(msg.role), msg.author_name, msg.text) + for msg in conversation + ] + + +class _RecordingAgent(BaseAgent): + def __init__( + self, + *, + name: str, + handoff_to: str | None = None, + text_handoff: bool = False, + extra_properties: dict[str, object] | None = None, + ) -> None: + super().__init__(id=name, name=name, display_name=name) + self.handoff_to = handoff_to + self.calls: list[list[ChatMessage]] = [] + self._text_handoff = text_handoff + self._extra_properties = dict(extra_properties or {}) + self._call_index = 0 + + async def run( # type: ignore[override] + self, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: Any = None, + **kwargs: Any, + ) -> AgentRunResponse: + conversation = _normalise(messages) + self.calls.append(conversation) + additional_properties = _merge_additional_properties( + self.handoff_to, self._text_handoff, self._extra_properties + ) + contents = _build_reply_contents(self.name, self.handoff_to, self._text_handoff, 
self._next_call_id()) + reply = ChatMessage( + role=Role.ASSISTANT, + contents=contents, + author_name=self.display_name, + additional_properties=additional_properties, + ) + return AgentRunResponse(messages=[reply]) + + async def run_stream( # type: ignore[override] + self, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: Any = None, + **kwargs: Any, + ) -> AsyncIterator[AgentRunResponseUpdate]: + conversation = _normalise(messages) + self.calls.append(conversation) + additional_props = _merge_additional_properties(self.handoff_to, self._text_handoff, self._extra_properties) + contents = _build_reply_contents(self.name, self.handoff_to, self._text_handoff, self._next_call_id()) + yield AgentRunResponseUpdate( + contents=contents, + role=Role.ASSISTANT, + additional_properties=additional_props, + ) + + def _next_call_id(self) -> str | None: + if not self.handoff_to: + return None + call_id = f"{self.id}-handoff-{self._call_index}" + self._call_index += 1 + return call_id + + +def _merge_additional_properties( + handoff_to: str | None, use_text_hint: bool, extras: dict[str, object] +) -> dict[str, object]: + additional_properties: dict[str, object] = {} + if handoff_to and not use_text_hint: + additional_properties["handoff_to"] = handoff_to + additional_properties.update(extras) + return additional_properties + + +def _build_reply_contents( + agent_name: str, + handoff_to: str | None, + use_text_hint: bool, + call_id: str | None, +) -> list[TextContent | FunctionCallContent]: + contents: list[TextContent | FunctionCallContent] = [] + if handoff_to and call_id: + contents.append( + FunctionCallContent(call_id=call_id, name=f"handoff_to_{handoff_to}", arguments={"handoff_to": handoff_to}) + ) + text = f"{agent_name} reply" + if use_text_hint and handoff_to: + text += f"\nHANDOFF_TO: {handoff_to}" + contents.append(TextContent(text=text)) + return contents + + +def _normalise(messages: str | ChatMessage | list[str] | 
list[ChatMessage] | None) -> list[ChatMessage]: + if isinstance(messages, list): + result: list[ChatMessage] = [] + for msg in messages: + if isinstance(msg, ChatMessage): + result.append(msg) + elif isinstance(msg, str): + result.append(ChatMessage(Role.USER, text=msg)) + return result + if isinstance(messages, ChatMessage): + return [messages] + if isinstance(messages, str): + return [ChatMessage(Role.USER, text=messages)] + return [] + + +async def _drain(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]: + return [event async for event in stream] + + +async def test_handoff_routes_to_specialist_and_requests_user_input(): + triage = _RecordingAgent(name="triage", handoff_to="specialist") + specialist = _RecordingAgent(name="specialist") + + workflow = HandoffBuilder(participants=[triage, specialist]).set_coordinator("triage").build() + + events = await _drain(workflow.run_stream("Need help with a refund")) + + assert triage.calls, "Starting agent should receive initial conversation" + assert specialist.calls, "Specialist should be invoked after handoff" + assert len(specialist.calls[0]) == 2 # user + triage reply + + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests, "Workflow should request additional user input" + request_payload = requests[-1].data + assert isinstance(request_payload, HandoffUserInputRequest) + assert len(request_payload.conversation) == 4 # user, triage tool call, tool ack, specialist + assert request_payload.conversation[2].role == Role.TOOL + assert request_payload.conversation[3].role == Role.ASSISTANT + assert "specialist reply" in request_payload.conversation[3].text + + follow_up = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "Thanks"})) + assert any(isinstance(ev, RequestInfoEvent) for ev in follow_up) + + +async def test_specialist_to_specialist_handoff(): + """Test that specialists can hand off to other specialists via .add_handoff() configuration.""" + 
triage = _RecordingAgent(name="triage", handoff_to="specialist") + specialist = _RecordingAgent(name="specialist", handoff_to="escalation") + escalation = _RecordingAgent(name="escalation") + + workflow = ( + HandoffBuilder(participants=[triage, specialist, escalation]) + .set_coordinator(triage) + .add_handoff(triage, [specialist, escalation]) + .add_handoff(specialist, escalation) + .with_termination_condition(lambda conv: sum(1 for m in conv if m.role == Role.USER) >= 2) + .build() + ) + + # Start conversation - triage hands off to specialist + events = await _drain(workflow.run_stream("Need technical support")) + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests + + # Specialist should have been called + assert len(specialist.calls) > 0 + + # Second user message - specialist hands off to escalation + events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "This is complex"})) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs + + # Escalation should have been called + assert len(escalation.calls) > 0 + + +async def test_handoff_preserves_complex_additional_properties(complex_metadata: _ComplexMetadata): + triage = _RecordingAgent(name="triage", handoff_to="specialist", extra_properties={"complex": complex_metadata}) + specialist = _RecordingAgent(name="specialist") + + # Sanity check: agent response contains complex metadata before entering workflow + triage_response = await triage.run([ChatMessage(role=Role.USER, text="Need help with a return")]) + assert triage_response.messages + assert "complex" in triage_response.messages[0].additional_properties + + workflow = ( + HandoffBuilder(participants=[triage, specialist]) + .set_coordinator("triage") + .with_termination_condition(lambda conv: sum(1 for msg in conv if msg.role == Role.USER) >= 2) + .build() + ) + + # Initial run should preserve complex metadata in the triage response + events = await 
_drain(workflow.run_stream("Need help with a return")) + agent_events = [ev for ev in events if hasattr(ev, "data") and hasattr(ev.data, "messages")] + if agent_events: + first_agent_event = agent_events[0] + first_agent_event_data = first_agent_event.data + if first_agent_event_data and hasattr(first_agent_event_data, "messages"): + first_agent_message = first_agent_event_data.messages[0] # type: ignore[attr-defined] + assert "complex" in first_agent_message.additional_properties, "Agent event lost complex metadata" + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests, "Workflow should request additional user input" + + request_data = requests[-1].data + assert isinstance(request_data, HandoffUserInputRequest) + conversation_snapshot = request_data.conversation + metadata_values = _metadata_from_conversation(conversation_snapshot, "complex") + assert metadata_values, ( + "Expected triage message in conversation, found " + f"additional_properties={[msg.additional_properties for msg in conversation_snapshot]}," + f" messages={_conversation_debug(conversation_snapshot)}" + ) + assert any(isinstance(value, _ComplexMetadata) for value in metadata_values), ( + "Complex metadata lost after first hop" + ) + restored_meta = next(value for value in metadata_values if isinstance(value, _ComplexMetadata)) + assert restored_meta.payload["code"] == "X1" + + # Respond and ensure metadata survives subsequent cycles + follow_up_events = await _drain( + workflow.send_responses_streaming({requests[-1].request_id: "Here are more details"}) + ) + follow_up_requests = [ev for ev in follow_up_events if isinstance(ev, RequestInfoEvent)] + outputs = [ev for ev in follow_up_events if isinstance(ev, WorkflowOutputEvent)] + + follow_up_conversation: list[ChatMessage] + if follow_up_requests: + follow_up_request_data = follow_up_requests[-1].data + assert isinstance(follow_up_request_data, HandoffUserInputRequest) + follow_up_conversation = 
follow_up_request_data.conversation + else: + assert outputs, "Workflow produced neither follow-up request nor output" + output_data = outputs[-1].data + follow_up_conversation = cast(list[ChatMessage], output_data) if isinstance(output_data, list) else [] + + metadata_values_after = _metadata_from_conversation(follow_up_conversation, "complex") + assert metadata_values_after, "Expected triage message after follow-up" + assert any(isinstance(value, _ComplexMetadata) for value in metadata_values_after), ( + "Complex metadata lost after restore" + ) + + restored_meta_after = next(value for value in metadata_values_after if isinstance(value, _ComplexMetadata)) + assert restored_meta_after.payload["code"] == "X1" + + +async def test_tool_call_handoff_detection_with_text_hint(): + triage = _RecordingAgent(name="triage", handoff_to="specialist", text_handoff=True) + specialist = _RecordingAgent(name="specialist") + + workflow = HandoffBuilder(participants=[triage, specialist]).set_coordinator("triage").build() + + await _drain(workflow.run_stream("Package arrived broken")) + + assert specialist.calls, "Specialist should be invoked using handoff tool call" + assert len(specialist.calls[0]) >= 2 + + +def test_build_fails_without_coordinator(): + """Verify that build() raises ValueError when set_coordinator() was not called.""" + triage = _RecordingAgent(name="triage") + specialist = _RecordingAgent(name="specialist") + + with pytest.raises(ValueError, match="coordinator must be defined before build"): + HandoffBuilder(participants=[triage, specialist]).build() + + +def test_build_fails_without_participants(): + """Verify that build() raises ValueError when no participants are provided.""" + with pytest.raises(ValueError, match="No participants provided"): + HandoffBuilder().build() + + +async def test_multiple_runs_dont_leak_conversation(): + """Verify that running the same workflow multiple times doesn't leak conversation history.""" + triage = 
_RecordingAgent(name="triage", handoff_to="specialist") + specialist = _RecordingAgent(name="specialist") + + workflow = ( + HandoffBuilder(participants=[triage, specialist]) + .set_coordinator("triage") + .with_termination_condition(lambda conv: sum(1 for m in conv if m.role == Role.USER) >= 2) + .build() + ) + + # First run + events = await _drain(workflow.run_stream("First run message")) + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests + events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "Second message"})) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "First run should emit output" + + first_run_conversation = outputs[-1].data + assert isinstance(first_run_conversation, list) + first_run_conv_list = cast(list[ChatMessage], first_run_conversation) + first_run_user_messages = [msg for msg in first_run_conv_list if msg.role == Role.USER] + assert len(first_run_user_messages) == 2 + assert any("First run message" in msg.text for msg in first_run_user_messages if msg.text) + + # Second run - should start fresh, not include first run's messages + triage.calls.clear() + specialist.calls.clear() + + events = await _drain(workflow.run_stream("Second run different message")) + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests + events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "Another message"})) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Second run should emit output" + + second_run_conversation = outputs[-1].data + assert isinstance(second_run_conversation, list) + second_run_conv_list = cast(list[ChatMessage], second_run_conversation) + second_run_user_messages = [msg for msg in second_run_conv_list if msg.role == Role.USER] + assert len(second_run_user_messages) == 2, ( + "Second run should have exactly 2 user messages, not accumulate 
first run" + ) + assert any("Second run different message" in msg.text for msg in second_run_user_messages if msg.text) + assert not any("First run message" in msg.text for msg in second_run_user_messages if msg.text), ( + "Second run should NOT contain first run's messages" + ) diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index 842ec142ef..567944a104 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -11,8 +11,9 @@ AgentRunUpdateEvent, ChatMessage, Executor, + FunctionApprovalRequestContent, + FunctionApprovalResponseContent, FunctionCallContent, - FunctionResultContent, RequestInfoExecutor, RequestInfoMessage, Role, @@ -163,35 +164,56 @@ async def test_end_to_end_request_info_handling(self): updates: list[AgentRunResponseUpdate] = [] async for update in agent.run_stream("Start request"): updates.append(update) - # Should have received a function call for the request info + # Should have received an approval request for the request info assert len(updates) > 0 - # Find the function call update (RequestInfoEvent converted to function call) - function_call_update: AgentRunResponseUpdate | None = None + approval_update: AgentRunResponseUpdate | None = None for update in updates: - if update.contents and hasattr(update.contents[0], "name") and update.contents[0].name == "request_info": # type: ignore[attr-defined] - function_call_update = update + if any(isinstance(content, FunctionApprovalRequestContent) for content in update.contents): + approval_update = update break - assert function_call_update is not None, "Should have received a request_info function call" - function_call: FunctionCallContent = function_call_update.contents[0] # type: ignore[assignment] + assert approval_update is not None, "Should have received a request_info approval request" + + function_call = next( + content for 
content in approval_update.contents if isinstance(content, FunctionCallContent) + ) + approval_request = next( + content for content in approval_update.contents if isinstance(content, FunctionApprovalRequestContent) + ) # Verify the function call has expected structure assert function_call.call_id is not None assert function_call.name == "request_info" assert isinstance(function_call.arguments, dict) - assert "request_id" in function_call.arguments + assert function_call.arguments.get("request_id") == approval_request.id + + # Approval request should reference the same function call + assert approval_request.function_call.call_id == function_call.call_id + assert approval_request.function_call.name == function_call.name # Verify the request is tracked in pending_requests assert len(agent.pending_requests) == 1 assert function_call.call_id in agent.pending_requests - # Now provide a function result response to test continuation - response_message = ChatMessage( - role=Role.USER, - contents=[FunctionResultContent(call_id=function_call.call_id, result="User provided answer")], + # Now provide an approval response with updated arguments to test continuation + response_args = WorkflowAgent.RequestInfoFunctionArgs( + request_id=approval_request.id, + data="User provided answer", + ).to_dict() + + approval_response = FunctionApprovalResponseContent( + approved=True, + id=approval_request.id, + function_call=FunctionCallContent( + call_id=function_call.call_id, + name=function_call.name, + arguments=response_args, + ), ) + response_message = ChatMessage(role=Role.USER, contents=[approval_response]) + # Continue the workflow with the response continuation_result = await agent.run(response_message) diff --git a/python/packages/purview/README.md b/python/packages/purview/README.md index b42213b4e5..b52fdca807 100644 --- a/python/packages/purview/README.md +++ b/python/packages/purview/README.md @@ -62,16 +62,16 @@ If a policy violation is detected on the prompt, the 
middleware terminates the r `PurviewClient` uses the `azure-identity` library for token acquisition. You can use any `TokenCredential` or `AsyncTokenCredential` implementation. The APIs require the following Graph Permissions: -- ProtectionScopes.Compute.All : (userProtectionScopeContainer)[https://learn.microsoft.com/en-us/graph/api/userprotectionscopecontainer-compute] -- Content.Process.All : (processContent)[https://learn.microsoft.com/en-us/graph/api/userdatasecurityandgovernance-processcontent] -- ContentActivity.Write : (contentActivity)[https://learn.microsoft.com/en-us/graph/api/activitiescontainer-post-contentactivities] +- ProtectionScopes.Compute.All : [userProtectionScopeContainer](https://learn.microsoft.com/en-us/graph/api/userprotectionscopecontainer-compute) +- Content.Process.All : [processContent](https://learn.microsoft.com/en-us/graph/api/userdatasecurityandgovernance-processcontent) +- ContentActivity.Write : [contentActivity](https://learn.microsoft.com/en-us/graph/api/activitiescontainer-post-contentactivities) ### Scopes `PurviewSettings.get_scopes()` derives the Graph scope list (currently `https://graph.microsoft.com/.default` style). ### Tenant Enablement for Purview - The tenant requires an e5 license and consumptive billing setup. -- There need to be (Data Loss Prevention)[https://learn.microsoft.com/en-us/purview/dlp-create-deploy-policy] or (Data Collection Policies)[https://learn.microsoft.com/en-us/purview/collection-policies-policy-reference] that apply to the user to call Process Content API else it calls Content Activities API for auditing the message. +- There need to be [Data Loss Prevention](https://learn.microsoft.com/en-us/purview/dlp-create-deploy-policy) or [Data Collection Policies](https://learn.microsoft.com/en-us/purview/collection-policies-policy-reference) that apply to the user to call Process Content API else it calls Content Activities API for auditing the message. 
--- diff --git a/python/samples/getting_started/observability/workflow_observability.py b/python/samples/getting_started/observability/workflow_observability.py index 10e30024ef..9b56def216 100644 --- a/python/samples/getting_started/observability/workflow_observability.py +++ b/python/samples/getting_started/observability/workflow_observability.py @@ -77,7 +77,7 @@ async def run_sequential_workflow() -> None: print(f"Starting workflow with input: '{input_text}'") output_event = None - async for event in workflow.run_stream(input_text): + async for event in workflow.run_stream("Hello world"): if isinstance(event, WorkflowOutputEvent): # The WorkflowOutputEvent contains the final result. output_event = event diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 17780a7aac..26e8cdd3ba 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -89,6 +89,8 @@ Once comfortable with these, explore the rest of the samples below. 
| Concurrent Orchestration (Default Aggregator) | [orchestration/concurrent_agents.py](./orchestration/concurrent_agents.py) | Fan-out to multiple agents; fan-in with default aggregator returning combined ChatMessages | | Concurrent Orchestration (Custom Aggregator) | [orchestration/concurrent_custom_aggregator.py](./orchestration/concurrent_custom_aggregator.py) | Override aggregator via callback; summarize results with an LLM | | Concurrent Orchestration (Custom Agent Executors) | [orchestration/concurrent_custom_agent_executors.py](./orchestration/concurrent_custom_agent_executors.py) | Child executors own ChatAgents; concurrent fan-out/fan-in via ConcurrentBuilder | +| Handoff (Simple) | [orchestration/handoff_simple.py](./orchestration/handoff_simple.py) | Single-tier routing: triage agent routes to specialists, control returns to user after each specialist response | +| Handoff (Specialist-to-Specialist) | [orchestration/handoff_specialist_to_specialist.py](./orchestration/handoff_specialist_to_specialist.py) | Multi-tier routing: specialists can hand off to other specialists using `.add_handoff()` fluent API | | Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming | | Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution | | Magentic + Checkpoint Resume | [orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints | @@ -97,6 +99,11 @@ Once comfortable with these, explore the rest of the samples below. **Magentic checkpointing tip**: Treat `MagenticBuilder.participants` keys as stable identifiers. When resuming from a checkpoint, the rebuilt workflow must reuse the same participant names; otherwise the checkpoint cannot be applied and the run will fail fast. 
+**Handoff workflow tip**: Handoff workflows maintain the full conversation history including any +`ChatMessage.additional_properties` emitted by your agents. This ensures routing metadata remains +intact across all agent transitions. For specialist-to-specialist handoffs, use `.add_handoff(source, targets)` +to configure which agents can route to which others with a fluent, type-safe API. + ### parallelism | Sample | File | Concepts | diff --git a/python/samples/getting_started/workflows/orchestration/handoff_simple.py b/python/samples/getting_started/workflows/orchestration/handoff_simple.py new file mode 100644 index 0000000000..6092083266 --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/handoff_simple.py @@ -0,0 +1,337 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from collections.abc import AsyncIterable +from typing import cast + +from agent_framework import ( + ChatAgent, + ChatMessage, + HandoffBuilder, + HandoffUserInputRequest, + RequestInfoEvent, + WorkflowEvent, + WorkflowOutputEvent, + WorkflowRunState, + WorkflowStatusEvent, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +"""Sample: Simple handoff workflow with single-tier triage-to-specialist routing. + +This sample demonstrates the basic handoff pattern where only the triage agent can +route to specialists. Specialists cannot hand off to other specialists - after any +specialist responds, control returns to the user for the next input. + +Routing Pattern: + User → Triage Agent → Specialist → Back to User → Triage Agent → ... + +This is the simplest handoff configuration, suitable for straightforward support +scenarios where a triage agent dispatches to domain specialists, and each specialist +works independently. + +For multi-tier specialist-to-specialist handoffs, see handoff_specialist_to_specialist.py. 
+ +Prerequisites: + - `az login` (Azure CLI authentication) + - Environment variables configured for AzureOpenAIChatClient (AZURE_OPENAI_ENDPOINT, etc.) + +Key Concepts: + - Single-tier routing: Only triage agent has handoff capabilities + - Auto-registered handoff tools: HandoffBuilder creates tools automatically + - Termination condition: Controls when the workflow stops requesting user input + - Request/response cycle: Workflow requests input, user responds, cycle continues +""" + + +def create_agents(chat_client: AzureOpenAIChatClient) -> tuple[ChatAgent, ChatAgent, ChatAgent, ChatAgent]: + """Create and configure the triage and specialist agents. + + The triage agent is responsible for: + - Receiving all user input first + - Deciding whether to handle the request directly or hand off to a specialist + - Signaling handoff by calling one of the explicit handoff tools exposed to it + + Specialist agents are invoked only when the triage agent explicitly hands off to them. + After a specialist responds, control returns to the triage agent. + + Returns: + Tuple of (triage_agent, refund_agent, order_agent, support_agent) + """ + # Triage agent: Acts as the frontline dispatcher + # NOTE: The instructions explicitly tell it to call the correct handoff tool when routing. + # The HandoffBuilder intercepts these tool calls and routes to the matching specialist. + triage = chat_client.create_agent( + instructions=( + "You are frontline support triage. Read the latest user message and decide whether " + "to hand off to refund_agent, order_agent, or support_agent. Provide a brief natural-language " + "response for the user. When delegation is required, call the matching handoff tool " + "(`handoff_to_refund_agent`, `handoff_to_order_agent`, or `handoff_to_support_agent`)." + ), + name="triage_agent", + ) + + # Refund specialist: Handles refund requests + refund = chat_client.create_agent( + instructions=( + "You handle refund workflows. 
Ask for any order identifiers you require and outline the refund steps." + ), + name="refund_agent", + ) + + # Order/shipping specialist: Resolves delivery issues + order = chat_client.create_agent( + instructions=( + "You resolve shipping and fulfillment issues. Clarify the delivery problem and describe the actions " + "you will take to remedy it." + ), + name="order_agent", + ) + + # General support specialist: Fallback for other issues + support = chat_client.create_agent( + instructions=( + "You are a general support agent. Offer empathetic troubleshooting and gather missing details if the " + "issue does not match other specialists." + ), + name="support_agent", + ) + + return triage, refund, order, support + + +async def _drain(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]: + """Collect all events from an async stream into a list. + + This helper drains the workflow's event stream so we can process events + synchronously after each workflow step completes. + + Args: + stream: Async iterable of WorkflowEvent + + Returns: + List of all events from the stream + """ + return [event async for event in stream] + + +def _handle_events(events: list[WorkflowEvent]) -> list[RequestInfoEvent]: + """Process workflow events and extract any pending user input requests. + + This function inspects each event type and: + - Prints workflow status changes (IDLE, IDLE_WITH_PENDING_REQUESTS, etc.) 
+ - Displays final conversation snapshots when workflow completes + - Prints user input request prompts + - Collects all RequestInfoEvent instances for response handling + + Args: + events: List of WorkflowEvent to process + + Returns: + List of RequestInfoEvent representing pending user input requests + """ + requests: list[RequestInfoEvent] = [] + + for event in events: + # WorkflowStatusEvent: Indicates workflow state changes + if isinstance(event, WorkflowStatusEvent) and event.state in { + WorkflowRunState.IDLE, + WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, + }: + print(f"[status] {event.state.name}") + + # WorkflowOutputEvent: Contains the final conversation when workflow terminates + elif isinstance(event, WorkflowOutputEvent): + conversation = cast(list[ChatMessage], event.data) + if isinstance(conversation, list): + print("\n=== Final Conversation Snapshot ===") + for message in conversation: + speaker = message.author_name or message.role.value + print(f"- {speaker}: {message.text}") + print("===================================") + + # RequestInfoEvent: Workflow is requesting user input + elif isinstance(event, RequestInfoEvent): + if isinstance(event.data, HandoffUserInputRequest): + _print_handoff_request(event.data) + requests.append(event) + + return requests + + +def _print_handoff_request(request: HandoffUserInputRequest) -> None: + """Display a user input request prompt with conversation context. + + The HandoffUserInputRequest contains the full conversation history so far, + allowing the user to see what's been discussed before providing their next input. + + Args: + request: The user input request containing conversation and prompt + """ + print("\n=== User Input Requested ===") + for message in request.conversation: + speaker = message.author_name or message.role.value + print(f"- {speaker}: {message.text}") + print("============================") + + +async def main() -> None: + """Main entry point for the handoff workflow demo. 
+ + This function demonstrates: + 1. Creating triage and specialist agents + 2. Building a handoff workflow with custom termination condition + 3. Running the workflow with scripted user responses + 4. Processing events and handling user input requests + + The workflow uses scripted responses instead of interactive input to make + the demo reproducible and testable. In a production application, you would + replace the scripted_responses with actual user input collection. + """ + # Initialize the Azure OpenAI chat client + chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + + # Create all agents: triage + specialists + triage, refund, order, support = create_agents(chat_client) + + # Build the handoff workflow + # - participants: All agents that can participate (triage MUST be first or explicitly set as set_coordinator) + # - set_coordinator: The triage agent receives all user input first + # - with_termination_condition: Custom logic to stop the request/response loop + # Default is 10 user messages; here we terminate after 4 to match our scripted demo + workflow = ( + HandoffBuilder( + name="customer_support_handoff", + participants=[triage, refund, order, support], + ) + .set_coordinator("triage_agent") + .with_termination_condition( + # Terminate after 4 user messages (initial + 3 scripted responses) + # Count only USER role messages to avoid counting agent responses + lambda conv: sum(1 for msg in conv if msg.role.value == "user") >= 4 + ) + .build() + ) + + # Scripted user responses for reproducible demo + # In a console application, replace this with: + # user_input = input("Your response: ") + # or integrate with a UI/chat interface + scripted_responses = [ + "My order 1234 arrived damaged and the packaging was destroyed.", + "Yes, I'd like a refund if that's possible.", + "Thanks for resolving this.", + ] + + # Start the workflow with the initial user message + # run_stream() returns an async iterator of WorkflowEvent + print("\n[Starting 
workflow with initial user message...]") + events = await _drain(workflow.run_stream("Hello, I need assistance with my recent purchase.")) + pending_requests = _handle_events(events) + + # Process the request/response cycle + # The workflow will continue requesting input until: + # 1. The termination condition is met (4 user messages in this case), OR + # 2. We run out of scripted responses + while pending_requests and scripted_responses: + # Get the next scripted response + user_response = scripted_responses.pop(0) + print(f"\n[User responding: {user_response}]") + + # Send response(s) to all pending requests + # In this demo, there's typically one request per cycle, but the API supports multiple + responses = {req.request_id: user_response for req in pending_requests} + + # Send responses and get new events + events = await _drain(workflow.send_responses_streaming(responses)) + pending_requests = _handle_events(events) + + """ + Sample Output: + + [Starting workflow with initial user message...] + + === User Input Requested === + - user: Hello, I need assistance with my recent purchase. + - triage_agent: I'd be happy to help you with your recent purchase. Could you please provide more details about the issue you're experiencing? + ============================ + [status] IDLE_WITH_PENDING_REQUESTS + + [User responding: My order 1234 arrived damaged and the packaging was destroyed.] + + === User Input Requested === + - user: Hello, I need assistance with my recent purchase. + - triage_agent: I'd be happy to help you with your recent purchase. Could you please provide more details about the issue you're experiencing? + - user: My order 1234 arrived damaged and the packaging was destroyed. + - triage_agent: I'm sorry to hear that your order arrived damaged and the packaging was destroyed. I will connect you with a specialist who can assist you further with this issue. 
+ + Tool Call: handoff_to_support_agent (awaiting approval) + - support_agent: I'm so sorry to hear that your order arrived in such poor condition. I'll help you get this sorted out. + + To assist you better, could you please let me know: + - Which item(s) from order 1234 arrived damaged? + - Could you describe the damage, or provide photos if possible? + - Would you prefer a replacement or a refund? + + Once I have this information, I can help resolve this for you as quickly as possible. + ============================ + [status] IDLE_WITH_PENDING_REQUESTS + + [User responding: Yes, I'd like a refund if that's possible.] + + === User Input Requested === + - user: Hello, I need assistance with my recent purchase. + - triage_agent: I'd be happy to help you with your recent purchase. Could you please provide more details about the issue you're experiencing? + - user: My order 1234 arrived damaged and the packaging was destroyed. + - triage_agent: I'm sorry to hear that your order arrived damaged and the packaging was destroyed. I will connect you with a specialist who can assist you further with this issue. + + Tool Call: handoff_to_support_agent (awaiting approval) + - support_agent: I'm so sorry to hear that your order arrived in such poor condition. I'll help you get this sorted out. + + To assist you better, could you please let me know: + - Which item(s) from order 1234 arrived damaged? + - Could you describe the damage, or provide photos if possible? + - Would you prefer a replacement or a refund? + + Once I have this information, I can help resolve this for you as quickly as possible. + - user: Yes, I'd like a refund if that's possible. + - triage_agent: Thank you for letting me know you'd prefer a refund. I'll connect you with a specialist who can process your refund request. + + Tool Call: handoff_to_refund_agent (awaiting approval) + - refund_agent: Thank you for confirming that you'd like a refund for order 1234. + + Here's what will happen next: + + ... 
+ + Tool Call: handoff_to_refund_agent (awaiting approval) + - refund_agent: Thank you for confirming that you'd like a refund for order 1234. + + Here's what will happen next: + + **1. Verification:** + I will need to verify a few more details to proceed. + - Can you confirm the items in order 1234 that arrived damaged? + - Do you have any photos of the damaged items/packaging? (Photos help speed up the process.) + + **2. Refund Request Submission:** + - Once I have the details, I will submit your refund request for review. + + **3. Return Instructions (if needed):** + - In some cases, we may provide instructions on how to return the damaged items. + - You will receive a prepaid return label if necessary. + + **4. Refund Processing:** + - After your request is approved (and any returns are received if required), your refund will be processed. + - Refunds usually appear on your original payment method within 5-10 business days. + + Could you please reply with the specific item(s) damaged and, if possible, attach photos? This will help me get your refund started right away. + - user: Thanks for resolving this. + =================================== + [status] IDLE + """ # noqa: E501 + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/orchestration/handoff_specialist_to_specialist.py b/python/samples/getting_started/workflows/orchestration/handoff_specialist_to_specialist.py new file mode 100644 index 0000000000..5e92a6325c --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/handoff_specialist_to_specialist.py @@ -0,0 +1,286 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Sample: Multi-tier handoff workflow with specialist-to-specialist routing. + +This sample demonstrates advanced handoff routing where specialist agents can hand off +to other specialists, enabling complex multi-tier workflows. 
Unlike the simple handoff +pattern (see handoff_simple.py), specialists here can delegate to other specialists +without returning control to the user until the specialist chain completes. + +Routing Pattern: + User → Triage → Specialist A → Specialist B → Back to User + +This pattern is useful for complex support scenarios where different specialists need +to collaborate or escalate to each other before returning to the user. For example: + - Replacement agent needs shipping info → hands off to delivery agent + - Technical support needs billing info → hands off to billing agent + - Level 1 support escalates to Level 2 → hands off to escalation agent + +Configuration uses `.add_handoff()` to explicitly define the routing graph. + +Prerequisites: + - `az login` (Azure CLI authentication) + - Environment variables configured for AzureOpenAIChatClient +""" + +import asyncio +from collections.abc import AsyncIterable +from typing import cast + +from agent_framework import ( + ChatMessage, + HandoffBuilder, + HandoffUserInputRequest, + RequestInfoEvent, + WorkflowEvent, + WorkflowOutputEvent, + WorkflowRunState, + WorkflowStatusEvent, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + + +def create_agents(chat_client: AzureOpenAIChatClient): + """Create triage and specialist agents with multi-tier handoff capabilities. + + Returns: + Tuple of (triage_agent, replacement_agent, delivery_agent, billing_agent) + """ + triage = chat_client.create_agent( + instructions=( + "You are a customer support triage agent. Assess the user's issue and route appropriately:\n" + "- For product replacement issues: call handoff_to_replacement_agent\n" + "- For delivery/shipping inquiries: call handoff_to_delivery_agent\n" + "- For billing/payment issues: call handoff_to_billing_agent\n" + "Be concise and friendly." 
+ ), + name="triage_agent", + ) + + replacement = chat_client.create_agent( + instructions=( + "You handle product replacement requests. Ask for order number and reason for replacement.\n" + "If the user also needs shipping/delivery information, call handoff_to_delivery_agent to " + "get tracking details. Otherwise, process the replacement and confirm with the user.\n" + "Be concise and helpful." + ), + name="replacement_agent", + ) + + delivery = chat_client.create_agent( + instructions=( + "You handle shipping and delivery inquiries. Provide tracking information, estimated " + "delivery dates, and address any delivery concerns.\n" + "If billing issues come up, call handoff_to_billing_agent.\n" + "Be concise and clear." + ), + name="delivery_agent", + ) + + billing = chat_client.create_agent( + instructions=( + "You handle billing and payment questions. Help with refunds, payment methods, " + "and invoice inquiries. Be concise." + ), + name="billing_agent", + ) + + return triage, replacement, delivery, billing + + +async def _drain(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]: + """Collect all events from an async stream into a list.""" + return [event async for event in stream] + + +def _handle_events(events: list[WorkflowEvent]) -> list[RequestInfoEvent]: + """Process workflow events and extract pending user input requests.""" + requests: list[RequestInfoEvent] = [] + + for event in events: + if isinstance(event, WorkflowStatusEvent) and event.state in { + WorkflowRunState.IDLE, + WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, + }: + print(f"[status] {event.state.name}") + + elif isinstance(event, WorkflowOutputEvent): + conversation = cast(list[ChatMessage], event.data) + if isinstance(conversation, list): + print("\n=== Final Conversation ===") + for message in conversation: + # Filter out messages with no text (tool calls) + if not message.text.strip(): + continue + speaker = message.author_name or message.role.value + print(f"- {speaker}: 
{message.text}") + print("==========================") + + elif isinstance(event, RequestInfoEvent): + if isinstance(event.data, HandoffUserInputRequest): + _print_handoff_request(event.data) + requests.append(event) + + return requests + + +def _print_handoff_request(request: HandoffUserInputRequest) -> None: + """Display a user input request with conversation context.""" + print("\n=== User Input Requested ===") + # Filter out messages with no text for cleaner display + messages_with_text = [msg for msg in request.conversation if msg.text.strip()] + print(f"Last {len(messages_with_text)} messages in conversation:") + for message in messages_with_text[-5:]: # Show last 5 for brevity + speaker = message.author_name or message.role.value + text = message.text[:100] + "..." if len(message.text) > 100 else message.text + print(f" {speaker}: {text}") + print("============================") + + +async def main() -> None: + """Demonstrate specialist-to-specialist handoffs in a multi-tier support scenario. + + This sample shows: + 1. Triage agent routes to replacement specialist + 2. Replacement specialist hands off to delivery specialist + 3. Delivery specialist can hand off to billing if needed + 4. 
All transitions are seamless without returning to user until complete + + The workflow configuration explicitly defines which agents can hand off to which others: + - triage_agent → replacement_agent, delivery_agent, billing_agent + - replacement_agent → delivery_agent, billing_agent + - delivery_agent → billing_agent + """ + chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + triage, replacement, delivery, billing = create_agents(chat_client) + + # Configure multi-tier handoffs using fluent add_handoff() API + # This allows specialists to hand off to other specialists + workflow = ( + HandoffBuilder( + name="multi_tier_support", + participants=[triage, replacement, delivery, billing], + ) + .set_coordinator(triage) + .add_handoff(triage, [replacement, delivery, billing]) # Triage can route to any specialist + .add_handoff(replacement, [delivery, billing]) # Replacement can delegate to delivery or billing + .add_handoff(delivery, billing) # Delivery can escalate to billing + # Termination condition: Stop when more than 4 user messages exist. + # This allows agents to respond to the 4th user message before the 5th triggers termination. + # In this sample: initial message + 3 scripted responses = 4 messages, then 5th message ends workflow. + .with_termination_condition(lambda conv: sum(1 for msg in conv if msg.role.value == "user") > 4) + .build() + ) + + # Scripted user responses simulating a multi-tier handoff scenario + # Note: The initial run_stream() call sends the first user message, + # then these scripted responses are sent in sequence (total: 4 user messages). + # A 5th response triggers termination after agents respond to the 4th message. + scripted_responses = [ + "I need help with order 12345. I want a replacement and need to know when it will arrive.", + "The item arrived damaged. I'd like a replacement shipped to the same address.", + "Great! 
Can you confirm the shipping cost won't be charged again?", + "Thank you!", # Final response to trigger termination after billing agent answers + ] + + print("\n" + "=" * 80) + print("SPECIALIST-TO-SPECIALIST HANDOFF DEMONSTRATION") + print("=" * 80) + print("\nScenario: Customer needs replacement + shipping info + billing confirmation") + print("Expected flow: User → Triage → Replacement → Delivery → Billing → User") + print("=" * 80 + "\n") + + # Start workflow with initial message + print("[User]: I need help with order 12345. I want a replacement and need to know when it will arrive.\n") + events = await _drain( + workflow.run_stream("I need help with order 12345. I want a replacement and need to know when it will arrive.") + ) + pending_requests = _handle_events(events) + + # Process scripted responses + response_index = 0 + while pending_requests and response_index < len(scripted_responses): + user_response = scripted_responses[response_index] + print(f"\n[User]: {user_response}\n") + + responses = {req.request_id: user_response for req in pending_requests} + events = await _drain(workflow.send_responses_streaming(responses)) + pending_requests = _handle_events(events) + + response_index += 1 + + """ + Sample Output: + + ================================================================================ + SPECIALIST-TO-SPECIALIST HANDOFF DEMONSTRATION + ================================================================================ + + Scenario: Customer needs replacement + shipping info + billing confirmation + Expected flow: User → Triage → Replacement → Delivery → Billing → User + ================================================================================ + + [User]: I need help with order 12345. I want a replacement and need to know when it will arrive. + + + === User Input Requested === + Last 5 messages in conversation: + user: I need help with order 12345. I want a replacement and need to know when it will arrive. 
+ triage_agent: I'm connecting you to our replacement team to assist with your request, and to our delivery team for... + replacement_agent: To assist with your replacement for order 12345 and provide tracking details for delivery, I've reac... + delivery_agent: I'm handing over your request for a replacement of order 12345, as well as your inquiry about estima... + billing_agent: I handle billing and payment questions. For replacement and delivery details for order 12345, please... + ============================ + [status] IDLE_WITH_PENDING_REQUESTS + + [User]: I need help with order 12345. I want a replacement and need to know when it will arrive. + + + === User Input Requested === + Last 7 messages in conversation: + replacement_agent: To assist with your replacement for order 12345 and provide tracking details for delivery, I've reac... + delivery_agent: I'm handing over your request for a replacement of order 12345, as well as your inquiry about estima... + billing_agent: I handle billing and payment questions. For replacement and delivery details for order 12345, please... + user: I need help with order 12345. I want a replacement and need to know when it will arrive. + triage_agent: I'm connecting you with our replacement team to help with your request, and our delivery team for in... + ============================ + [status] IDLE_WITH_PENDING_REQUESTS + + [User]: The item arrived damaged. I'd like a replacement shipped to the same address. + + + === User Input Requested === + Last 12 messages in conversation: + user: The item arrived damaged. I'd like a replacement shipped to the same address. + triage_agent: Thank you for letting us know your item from order 12345 arrived damaged. Im connecting you to our ... + replacement_agent: Your replacement request for order 12345 (damaged item) will be processed, and shipped to your curre... + delivery_agent: I've forwarded your request for a replacement of the damaged item from order 12345 to our team and a... 
+ billing_agent: Thank you for the information. I've forwarded your request to our support team to arrange a replacem... + ============================ + [status] IDLE_WITH_PENDING_REQUESTS + + [User]: Great! Can you confirm the shipping cost won't be charged again? + + + === Final Conversation === + - user: I need help with order 12345. I want a replacement and need to know when it will arrive. + - triage_agent: I'm connecting you to our replacement team to assist with your request, and to our delivery team for updates on when your replacement will arrive. They'll help you shortly! + - replacement_agent: To assist with your replacement for order 12345 and provide tracking details for delivery, I've reached out to our teams. Can you please provide the reason for your replacement request? This will help to process things faster. + - delivery_agent: I'm handing over your request for a replacement of order 12345, as well as your inquiry about estimated delivery timing, to the appropriate team. They'll assist you further with the replacement and delivery details very soon. + - billing_agent: I handle billing and payment questions. For replacement and delivery details for order 12345, please contact our support team or the order department. Is there a billing or payment issue I can help you with? + - user: I need help with order 12345. I want a replacement and need to know when it will arrive. + - triage_agent: I'm connecting you with our replacement team to help with your request, and our delivery team for information about when your replacement will arrive. They'll assist you shortly! + - user: The item arrived damaged. I'd like a replacement shipped to the same address. + - triage_agent: Thank you for letting us know your item from order 12345 arrived damaged. I'm connecting you to our replacement team to arrange a replacement shipped to your address, and to our delivery team to update you on the arrival date of your replacement. They'll assist you shortly! 
+ - replacement_agent: Your replacement request for order 12345 (damaged item) will be processed, and shipped to your current address. The delivery team will also provide tracking and estimated arrival details for your new item soon. + - delivery_agent: I've forwarded your request for a replacement of the damaged item from order 12345 to our team and asked them to ship it to your current address. You'll also receive tracking information and an estimated delivery date for the replacement soon. + - billing_agent: Thank you for the information. I've forwarded your request to our support team to arrange a replacement for the damaged item from order 12345. Your replacement will be shipped to the same address, and you'll receive delivery updates soon. If you need a refund instead or have any billing questions, please let me know. + - user: Great! Can you confirm the shipping cost won't be charged again? + ========================== + [status] IDLE + """ # noqa: E501 + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/semantic-kernel-migration/README.md b/python/samples/semantic-kernel-migration/README.md index e3261a58e3..7c18db8c5b 100644 --- a/python/samples/semantic-kernel-migration/README.md +++ b/python/samples/semantic-kernel-migration/README.md @@ -4,13 +4,40 @@ This gallery helps Semantic Kernel (SK) developers move to the Microsoft Agent Framework (AF) with minimal guesswork. Each script pairs SK code with its AF equivalent so you can compare primitives, tooling, and orchestration patterns side by side while you migrate production workloads. ## What’s Included -- `chat_completion/` – SK `ChatCompletionAgent` scenarios and their AF `ChatAgent` counterparts (basic chat, tooling, threading/streaming). -- `azure_ai_agent/` – Remote Azure AI agent examples, including hosted code interpreter and explicit thread reuse. -- `openai_assistant/` – Assistants API migrations covering basic usage, code interpreter, and custom function tools. 
-- `openai_responses/` – Responses API parity samples with tooling and structured JSON output. -- `copilot_studio/` – Copilot Studio agent parity, tools, and streaming examples. -- `orchestrations/` – Sequential, Concurrent, and Magentic workflow migrations that mirror SK Team abstractions. -- `processes/` – Fan-out/fan-in and nested process examples that contrast SK’s Process Framework with AF workflows. + +### Chat completion parity +- [01_basic_chat_completion.py](chat_completion/01_basic_chat_completion.py) — Minimal SK `ChatCompletionAgent` and AF `ChatAgent` conversation. +- [02_chat_completion_with_tool.py](chat_completion/02_chat_completion_with_tool.py) — Adds a simple tool/function call in both SDKs. +- [03_chat_completion_thread_and_stream.py](chat_completion/03_chat_completion_thread_and_stream.py) — Demonstrates thread reuse and streaming prompts. + +### Azure AI agent parity +- [01_basic_azure_ai_agent.py](azure_ai_agent/01_basic_azure_ai_agent.py) — Create and run an Azure AI agent end to end. +- [02_azure_ai_agent_with_code_interpreter.py](azure_ai_agent/02_azure_ai_agent_with_code_interpreter.py) — Enable hosted code interpreter/tool execution. +- [03_azure_ai_agent_threads_and_followups.py](azure_ai_agent/03_azure_ai_agent_threads_and_followups.py) — Persist threads and follow-ups across invocations. + +### OpenAI Assistants API parity +- [01_basic_openai_assistant.py](openai_assistant/01_basic_openai_assistant.py) — Baseline assistant comparison. +- [02_openai_assistant_with_code_interpreter.py](openai_assistant/02_openai_assistant_with_code_interpreter.py) — Code interpreter tool usage. +- [03_openai_assistant_function_tool.py](openai_assistant/03_openai_assistant_function_tool.py) — Custom function tooling. + +### OpenAI Responses API parity +- [01_basic_responses_agent.py](openai_responses/01_basic_responses_agent.py) — Basic responses agent migration. 
+- [02_responses_agent_with_tool.py](openai_responses/02_responses_agent_with_tool.py) — Tool-augmented responses workflows. +- [03_responses_agent_structured_output.py](openai_responses/03_responses_agent_structured_output.py) — Structured JSON output alignment. + +### Copilot Studio parity +- [01_basic_copilot_studio_agent.py](copilot_studio/01_basic_copilot_studio_agent.py) — Minimal Copilot Studio agent invocation. +- [02_copilot_studio_streaming.py](copilot_studio/02_copilot_studio_streaming.py) — Streaming responses from Copilot Studio agents. + +### Orchestrations +- [sequential.py](orchestrations/sequential.py) — Step-by-step SK Team → AF `SequentialBuilder` migration. +- [concurrent_basic.py](orchestrations/concurrent_basic.py) — Concurrent orchestration parity. +- [handoff.py](orchestrations/handoff.py) — Support triage handoff migration with specialist routing. +- [magentic.py](orchestrations/magentic.py) — Magentic Team orchestration vs. AF builder wiring. + +### Processes +- [fan_out_fan_in_process.py](processes/fan_out_fan_in_process.py) — Fan-out/fan-in comparison between SK Process Framework and AF workflows. +- [nested_process.py](processes/nested_process.py) — Nested process orchestration vs. AF sub-workflows. Each script is fully async and the `main()` routine runs both implementations back to back so you can observe their outputs in a single execution. @@ -23,14 +50,14 @@ Each script is fully async and the `main()` routine runs both implementations ba ## Running Single-Agent Samples From the repository root: ``` -python samantic-kernel-migration/chat_completion/01_basic_chat_completion.py +python samples/semantic-kernel-migration/chat_completion/01_basic_chat_completion.py ``` Every script accepts no CLI arguments and will first call the SK implementation, followed by the AF version. Adjust the prompt or credentials inside the file as necessary before running. 
## Running Orchestration & Workflow Samples
-Advanced comparisons are split between `samantic-kernel-migration/orchestrations` (Sequential, Concurrent, Magentic) and `samantic-kernel-migration/processes` (fan-out/fan-in, nested). You can run them directly, or isolate dependencies in a throwaway virtual environment:
+Advanced comparisons are split between `samples/semantic-kernel-migration/orchestrations` (Sequential, Concurrent, Handoff, Magentic) and `samples/semantic-kernel-migration/processes` (fan-out/fan-in, nested). You can run them directly, or isolate dependencies in a throwaway virtual environment:
 ```
-cd samantic-kernel-migration
+cd samples/semantic-kernel-migration
 uv venv --python 3.10 .venv-migration
 source .venv-migration/bin/activate
 uv pip install semantic-kernel agent-framework
diff --git a/python/samples/semantic-kernel-migration/orchestrations/handoff.py b/python/samples/semantic-kernel-migration/orchestrations/handoff.py
new file mode 100644
index 0000000000..ccb30d4f6c
--- /dev/null
+++ b/python/samples/semantic-kernel-migration/orchestrations/handoff.py
@@ -0,0 +1,297 @@
+# Copyright (c) Microsoft. All rights reserved.
+"""Side-by-side handoff orchestrations for Semantic Kernel and Agent Framework.""" + +from __future__ import annotations + +import asyncio +import sys +from collections.abc import AsyncIterable, Sequence +from typing import Any, cast +from collections.abc import Iterator + +from agent_framework import ( + ChatMessage, + HandoffBuilder, + HandoffUserInputRequest, + RequestInfoEvent, + WorkflowEvent, + WorkflowOutputEvent, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential +from semantic_kernel.agents import Agent, ChatCompletionAgent, HandoffOrchestration, OrchestrationHandoffs +from semantic_kernel.agents.runtime import InProcessRuntime +from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion +from semantic_kernel.contents import ( + AuthorRole, + ChatMessageContent, + FunctionCallContent, + FunctionResultContent, + StreamingChatMessageContent, +) +from semantic_kernel.functions import KernelArguments, kernel_function +from semantic_kernel.prompt_template import KernelPromptTemplate, PromptTemplateConfig + +if sys.version_info >= (3, 12): + from typing import override # pragma: no cover +else: + from typing_extensions import override # pragma: no cover + + +CUSTOMER_PROMPT = "I need help with order 12345. I want a replacement and need to know when it will arrive." +SCRIPTED_RESPONSES = [ + "The item arrived damaged. I'd like a replacement shipped to the same address.", + "Great! Can you confirm the shipping cost won't be charged again?", + "Thanks for confirming!", +] + + +###################################################################### +# Semantic Kernel orchestration path +###################################################################### + + +class OrderStatusPlugin: + @kernel_function + def check_order_status(self, order_id: str) -> str: + return f"Order {order_id} is shipped and will arrive in 2-3 days." 
+ + +class OrderRefundPlugin: + @kernel_function + def process_refund(self, order_id: str, reason: str) -> str: + return f"Refund for order {order_id} has been processed successfully (reason: {reason})." + + +class OrderReturnPlugin: + @kernel_function + def process_return(self, order_id: str, reason: str) -> str: + return f"Return for order {order_id} has been processed successfully (reason: {reason})." + + +def build_semantic_kernel_agents() -> tuple[list[Agent], OrchestrationHandoffs]: + credential = AzureCliCredential() + + triage = ChatCompletionAgent( + name="TriageAgent", + description="Customer support triage specialist.", + instructions="Greet the customer, collect intent, and hand off to the right specialist.", + service=AzureChatCompletion(credential=credential), + ) + refund = ChatCompletionAgent( + name="RefundAgent", + description="Handles refunds.", + instructions="Process refund requests.", + service=AzureChatCompletion(credential=credential), + plugins=[OrderRefundPlugin()], + ) + order_status = ChatCompletionAgent( + name="OrderStatusAgent", + description="Looks up order status.", + instructions="Provide shipping timelines and tracking information.", + service=AzureChatCompletion(credential=credential), + plugins=[OrderStatusPlugin()], + ) + order_return = ChatCompletionAgent( + name="OrderReturnAgent", + description="Handles returns.", + instructions="Coordinate order returns.", + service=AzureChatCompletion(credential=credential), + plugins=[OrderReturnPlugin()], + ) + + handoffs = ( + OrchestrationHandoffs() + .add_many( + source_agent=triage.name, + target_agents={ + refund.name: "Route refund-related requests here.", + order_status.name: "Route shipping questions here.", + order_return.name: "Route return-related requests here.", + }, + ) + .add(refund.name, triage.name, "Return to triage for non-refund issues.") + .add(order_status.name, triage.name, "Return to triage for non-status issues.") + .add(order_return.name, triage.name, "Return to 
triage for non-return issues.") + ) + + return [triage, refund, order_status, order_return], handoffs + + +_sk_new_message = True + + +def _sk_streaming_callback(message: StreamingChatMessageContent, is_final: bool) -> None: + """Display SK agent messages as they stream.""" + global _sk_new_message + if _sk_new_message: + print(f"{message.name}: ", end="", flush=True) + _sk_new_message = False + + if message.content: + print(message.content, end="", flush=True) + + for item in message.items: + if isinstance(item, FunctionCallContent): + print(f"[tool call: {item.name}({item.arguments})]", end="", flush=True) + if isinstance(item, FunctionResultContent): + print(f"[tool result: {item.result}]", end="", flush=True) + + if is_final: + print() + _sk_new_message = True + + +def _make_sk_human_responder(script: Iterator[str]) -> callable: + def _responder() -> ChatMessageContent: + try: + user_text = next(script) + except StopIteration: + user_text = "Thanks, that's all." + print(f"[User]: {user_text}") + return ChatMessageContent(role=AuthorRole.USER, content=user_text) + + return _responder + + +async def run_semantic_kernel_example(initial_task: str, scripted_responses: Sequence[str]) -> str: + agents, handoffs = build_semantic_kernel_agents() + response_iter = iter(scripted_responses) + + orchestration = HandoffOrchestration( + members=agents, + handoffs=handoffs, + streaming_agent_response_callback=_sk_streaming_callback, + human_response_function=_make_sk_human_responder(response_iter), + ) + + runtime = InProcessRuntime() + runtime.start() + + try: + orchestration_result = await orchestration.invoke(task=initial_task, runtime=runtime) + final_message = await orchestration_result.get(timeout=30) + if isinstance(final_message, ChatMessageContent): + return final_message.content or "" + return str(final_message) + finally: + await runtime.stop_when_idle() + + +###################################################################### +# Agent Framework orchestration path 
+###################################################################### + + +def _create_af_agents(client: AzureOpenAIChatClient): + triage = client.create_agent( + name="triage_agent", + instructions=( + "You are a customer support triage agent. Route requests:\n" + "- handoff_to_refund_agent for refunds\n" + "- handoff_to_order_status_agent for shipping/timeline questions\n" + "- handoff_to_order_return_agent for returns" + ), + ) + refund = client.create_agent( + name="refund_agent", + instructions=( + "Handle refunds. Ask for order id and reason. If shipping info is needed, hand off to order_status_agent." + ), + ) + status = client.create_agent( + name="order_status_agent", + instructions=( + "Provide order status, tracking, and timelines. If billing questions appear, hand off to refund_agent." + ), + ) + returns = client.create_agent( + name="order_return_agent", + instructions=( + "Coordinate returns, confirm addresses, and summarize next steps. Hand off to triage_agent if unsure." 
+ ), + ) + return triage, refund, status, returns + + +async def _drain_events(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]: + return [event async for event in stream] + + +def _collect_handoff_requests(events: list[WorkflowEvent]) -> list[RequestInfoEvent]: + requests: list[RequestInfoEvent] = [] + for event in events: + if isinstance(event, RequestInfoEvent) and isinstance(event.data, HandoffUserInputRequest): + requests.append(event) + return requests + + +def _extract_final_conversation(events: list[WorkflowEvent]) -> list[ChatMessage]: + for event in events: + if isinstance(event, WorkflowOutputEvent): + data = cast(list[ChatMessage], event.data) + return data + return [] + + +async def run_agent_framework_example(initial_task: str, scripted_responses: Sequence[str]) -> str: + client = AzureOpenAIChatClient(credential=AzureCliCredential()) + triage, refund, status, returns = _create_af_agents(client) + + workflow = ( + HandoffBuilder(name="sk_af_handoff_migration", participants=[triage, refund, status, returns]) + .set_coordinator(triage) + .add_handoff(triage, [refund, status, returns]) + .add_handoff(refund, [status, triage]) + .add_handoff(status, [refund, triage]) + .add_handoff(returns, triage) + .build() + ) + + events = await _drain_events(workflow.run_stream(initial_task)) + pending = _collect_handoff_requests(events) + scripted_iter = iter(scripted_responses) + + final_events = events + while pending: + try: + user_reply = next(scripted_iter) + except StopIteration: + user_reply = "Thanks, that's all." + responses = {request.request_id: user_reply for request in pending} + final_events = await _drain_events(workflow.send_responses_streaming(responses)) + pending = _collect_handoff_requests(final_events) + + conversation = _extract_final_conversation(final_events) + if not conversation: + return "" + + # Render final transcript succinctly. 
+ lines = [] + for message in conversation: + text = message.text or "" + if not text.strip(): + continue + speaker = message.author_name or message.role.value + lines.append(f"{speaker}: {text}") + return "\n".join(lines) + + +###################################################################### +# Console entry point +###################################################################### + + +async def main() -> None: + print("===== Agent Framework Handoff =====") + af_transcript = await run_agent_framework_example(CUSTOMER_PROMPT, SCRIPTED_RESPONSES) + print(af_transcript or "No output produced.") + print() + + print("===== Semantic Kernel Handoff =====") + sk_result = await run_semantic_kernel_example(CUSTOMER_PROMPT, SCRIPTED_RESPONSES) + print(sk_result or "No output produced.") + + +if __name__ == "__main__": + asyncio.run(main())