diff --git a/python/packages/core/agent_framework/_workflows/_agent_executor.py b/python/packages/core/agent_framework/_workflows/_agent_executor.py index 358cee94dd..bbaaf49784 100644 --- a/python/packages/core/agent_framework/_workflows/_agent_executor.py +++ b/python/packages/core/agent_framework/_workflows/_agent_executor.py @@ -105,6 +105,11 @@ def workflow_output_types(self) -> list[type[Any]]: return [AgentRunResponse] return [] + @property + def description(self) -> str | None: + """Get the description of the underlying agent.""" + return self._agent.description + @handler async def run( self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse, AgentRunResponse] diff --git a/python/packages/core/agent_framework/_workflows/_handoff.py b/python/packages/core/agent_framework/_workflows/_handoff.py index 757ebb3585..9a99657902 100644 --- a/python/packages/core/agent_framework/_workflows/_handoff.py +++ b/python/packages/core/agent_framework/_workflows/_handoff.py @@ -14,7 +14,10 @@ Key properties: - The entire conversation is maintained and reused on every hop - The coordinator signals a handoff by invoking a tool call that names the specialist -- After a specialist responds, the workflow immediately requests new user input +- In human_in_loop mode (default), the workflow requests user input after each agent response + that doesn't trigger a handoff +- In autonomous mode, agents continue responding until they invoke a handoff tool or reach + a termination condition or turn limit """ import logging @@ -76,9 +79,9 @@ def _create_handoff_tool(alias: str, description: str | None = None) -> AIFuncti # Note: approval_mode is intentionally NOT set for handoff tools. # Handoff tools are framework-internal signals that trigger routing logic, - # not actual function executions. 
They are automatically intercepted and - # never actually execute, so approval is unnecessary and causes issues - # with tool_calls/responses pairing when cleaning conversations. + # not actual function executions. They are automatically intercepted by + # _AutoHandoffMiddleware which short-circuits execution and provides synthetic + # results, so the function body never actually runs in practice. @ai_function(name=tool_name, description=doc) def _handoff_tool(context: str | None = None) -> str: """Return a deterministic acknowledgement that encodes the target alias.""" @@ -109,6 +112,8 @@ def _clone_chat_agent(agent: ChatAgent) -> ChatAgent: chat_message_store_factory=agent.chat_message_store_factory, context_providers=agent.context_provider, middleware=middleware, + # Disable parallel tool calls to prevent the agent from invoking multiple handoff tools at once. + allow_multiple_tool_calls=False, frequency_penalty=options.frequency_penalty, logit_bias=dict(options.logit_bias) if options.logit_bias else None, max_tokens=options.max_tokens, @@ -217,7 +222,7 @@ async def process( class _InputToConversation(Executor): - """Normalises initial workflow input into a list[ChatMessage].""" + """Normalizes initial workflow input into a list[ChatMessage].""" @handler async def from_str(self, prompt: str, ctx: WorkflowContext[list[ChatMessage]]) -> None: @@ -225,16 +230,12 @@ async def from_str(self, prompt: str, ctx: WorkflowContext[list[ChatMessage]]) - await ctx.send_message([ChatMessage(Role.USER, text=prompt)]) @handler - async def from_message(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None: # type: ignore[name-defined] + async def from_message(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None: """Pass through an existing chat message as the initial conversation.""" await ctx.send_message([message]) @handler - async def from_messages( - self, - messages: list[ChatMessage], - ctx: WorkflowContext[list[ChatMessage]], 
- ) -> None: # type: ignore[name-defined] + async def from_messages(self, messages: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None: """Forward a list of chat messages as the starting conversation history.""" await ctx.send_message(list(messages)) @@ -400,7 +401,8 @@ async def handle_agent_response( self._conversation = list(full_conv) else: # Subsequent responses - append only new messages from this agent - # Keep ALL messages including tool calls to maintain complete history + # Keep ALL messages including tool calls to maintain complete history. + # This includes assistant messages with function calls and tool role messages with results. new_messages = response.agent_run_response.messages or [] self._conversation.extend(new_messages) @@ -516,9 +518,9 @@ async def handle_user_input( ) else: logger.info(f"Routing user input to coordinator '{target_agent_id}'") - # Note: Stack is only used for specialist-to-specialist handoffs, not user input routing - # Clean before sending to target agent + # Clean conversation before sending to target agent + # Removes tool-related messages that shouldn't be resent on every turn cleaned = clean_conversation_for_handoff(self._conversation) request = AgentExecutorRequest(messages=cleaned, should_respond=True) await ctx.send_message(request, target_id=target_agent_id) @@ -635,13 +637,7 @@ def _apply_response_metadata(self, conversation: list[ChatMessage], agent_respon class _UserInputGateway(Executor): """Bridges conversation context with the request & response cycle and re-enters the loop.""" - def __init__( - self, - *, - starting_agent_id: str, - prompt: str | None, - id: str, - ) -> None: + def __init__(self, *, starting_agent_id: str, prompt: str | None, id: str) -> None: """Initialise the gateway that requests user input and forwards responses.""" super().__init__(id) self._starting_agent_id = starting_agent_id @@ -702,7 +698,17 @@ async def resume_from_user( def _as_user_messages(payload: Any) -> 
list[ChatMessage]: - """Normalise arbitrary payloads into user-authored chat messages.""" + """Normalize arbitrary payloads into user-authored chat messages. + + Handles various input formats: + - ChatMessage instances (converted to USER role if needed) + - List of ChatMessage instances + - Mapping with 'text' or 'content' key + - Any other type (converted to string) + + Returns: + List of ChatMessage instances with USER role. + """ if isinstance(payload, ChatMessage): if payload.role == Role.USER: return [payload] @@ -798,7 +804,7 @@ class HandoffBuilder: name="customer_support", participants=[coordinator, refund, shipping], ) - .set_coordinator("coordinator_agent") + .set_coordinator(coordinator) .build() ) @@ -817,7 +823,7 @@ class HandoffBuilder: # Enable specialist-to-specialist handoffs with fluent API workflow = ( HandoffBuilder(participants=[coordinator, replacement, delivery, billing]) - .set_coordinator("coordinator_agent") + .set_coordinator(coordinator) .add_handoff(coordinator, [replacement, delivery, billing]) # Coordinator routes to all .add_handoff(replacement, [delivery, billing]) # Replacement delegates to delivery/billing .add_handoff(delivery, billing) # Delivery escalates to billing @@ -827,6 +833,35 @@ class HandoffBuilder: # Flow: User → Coordinator → Replacement → Delivery → Back to User # (Replacement hands off to Delivery without returning to user) + **Use Participant Factories for State Isolation:** + + .. 
code-block:: python + # Define factories that produce fresh agent instances per workflow run + def create_coordinator() -> AgentProtocol: + return chat_client.create_agent( + instructions="You are the coordinator agent...", + name="coordinator_agent", + ) + + + def create_specialist() -> AgentProtocol: + return chat_client.create_agent( + instructions="You are the specialist agent...", + name="specialist_agent", + ) + + + workflow = ( + HandoffBuilder( + participant_factories={ + "coordinator": create_coordinator, + "specialist": create_specialist, + } + ) + .set_coordinator("coordinator") + .build() + ) + **Custom Termination Condition:** .. code-block:: python @@ -834,7 +869,7 @@ class HandoffBuilder: # Terminate when user says goodbye or after 5 exchanges workflow = ( HandoffBuilder(participants=[coordinator, refund, shipping]) - .set_coordinator("coordinator_agent") + .set_coordinator(coordinator) .with_termination_condition( lambda conv: sum(1 for msg in conv if msg.role.value == "user") >= 5 or any("goodbye" in msg.text.lower() for msg in conv[-2:]) @@ -851,7 +886,7 @@ class HandoffBuilder: storage = InMemoryCheckpointStorage() workflow = ( HandoffBuilder(participants=[coordinator, refund, shipping]) - .set_coordinator("coordinator_agent") + .set_coordinator(coordinator) .with_checkpointing(storage) .build() ) @@ -860,6 +895,9 @@ class HandoffBuilder: name: Optional workflow name for identification and logging. participants: List of agents (AgentProtocol) or executors to participate in the handoff. The first agent you specify as coordinator becomes the orchestrating agent. + participant_factories: Mapping of factory names to callables that produce agents or + executors when invoked. This allows for lazy instantiation + and state isolation per workflow instance created by this builder. description: Optional human-readable description of the workflow. 
Raises: @@ -872,14 +910,16 @@ def __init__( *, name: str | None = None, participants: Sequence[AgentProtocol | Executor] | None = None, + participant_factories: Mapping[str, Callable[[], AgentProtocol | Executor]] | None = None, description: str | None = None, ) -> None: r"""Initialize a HandoffBuilder for creating conversational handoff workflows. The builder starts in an unconfigured state and requires you to call: 1. `.participants([...])` - Register agents - 2. `.set_coordinator(...)` - Designate which agent receives initial user input - 3. `.build()` - Construct the final Workflow + 2. or `.participant_factories({...})` - Register agent/executor factories + 3. `.set_coordinator(...)` - Designate which agent receives initial user input + 4. `.build()` - Construct the final Workflow Optional configuration methods allow you to customize context management, termination logic, and persistence. @@ -891,6 +931,9 @@ def __init__( participate in the handoff workflow. You can also call `.participants([...])` later. Each participant must have a unique identifier (name for agents, id for executors). + participant_factories: Optional mapping of factory names to callables that produce agents or + executors when invoked. This allows for lazy instantiation + and state isolation per workflow instance created by this builder. description: Optional human-readable description explaining the workflow's purpose. Useful for documentation and observability. 
@@ -911,7 +954,6 @@ def __init__( self._termination_condition: Callable[[list[ChatMessage]], bool | Awaitable[bool]] = ( _default_termination_condition ) - self._auto_register_handoff_tools: bool = True self._handoff_config: dict[str, list[str]] = {} # Maps agent_id -> [target_agent_ids] self._return_to_previous: bool = False self._interaction_mode: Literal["human_in_loop", "autonomous"] = "human_in_loop" @@ -919,9 +961,79 @@ def __init__( self._request_info_enabled: bool = False self._request_info_filter: set[str] | None = None + self._participant_factories: dict[str, Callable[[], AgentProtocol | Executor]] = {} + if participant_factories: + self.participant_factories(participant_factories) + if participants: self.participants(participants) + # region Fluent Configuration Methods + + def participant_factories( + self, participant_factories: Mapping[str, Callable[[], AgentProtocol | Executor]] + ) -> "HandoffBuilder": + """Register factories that produce agents or executors for the handoff workflow. + + Each factory is a callable that returns an AgentProtocol or Executor instance. + Factories are invoked when building the workflow, allowing for lazy instantiation + and state isolation per workflow instance. + + Args: + participant_factories: Mapping of factory names to callables that return AgentProtocol or Executor + instances. Each produced participant must have a unique identifier (name for + agents, id for executors). + + Returns: + Self for method chaining. + + Raises: + ValueError: If participant_factories is empty or `.participants(...)` or `.participant_factories(...)` + has already been called. + + Example: + .. code-block:: python + + from agent_framework import ChatAgent, HandoffBuilder + + + def create_coordinator() -> ChatAgent: + return ... + + + def create_refund_agent() -> ChatAgent: + return ... + + + def create_billing_agent() -> ChatAgent: + return ... 
+ + + factories = { + "coordinator": create_coordinator, + "refund": create_refund_agent, + "billing": create_billing_agent, + } + + builder = HandoffBuilder().participant_factories(factories) + # Use the factory IDs to create handoffs and set the coordinator + builder.add_handoff("coordinator", ["refund", "billing"]) + builder.set_coordinator("coordinator") + """ + if self._executors: + raise ValueError( + "Cannot mix .participants([...]) and .participant_factories() in the same builder instance." + ) + + if self._participant_factories: + raise ValueError("participant_factories() has already been called on this builder instance.") + + if not participant_factories: + raise ValueError("participant_factories cannot be empty") + + self._participant_factories = dict(participant_factories) + return self + def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "HandoffBuilder": """Register the agents or executors that will participate in the handoff workflow. @@ -938,7 +1050,8 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Han Self for method chaining. Raises: - ValueError: If participants is empty or contains duplicates. + ValueError: If participants is empty, contains duplicates, or `.participants(...)` or + `.participant_factories(...)` has already been called. TypeError: If participants are not AgentProtocol or Executor instances. Example: @@ -960,26 +1073,28 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Han This method resets any previously configured coordinator, so you must call `.set_coordinator(...)` again after changing participants. """ + if self._participant_factories: + raise ValueError( + "Cannot mix .participants([...]) and .participant_factories() in the same builder instance." 
+ ) + + if self._executors: + raise ValueError("participants have already been assigned") + if not participants: raise ValueError("participants cannot be empty") named: dict[str, AgentProtocol | Executor] = {} for participant in participants: - identifier: str if isinstance(participant, Executor): identifier = participant.id elif isinstance(participant, AgentProtocol): - name_attr = getattr(participant, "name", None) - if not name_attr: - raise ValueError( - "Agents used in handoff workflows must have a stable name " - "so they can be addressed during routing." - ) - identifier = str(name_attr) + identifier = participant.display_name else: raise TypeError( f"Participants must be AgentProtocol or Executor instances. Got {type(participant).__name__}." ) + if identifier in named: raise ValueError(f"Duplicate participant name '{identifier}' detected") named[identifier] = participant @@ -990,15 +1105,10 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Han ) wrapped = metadata["executors"] - seen_ids: set[str] = set() - for executor in wrapped.values(): - if executor.id in seen_ids: - raise ValueError(f"Duplicate participant with id '{executor.id}' detected") - seen_ids.add(executor.id) - self._executors = {executor.id: executor for executor in wrapped.values()} self._aliases = metadata["aliases"] self._starting_agent_id = None + return self def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuilder": @@ -1015,7 +1125,7 @@ def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuil Args: agent: The agent to use as the coordinator. Can be: - - Agent name (str): e.g., "coordinator_agent" + - Factory name (str): If using participant factories - AgentProtocol instance: The actual agent object - Executor instance: A custom executor wrapping an agent @@ -1023,15 +1133,26 @@ def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuil Self for method chaining. 
Raises: - ValueError: If participants(...) hasn't been called yet, or if the specified - agent is not in the participants list. + ValueError: 1) If `agent` is an AgentProtocol or Executor instance but `.participants(...)` hasn't + been called yet, or if it is not in the participants list. + 2) If `agent` is a factory name (str) but `.participant_factories(...)` hasn't been + called yet, or if it is not in the participant_factories list. + TypeError: If `agent` is not a str, AgentProtocol, or Executor instance. Example: .. code-block:: python - # Use agent name - builder = HandoffBuilder().participants([coordinator, refund, billing]).set_coordinator("coordinator") + # Use factory name with `.participant_factories()` + builder = ( + HandoffBuilder() + .participant_factories({ + "coordinator": create_coordinator, + "refund": create_refund_agent, + "billing": create_billing_agent, + }) + .set_coordinator("coordinator") + ) # Or pass the agent object directly builder = HandoffBuilder().participants([coordinator, refund, billing]).set_coordinator(coordinator) @@ -1042,12 +1163,29 @@ def set_coordinator(self, agent: str | AgentProtocol | Executor) -> "HandoffBuil Decorate the tool with `approval_mode="always_require"` to ensure the workflow intercepts the call before execution and can make the transition. """ - if not self._executors: - raise ValueError("Call participants(...) before coordinator(...)") - resolved = self._resolve_to_id(agent) - if resolved not in self._executors: - raise ValueError(f"coordinator '{resolved}' is not part of the participants list") - self._starting_agent_id = resolved + if isinstance(agent, (AgentProtocol, Executor)): + if not self._executors: + raise ValueError( + "Call participants(...) before coordinator(...). If using participant_factories, " + "pass the factory name (str) instead of the agent instance." 
+ ) + resolved = self._resolve_to_id(agent) + if resolved not in self._executors: + raise ValueError(f"coordinator '{resolved}' is not part of the participants list") + self._starting_agent_id = resolved + elif isinstance(agent, str): + if agent not in self._participant_factories: + raise ValueError( + f"coordinator factory name '{agent}' is not part of the participant_factories list. If " + "you are using participant instances, call .participants(...) and pass the agent instance instead." + ) + self._starting_agent_id = agent + else: + raise TypeError( + "coordinator must be a factory name (str), AgentProtocol, or Executor instance. " + f"Got {type(agent).__name__}." + ) + return self def add_handoff( @@ -1067,33 +1205,42 @@ def add_handoff( Args: source: The agent that can initiate the handoff. Can be: - - Agent name (str): e.g., "triage_agent" + - Factory name (str): If using participant factories - AgentProtocol instance: The actual agent object - Executor instance: A custom executor wrapping an agent + - Cannot mix factory names and instances across source and targets targets: One or more target agents that the source can hand off to. Can be: - - Single agent: "billing_agent" or agent_instance - - Multiple agents: ["billing_agent", "support_agent"] or [agent1, agent2] - tool_name: Optional custom name for the handoff tool. If not provided, generates - "handoff_to_" for single targets or "handoff_to__agent" - for multiple targets based on target names. - tool_description: Optional custom description for the handoff tool. If not provided, - generates "Handoff to the agent." 
+ - Factory name (str): If using participant factories + - AgentProtocol instance: The actual agent object + - Executor instance: A custom executor wrapping an agent + - Single target: "billing_agent" or agent_instance + - Multiple targets: ["billing_agent", "support_agent"] or [agent1, agent2] + - Cannot mix factory names and instances across source and targets + tool_name: Optional custom name for the handoff tool. Currently not used in the + implementation - tools are always auto-generated as "handoff_to_". + Reserved for future enhancement. + tool_description: Optional custom description for the handoff tool. Currently not used + in the implementation - descriptions are always auto-generated as + "Handoff to the agent.". Reserved for future enhancement. Returns: Self for method chaining. Raises: - ValueError: If source or targets are not in the participants list, or if + ValueError: 1) If source or targets are not in the participants list, or if participants(...) hasn't been called yet. + 2) If source or targets are factory names (str) but participant_factories(...) + hasn't been called yet, or if they are not in the participant_factories list. + TypeError: If mixing factory names (str) and AgentProtocol/Executor instances Examples: - Single target: + Single target (using factory name): .. code-block:: python builder.add_handoff("triage_agent", "billing_agent") - Multiple targets (using agent names): + Multiple targets (using factory names): .. code-block:: python @@ -1118,146 +1265,70 @@ def add_handoff( .build() ) - Custom tool names and descriptions: - - .. 
code-block:: python - - builder.add_handoff( - "support_agent", - "escalation_agent", - tool_name="escalate_to_l2", - tool_description="Escalate this issue to Level 2 support", - ) - Note: - Handoff tools are automatically registered for each source agent - If a source agent is configured multiple times via add_handoff, targets are merged """ - if not self._executors: - raise ValueError("Call participants(...) before add_handoff(...)") - - # Resolve source agent ID - source_id = self._resolve_to_id(source) - if source_id not in self._executors: - raise ValueError(f"Source agent '{source}' is not in the participants list") - - # Normalize targets to list - target_list = [targets] if isinstance(targets, (str, AgentProtocol, Executor)) else list(targets) - - # Resolve all target IDs - target_ids: list[str] = [] - for target in target_list: - target_id = self._resolve_to_id(target) - if target_id not in self._executors: - raise ValueError(f"Target agent '{target}' is not in the participants list") - target_ids.append(target_id) - - # Merge with existing handoff configuration for this source - if source_id in self._handoff_config: - # Add new targets to existing list, avoiding duplicates - existing = self._handoff_config[source_id] - for target_id in target_ids: - if target_id not in existing: - existing.append(target_id) - else: - self._handoff_config[source_id] = target_ids - - return self - - def auto_register_handoff_tools(self, enabled: bool) -> "HandoffBuilder": - """Configure whether the builder should synthesize handoff tools for the starting agent.""" - self._auto_register_handoff_tools = enabled - return self - - def _apply_auto_tools(self, agent: ChatAgent, specialists: Mapping[str, Executor]) -> dict[str, str]: - """Attach synthetic handoff tools to a chat agent and return the target lookup table.""" - chat_options = agent.chat_options - existing_tools = list(chat_options.tools or []) - existing_names = {getattr(tool, "name", "") for tool in existing_tools 
if hasattr(tool, "name")} - - tool_targets: dict[str, str] = {} - new_tools: list[Any] = [] - for exec_id, executor in specialists.items(): - alias = exec_id - sanitized = sanitize_identifier(alias) - - # Extract agent description from AgentExecutor if available - description = None - if isinstance(executor, AgentExecutor): - target_agent = getattr(executor, "_agent", None) - if target_agent: - description = getattr(target_agent, "description", None) - - tool = _create_handoff_tool(alias, description) - if tool.name not in existing_names: - new_tools.append(tool) - tool_targets[tool.name.lower()] = exec_id - tool_targets[sanitized] = exec_id - tool_targets[alias.lower()] = exec_id - - if new_tools: - chat_options.tools = existing_tools + new_tools - else: - chat_options.tools = existing_tools - - return tool_targets - - def _resolve_agent_id(self, agent_identifier: str) -> str: - """Resolve an agent identifier to an executor ID. - - Args: - agent_identifier: Can be agent name, display name, or executor ID - - Returns: - The executor ID - - Raises: - ValueError: If the identifier cannot be resolved - """ - # Check if it's already an executor ID - if agent_identifier in self._executors: - return agent_identifier - - # Check if it's an alias - if agent_identifier in self._aliases: - return self._aliases[agent_identifier] - - # Not found - raise ValueError(f"Agent identifier '{agent_identifier}' not found in participants") - - def _prepare_agent_with_handoffs( - self, - executor: AgentExecutor, - target_agents: Mapping[str, Executor], - ) -> tuple[AgentExecutor, dict[str, str]]: - """Prepare an agent by adding handoff tools for the specified target agents. 
- - Args: - executor: The agent executor to prepare - target_agents: Map of executor IDs to target executors this agent can hand off to - - Returns: - Tuple of (updated executor, tool_targets map) - """ - agent = getattr(executor, "_agent", None) - if not isinstance(agent, ChatAgent): - return executor, {} + if isinstance(source, str) and ( + isinstance(targets, str) or (isinstance(targets, Sequence) and all(isinstance(t, str) for t in targets)) + ): + # Both source and targets are factory names + if not self._participant_factories: + raise ValueError("Call participant_factories(...) before add_handoff(...)") + + if source not in self._participant_factories: + raise ValueError(f"Source factory name '{source}' is not in the participant_factories list") + + target_list: list[str] = [targets] if isinstance(targets, str) else list(targets) # type: ignore + for target in target_list: + if target not in self._participant_factories: + raise ValueError(f"Target factory name '{target}' is not in the participant_factories list") + + self._handoff_config[source] = target_list # type: ignore + return self + + if isinstance(source, (AgentProtocol, Executor)) and ( + isinstance(targets, (AgentProtocol, Executor)) + or all(isinstance(t, (AgentProtocol, Executor)) for t in targets) + ): + # Both source and targets are instances + if not self._executors: + raise ValueError("Call participants(...) 
before add_handoff(...)") + + # Resolve source agent ID + source_id = self._resolve_to_id(source) + if source_id not in self._executors: + raise ValueError(f"Source agent '{source}' is not in the participants list") + + # Normalize targets to list + target_list: list[AgentProtocol | Executor] = ( # type: ignore[no-redef] + [targets] if isinstance(targets, (AgentProtocol, Executor)) else list(targets) + ) # type: ignore + + # Resolve all target IDs + target_ids: list[str] = [] + for target in target_list: + target_id = self._resolve_to_id(target) + if target_id not in self._executors: + raise ValueError(f"Target agent '{target}' is not in the participants list") + target_ids.append(target_id) + + # Merge with existing handoff configuration for this source + if source_id in self._handoff_config: + # Add new targets to existing list, avoiding duplicates + existing = self._handoff_config[source_id] + for target_id in target_ids: + if target_id not in existing: + existing.append(target_id) + else: + self._handoff_config[source_id] = target_ids - cloned_agent = _clone_chat_agent(agent) - tool_targets = self._apply_auto_tools(cloned_agent, target_agents) - if tool_targets: - middleware = _AutoHandoffMiddleware(tool_targets) - existing_middleware = list(cloned_agent.middleware or []) - existing_middleware.append(middleware) - cloned_agent.middleware = existing_middleware + return self - new_executor = AgentExecutor( - cloned_agent, - agent_thread=getattr(executor, "_agent_thread", None), - output_response=getattr(executor, "_output_response", False), - id=executor.id, + raise TypeError( + "Cannot mix factory names (str) and AgentProtocol/Executor instances " + "across source and targets in add_handoff()" ) - return new_executor, tool_targets def request_prompt(self, prompt: str | None) -> "HandoffBuilder": """Set a custom prompt message displayed when requesting user input. 
@@ -1619,75 +1690,46 @@ def build(self) -> Workflow: After calling build(), the builder instance should not be reused. Create a new builder if you need to construct another workflow with different configuration. """ - if not self._executors: - raise ValueError("No participants provided. Call participants([...]) first.") + if not self._executors and not self._participant_factories: + raise ValueError( + "No participants or participant_factories have been configured. " + "Call participants(...) or participant_factories(...) first." + ) + if self._starting_agent_id is None: - raise ValueError("coordinator must be defined before build().") + raise ValueError("Must call set_coordinator(...) before building the workflow.") - starting_executor = self._executors[self._starting_agent_id] - specialists = { - exec_id: executor for exec_id, executor in self._executors.items() if exec_id != self._starting_agent_id - } - - # Build handoff tool registry for all agents that need them - handoff_tool_targets: dict[str, str] = {} - if self._auto_register_handoff_tools: - # Determine which agents should have handoff tools - if self._handoff_config: - # Use explicit handoff configuration from add_handoff() calls - for source_exec_id, target_exec_ids in self._handoff_config.items(): - executor = self._executors.get(source_exec_id) - if not executor: - raise ValueError(f"Handoff source agent '{source_exec_id}' not found in participants") - - if isinstance(executor, AgentExecutor): - # Build targets map for this source agent - targets_map: dict[str, Executor] = {} - for target_exec_id in target_exec_ids: - target_executor = self._executors.get(target_exec_id) - if not target_executor: - raise ValueError(f"Handoff target agent '{target_exec_id}' not found in participants") - targets_map[target_exec_id] = target_executor - - # Register handoff tools for this agent - updated_executor, tool_targets = self._prepare_agent_with_handoffs(executor, targets_map) - self._executors[source_exec_id] = 
updated_executor - handoff_tool_targets.update(tool_targets) - else: - # Default behavior: only coordinator gets handoff tools to all specialists - if isinstance(starting_executor, AgentExecutor) and specialists: - starting_executor, tool_targets = self._prepare_agent_with_handoffs(starting_executor, specialists) - self._executors[self._starting_agent_id] = starting_executor - handoff_tool_targets.update(tool_targets) # Update references after potential agent modifications - starting_executor = self._executors[self._starting_agent_id] - specialists = { - exec_id: executor for exec_id, executor in self._executors.items() if exec_id != self._starting_agent_id - } + # Resolve executors, aliases, and handoff tool targets + # This will instantiate participants if using factories, and validate handoff config + start_executor_id, executors, aliases, handoff_tool_targets = self._resolve_executors_and_handoffs() + specialists = {exec_id: executor for exec_id, executor in executors.items() if exec_id != start_executor_id} if not specialists: logger.warning("Handoff workflow has no specialist agents; the coordinator will loop with the user.") descriptions = { - exec_id: getattr(executor, "description", None) or exec_id for exec_id, executor in self._executors.items() + exec_id: getattr(executor, "description", None) or exec_id for exec_id, executor in executors.items() } participant_specs = { exec_id: GroupChatParticipantSpec(name=exec_id, participant=executor, description=descriptions[exec_id]) - for exec_id, executor in self._executors.items() + for exec_id, executor in executors.items() } input_node = _InputToConversation(id="input-conversation") user_gateway = _UserInputGateway( - starting_agent_id=starting_executor.id, + starting_agent_id=start_executor_id, prompt=self._request_prompt, id="handoff-user-input", ) builder = WorkflowBuilder(name=self._name, description=self._description).set_start_executor(input_node) - specialist_aliases = {alias: exec_id for alias, 
exec_id in self._aliases.items() if exec_id in specialists} + specialist_aliases = { + alias: specialists[exec_id].id for alias, exec_id in aliases.items() if exec_id in specialists + } def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor: return _HandoffCoordinator( - starting_agent_id=starting_executor.id, + starting_agent_id=start_executor_id, specialist_ids=specialist_aliases, input_gateway_id=user_gateway.id, termination_condition=self._termination_condition, @@ -1704,8 +1746,8 @@ def _handoff_orchestrator_factory(_: _GroupChatConfig) -> Executor: manager_name=self._starting_agent_id, participants=participant_specs, max_rounds=None, - participant_aliases=self._aliases, - participant_executors=self._executors, + participant_aliases=aliases, + participant_executors=executors, ) # Determine participant factory - wrap with request info interceptor if enabled @@ -1754,14 +1796,159 @@ def _factory_with_request_info( builder = builder.add_edge(input_node, starting_entry_executor) else: # Fallback to direct connection if interceptor not found - builder = builder.add_edge(input_node, starting_executor) + builder = builder.add_edge(input_node, executors[start_executor_id]) else: - builder = builder.add_edge(input_node, starting_executor) + builder = builder.add_edge(input_node, executors[start_executor_id]) builder = builder.add_edge(coordinator, user_gateway) builder = builder.add_edge(user_gateway, coordinator) return builder.build() + # endregion Fluent Configuration Methods + + # region Internal Helper Methods + + def _resolve_executors(self) -> tuple[dict[str, Executor], dict[str, str]]: + """Resolve participant factories into executor instances. + + If executors were provided directly via participants(...), those are returned as-is. + If participant factories were provided via participant_factories(...), those + are invoked to create executor instances and aliases. 
+ + Returns: + Tuple of (executors map, aliases map) + """ + if self._executors and self._participant_factories: + raise ValueError("Cannot have both executors and participant_factories configured") + + if self._executors: + if self._aliases: + # Return existing executors and aliases + return self._executors, self._aliases + raise ValueError("Aliases is empty despite executors being provided") + + if self._participant_factories: + # Invoke each factory to create participant instances + executor_ids_to_executors: dict[str, AgentProtocol | Executor] = {} + factory_names_to_ids: dict[str, str] = {} + for factory_name, factory in self._participant_factories.items(): + instance: Executor | AgentProtocol = factory() + if isinstance(instance, Executor): + identifier = instance.id + elif isinstance(instance, AgentProtocol): + identifier = instance.display_name + else: + raise TypeError( + f"Participants must be AgentProtocol or Executor instances. Got {type(instance).__name__}." + ) + + if identifier in executor_ids_to_executors: + raise ValueError(f"Duplicate participant name '{identifier}' detected") + executor_ids_to_executors[identifier] = instance + factory_names_to_ids[factory_name] = identifier + + # Prepare metadata and wrap instances as needed + metadata = prepare_participant_metadata( + executor_ids_to_executors, + description_factory=lambda name, participant: getattr(participant, "description", None) or name, + ) + + wrapped = metadata["executors"] + # Map executors by factory name (not executor.id) because handoff configs reference factory names + # This allows users to configure handoffs using the factory names they provided + executors = { + factory_name: wrapped[executor_id] for factory_name, executor_id in factory_names_to_ids.items() + } + aliases = metadata["aliases"] + + return executors, aliases + + raise ValueError("No executors or participant_factories have been configured") + + def _resolve_handoffs(self, executors: Mapping[str, Executor]) -> 
tuple[dict[str, Executor], dict[str, str]]: + """Handoffs may be specified using factory names or instances; resolve to executor IDs. + + Args: + executors: Map of executor IDs or factory names to Executor instances + + Returns: + Tuple of (updated executors map, handoff tool targets map) + The updated executors map may have modified agents with handoff tools added + and maps executor IDs to Executor instances. + The handoff tool targets map maps handoff tool name variants to target executor IDs. + """ + handoff_tool_targets: dict[str, str] = {} + updated_executors = {executor.id: executor for executor in executors.values()} + # Determine which agents should have handoff tools + if self._handoff_config: + # Use explicit handoff configuration from add_handoff() calls + for source_id, target_ids in self._handoff_config.items(): + executor = executors.get(source_id) + if not executor: + raise ValueError( + f"Handoff source agent '{source_id}' not found. " + "Please make sure source has been added as either a participant or participant_factory." + ) + + if isinstance(executor, AgentExecutor): + # Build targets map for this source agent + targets_map: dict[str, Executor] = {} + for target_id in target_ids: + target_executor = executors.get(target_id) + if not target_executor: + raise ValueError( + f"Handoff target agent '{target_id}' not found. " + "Please make sure target has been added as either a participant or participant_factory."
+ ) + targets_map[target_executor.id] = target_executor + + # Register handoff tools for this agent + updated_executor, tool_targets = self._prepare_agent_with_handoffs(executor, targets_map) + updated_executors[updated_executor.id] = updated_executor + handoff_tool_targets.update(tool_targets) + else: + if self._starting_agent_id is None or self._starting_agent_id not in executors: + raise RuntimeError("Failed to resolve default handoff configuration due to missing starting agent.") + + # Default behavior: only coordinator gets handoff tools to all specialists + starting_executor = executors[self._starting_agent_id] + specialists = { + executor.id: executor for executor in executors.values() if executor.id != starting_executor.id + } + + if isinstance(starting_executor, AgentExecutor) and specialists: + starting_executor, tool_targets = self._prepare_agent_with_handoffs(starting_executor, specialists) + updated_executors[starting_executor.id] = starting_executor + handoff_tool_targets.update(tool_targets) # Record the coordinator's handoff tool-name mappings + + return updated_executors, handoff_tool_targets + + def _resolve_executors_and_handoffs(self) -> tuple[str, dict[str, Executor], dict[str, str], dict[str, str]]: + """Resolve participant factories into executor instances and handoff configurations. + + If executors were provided directly via participants(...), those are returned as-is. + If participant factories were provided via participant_factories(...), those + are invoked to create executor instances and aliases. + + Returns: + Tuple of (start executor ID, executors map, aliases map, handoff tool targets map) + """ + # Resolve the participant factories now. This doesn't break the factory pattern + # since the Handoff builder still creates new instances per workflow build.
+ executors, aliases = self._resolve_executors() + # `self._starting_agent_id` is either a factory name or executor ID at this point, + # resolve to executor ID + if self._starting_agent_id in executors: + start_executor_id = executors[self._starting_agent_id].id + else: + raise RuntimeError("Failed to resolve starting agent ID during build.") + + # Resolve handoffs + # This will update the `executors` dict to a map of executor IDs to executors + updated_executors, handoff_tool_targets = self._resolve_handoffs(executors) + + return start_executor_id, updated_executors, aliases, handoff_tool_targets + def _resolve_to_id(self, candidate: str | AgentProtocol | Executor) -> str: """Resolve a participant reference into a concrete executor identifier.""" if isinstance(candidate, Executor): @@ -1776,3 +1963,77 @@ def _resolve_to_id(self, candidate: str | AgentProtocol | Executor) -> str: return self._aliases[candidate] return candidate raise TypeError(f"Invalid starting agent reference: {type(candidate).__name__}") + + def _apply_auto_tools(self, agent: ChatAgent, specialists: Mapping[str, Executor]) -> dict[str, str]: + """Attach synthetic handoff tools to a chat agent and return the target lookup table. + + Creates handoff tools for each specialist agent that this agent can route to. + The tool_targets dict maps various name formats (tool name, sanitized name, alias) + to executor IDs to enable flexible handoff target resolution. 
+ + Args: + agent: The ChatAgent to add handoff tools to + specialists: Map of executor IDs or factory names to specialist executors this agent can hand off to + + Returns: + Dict mapping tool names (in various formats) to executor IDs for handoff resolution + """ + chat_options = agent.chat_options + existing_tools = list(chat_options.tools or []) + existing_names = {getattr(tool, "name", "") for tool in existing_tools if hasattr(tool, "name")} + + tool_targets: dict[str, str] = {} + new_tools: list[Any] = [] + for executor in specialists.values(): + alias = executor.id + sanitized = sanitize_identifier(alias) + tool = _create_handoff_tool(alias, executor.description if isinstance(executor, AgentExecutor) else None) + if tool.name not in existing_names: + new_tools.append(tool) + # Map multiple name variations to the same executor ID for robust resolution + tool_targets[tool.name.lower()] = executor.id + tool_targets[sanitized] = executor.id + tool_targets[alias.lower()] = executor.id + + if new_tools: + chat_options.tools = existing_tools + new_tools + else: + chat_options.tools = existing_tools + + return tool_targets + + def _prepare_agent_with_handoffs( + self, + executor: AgentExecutor, + target_agents: Mapping[str, Executor], + ) -> tuple[AgentExecutor, dict[str, str]]: + """Prepare an agent by adding handoff tools for the specified target agents. 
+ + Args: + executor: The agent executor to prepare + target_agents: Map of executor IDs to target executors this agent can hand off to + + Returns: + Tuple of (updated executor, tool_targets map) + """ + agent = getattr(executor, "_agent", None) + if not isinstance(agent, ChatAgent): + return executor, {} + + cloned_agent = _clone_chat_agent(agent) + tool_targets = self._apply_auto_tools(cloned_agent, target_agents) + if tool_targets: + middleware = _AutoHandoffMiddleware(tool_targets) + existing_middleware = list(cloned_agent.middleware or []) + existing_middleware.append(middleware) + cloned_agent.middleware = existing_middleware + + new_executor = AgentExecutor( + cloned_agent, + agent_thread=getattr(executor, "_agent_thread", None), + output_response=getattr(executor, "_output_response", False), + id=executor.id, + ) + return new_executor, tool_targets + + # endregion Internal Helper Methods diff --git a/python/packages/core/agent_framework/_workflows/_participant_utils.py b/python/packages/core/agent_framework/_workflows/_participant_utils.py index ac632a917d..a6f1cf2a84 100644 --- a/python/packages/core/agent_framework/_workflows/_participant_utils.py +++ b/python/packages/core/agent_framework/_workflows/_participant_utils.py @@ -47,15 +47,13 @@ def wrap_participant(participant: AgentProtocol | Executor, *, executor_id: str """Represent `participant` as an `Executor`.""" if isinstance(participant, Executor): return participant + if not isinstance(participant, AgentProtocol): raise TypeError( f"Participants must implement AgentProtocol or be Executor instances. Got {type(participant).__name__}." 
) - name = getattr(participant, "name", None) - if executor_id is None: - if not name: - raise ValueError("Agent participants must expose a stable 'name' attribute.") - executor_id = str(name) + + executor_id = executor_id or participant.display_name return AgentExecutor(participant, id=executor_id) diff --git a/python/packages/core/agent_framework/_workflows/_sequential.py b/python/packages/core/agent_framework/_workflows/_sequential.py index 0f849926b6..24ae4cda29 100644 --- a/python/packages/core/agent_framework/_workflows/_sequential.py +++ b/python/packages/core/agent_framework/_workflows/_sequential.py @@ -154,6 +154,9 @@ def register_participants( "Cannot mix .participants([...]) and .register_participants() in the same builder instance." ) + if self._participant_factories: + raise ValueError("register_participants() has already been called on this builder instance.") + if not participant_factories: raise ValueError("participant_factories cannot be empty") @@ -171,6 +174,9 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Seq "Cannot mix .participants([...]) and .register_participants() in the same builder instance." 
) + if self._participants: + raise ValueError("participants() has already been called on this builder instance.") + if not participants: raise ValueError("participants cannot be empty") diff --git a/python/packages/core/agent_framework/openai/_chat_client.py b/python/packages/core/agent_framework/openai/_chat_client.py index 9222cae8b3..7f0feb0fc7 100644 --- a/python/packages/core/agent_framework/openai/_chat_client.py +++ b/python/packages/core/agent_framework/openai/_chat_client.py @@ -162,6 +162,7 @@ def _prepare_options(self, messages: MutableSequence[ChatMessage], chat_options: exclude={ "type", "instructions", # included as system message + "allow_multiple_tool_calls", # handled separately } ) @@ -174,6 +175,8 @@ def _prepare_options(self, messages: MutableSequence[ChatMessage], chat_options: if web_search_options: options_dict["web_search_options"] = web_search_options options_dict["tools"] = self._chat_to_tool_spec(chat_options.tools) + if chat_options.allow_multiple_tool_calls is not None: + options_dict["parallel_tool_calls"] = chat_options.allow_multiple_tool_calls if not options_dict.get("tools", None): options_dict.pop("tools", None) options_dict.pop("parallel_tool_calls", None) diff --git a/python/packages/core/tests/workflow/test_handoff.py b/python/packages/core/tests/workflow/test_handoff.py index 077cb7321e..d0d5092323 100644 --- a/python/packages/core/tests/workflow/test_handoff.py +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -223,7 +223,7 @@ async def test_handoff_preserves_complex_additional_properties(complex_metadata: workflow = ( HandoffBuilder(participants=[triage, specialist]) - .set_coordinator("triage") + .set_coordinator(triage) .with_termination_condition(lambda conv: sum(1 for msg in conv if msg.role == Role.USER) >= 2) .build() ) @@ -286,7 +286,7 @@ async def test_tool_call_handoff_detection_with_text_hint(): triage = _RecordingAgent(name="triage", handoff_to="specialist", text_handoff=True) specialist = 
_RecordingAgent(name="specialist") - workflow = HandoffBuilder(participants=[triage, specialist]).set_coordinator("triage").build() + workflow = HandoffBuilder(participants=[triage, specialist]).set_coordinator(triage).build() await _drain(workflow.run_stream("Package arrived broken")) @@ -301,7 +301,7 @@ async def test_autonomous_interaction_mode_yields_output_without_user_request(): workflow = ( HandoffBuilder(participants=[triage, specialist]) - .set_coordinator("triage") + .set_coordinator(triage) .with_interaction_mode("autonomous", autonomous_turn_limit=1) .build() ) @@ -433,13 +433,13 @@ def test_build_fails_without_coordinator(): triage = _RecordingAgent(name="triage") specialist = _RecordingAgent(name="specialist") - with pytest.raises(ValueError, match="coordinator must be defined before build"): + with pytest.raises(ValueError, match=r"Must call set_coordinator\(...\) before building the workflow."): HandoffBuilder(participants=[triage, specialist]).build() def test_build_fails_without_participants(): """Verify that build() raises ValueError when no participants are provided.""" - with pytest.raises(ValueError, match="No participants provided"): + with pytest.raises(ValueError, match="No participants or participant_factories have been configured."): HandoffBuilder().build() @@ -610,7 +610,7 @@ async def test_return_to_previous_enabled(): workflow = ( HandoffBuilder(participants=[triage, specialist_a, specialist_b]) - .set_coordinator("triage") + .set_coordinator(triage) .enable_return_to_previous(True) .with_termination_condition(lambda conv: sum(1 for m in conv if m.role == Role.USER) >= 3) .build() @@ -643,7 +643,7 @@ def test_handoff_builder_sets_start_executor_once(monkeypatch: pytest.MonkeyPatc workflow = ( HandoffBuilder(participants=[coordinator, specialist]) - .set_coordinator("coordinator") + .set_coordinator(coordinator) .with_termination_condition(lambda conv: len(conv) > 0) .build() ) @@ -703,7 +703,7 @@ async def 
test_handoff_builder_with_request_info(): # Build workflow with request info enabled workflow = ( HandoffBuilder(participants=[coordinator, specialist]) - .set_coordinator("coordinator") + .set_coordinator(coordinator) .with_termination_condition(lambda conv: len([m for m in conv if m.role == Role.USER]) >= 1) .with_request_info() .build() @@ -782,6 +782,497 @@ async def test_return_to_previous_state_serialization(): assert coordinator2._current_agent_id == "specialist_a", "Current agent should be restored from checkpoint" # type: ignore[reportPrivateUsage] +# region Participant Factory Tests + + +def test_handoff_builder_rejects_empty_participant_factories(): + """Test that HandoffBuilder rejects empty participant_factories dictionary.""" + # Empty factories are rejected immediately when calling participant_factories() + with pytest.raises(ValueError, match=r"participant_factories cannot be empty"): + HandoffBuilder().participant_factories({}) + + with pytest.raises(ValueError, match=r"No participants or participant_factories have been configured"): + HandoffBuilder(participant_factories={}).build() + + +def test_handoff_builder_rejects_mixing_participants_and_factories(): + """Test that mixing participants and participant_factories in __init__ raises an error.""" + triage = _RecordingAgent(name="triage") + with pytest.raises(ValueError, match="Cannot mix .participants"): + HandoffBuilder(participants=[triage], participant_factories={"triage": lambda: triage}) + + +def test_handoff_builder_rejects_mixing_participants_and_participant_factories_methods(): + """Test that mixing .participants() and .participant_factories() raises an error.""" + triage = _RecordingAgent(name="triage") + + # Case 1: participants first, then participant_factories + with pytest.raises(ValueError, match="Cannot mix .participants"): + HandoffBuilder(participants=[triage]).participant_factories({ + "specialist": lambda: _RecordingAgent(name="specialist") + }) + + # Case 2: 
participant_factories first, then participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + HandoffBuilder(participant_factories={"triage": lambda: triage}).participants([ + _RecordingAgent(name="specialist") + ]) + + # Case 3: participants(), then participant_factories() + with pytest.raises(ValueError, match="Cannot mix .participants"): + HandoffBuilder().participants([triage]).participant_factories({ + "specialist": lambda: _RecordingAgent(name="specialist") + }) + + # Case 4: participant_factories(), then participants() + with pytest.raises(ValueError, match="Cannot mix .participants"): + HandoffBuilder().participant_factories({"triage": lambda: triage}).participants([ + _RecordingAgent(name="specialist") + ]) + + # Case 5: mix during initialization + with pytest.raises(ValueError, match="Cannot mix .participants"): + HandoffBuilder( + participants=[triage], participant_factories={"specialist": lambda: _RecordingAgent(name="specialist")} + ) + + +def test_handoff_builder_rejects_multiple_calls_to_participant_factories(): + """Test that multiple calls to .participant_factories() raises an error.""" + with pytest.raises(ValueError, match=r"participant_factories\(\) has already been called"): + ( + HandoffBuilder() + .participant_factories({"agent1": lambda: _RecordingAgent(name="agent1")}) + .participant_factories({"agent2": lambda: _RecordingAgent(name="agent2")}) + ) + + +def test_handoff_builder_rejects_multiple_calls_to_participants(): + """Test that multiple calls to .participants() raises an error.""" + with pytest.raises(ValueError, match="participants have already been assigned"): + (HandoffBuilder().participants([_RecordingAgent(name="agent1")]).participants([_RecordingAgent(name="agent2")])) + + +def test_handoff_builder_rejects_duplicate_factories(): + """Test that multiple calls to participant_factories are rejected.""" + factories = { + "triage": lambda: _RecordingAgent(name="triage"), + "specialist": lambda: 
_RecordingAgent(name="specialist"), + } + + # Multiple calls to participant_factories should fail + builder = HandoffBuilder(participant_factories=factories) + with pytest.raises(ValueError, match=r"participant_factories\(\) has already been called"): + builder.participant_factories({"triage": lambda: _RecordingAgent(name="triage2")}) + + +def test_handoff_builder_rejects_instance_coordinator_with_factories(): + """Test that using an agent instance for set_coordinator when using factories raises an error.""" + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage") + + def create_specialist() -> _RecordingAgent: + return _RecordingAgent(name="specialist") + + # Create an agent instance + coordinator_instance = _RecordingAgent(name="coordinator") + + with pytest.raises(ValueError, match=r"Call participants\(\.\.\.\) before coordinator\(\.\.\.\)"): + ( + HandoffBuilder( + participant_factories={"triage": create_triage, "specialist": create_specialist} + ).set_coordinator(coordinator_instance) # Instance, not factory name + ) + + +def test_handoff_builder_rejects_factory_name_coordinator_with_instances(): + """Test that using a factory name for set_coordinator when using instances raises an error.""" + triage = _RecordingAgent(name="triage") + specialist = _RecordingAgent(name="specialist") + + with pytest.raises( + ValueError, match="coordinator factory name 'triage' is not part of the participant_factories list" + ): + ( + HandoffBuilder(participants=[triage, specialist]).set_coordinator( + "triage" + ) # String factory name, not instance + ) + + +def test_handoff_builder_rejects_mixed_types_in_add_handoff_source(): + """Test that add_handoff rejects factory name source with instance-based participants.""" + triage = _RecordingAgent(name="triage") + specialist = _RecordingAgent(name="specialist") + + with pytest.raises(TypeError, match="Cannot mix factory names \\(str\\) and AgentProtocol/Executor instances"): + ( + 
HandoffBuilder(participants=[triage, specialist]) + .set_coordinator(triage) + .add_handoff("triage", specialist) # String source with instance participants + ) + + +def test_handoff_builder_accepts_all_factory_names_in_add_handoff(): + """Test that add_handoff accepts all factory names when using participant_factories.""" + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage") + + def create_specialist_a() -> _RecordingAgent: + return _RecordingAgent(name="specialist_a") + + def create_specialist_b() -> _RecordingAgent: + return _RecordingAgent(name="specialist_b") + + # This should work - all strings with participant_factories + builder = ( + HandoffBuilder( + participant_factories={ + "triage": create_triage, + "specialist_a": create_specialist_a, + "specialist_b": create_specialist_b, + } + ) + .set_coordinator("triage") + .add_handoff("triage", ["specialist_a", "specialist_b"]) + ) + + workflow = builder.build() + assert "triage" in workflow.executors + assert "specialist_a" in workflow.executors + assert "specialist_b" in workflow.executors + + +def test_handoff_builder_accepts_all_instances_in_add_handoff(): + """Test that add_handoff accepts all instances when using participants.""" + triage = _RecordingAgent(name="triage", handoff_to="specialist_a") + specialist_a = _RecordingAgent(name="specialist_a") + specialist_b = _RecordingAgent(name="specialist_b") + + # This should work - all instances with participants + builder = ( + HandoffBuilder(participants=[triage, specialist_a, specialist_b]) + .set_coordinator(triage) + .add_handoff(triage, [specialist_a, specialist_b]) + ) + + workflow = builder.build() + assert "triage" in workflow.executors + assert "specialist_a" in workflow.executors + assert "specialist_b" in workflow.executors + + +async def test_handoff_with_participant_factories(): + """Test workflow creation using participant_factories.""" + call_count = 0 + + def create_triage() -> _RecordingAgent: + nonlocal 
call_count + call_count += 1 + return _RecordingAgent(name="triage", handoff_to="specialist") + + def create_specialist() -> _RecordingAgent: + nonlocal call_count + call_count += 1 + return _RecordingAgent(name="specialist") + + workflow = ( + HandoffBuilder(participant_factories={"triage": create_triage, "specialist": create_specialist}) + .set_coordinator("triage") + .with_termination_condition(lambda conv: sum(1 for m in conv if m.role == Role.USER) >= 2) + .build() + ) + + # Factories should be called during build + assert call_count == 2 + + events = await _drain(workflow.run_stream("Need help")) + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests + + # Follow-up message + events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "More details"})) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs + + +async def test_handoff_participant_factories_reusable_builder(): + """Test that the builder can be reused to build multiple workflows with factories.""" + call_count = 0 + + def create_triage() -> _RecordingAgent: + nonlocal call_count + call_count += 1 + return _RecordingAgent(name="triage", handoff_to="specialist") + + def create_specialist() -> _RecordingAgent: + nonlocal call_count + call_count += 1 + return _RecordingAgent(name="specialist") + + builder = HandoffBuilder( + participant_factories={"triage": create_triage, "specialist": create_specialist} + ).set_coordinator("triage") + + # Build first workflow + wf1 = builder.build() + assert call_count == 2 + + # Build second workflow + wf2 = builder.build() + assert call_count == 4 + + # Verify that the two workflows have different agent instances + assert wf1.executors["triage"] is not wf2.executors["triage"] + assert wf1.executors["specialist"] is not wf2.executors["specialist"] + + +async def test_handoff_with_participant_factories_and_add_handoff(): + """Test that .add_handoff() works correctly with 
participant_factories.""" + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage", handoff_to="specialist_a") + + def create_specialist_a() -> _RecordingAgent: + return _RecordingAgent(name="specialist_a", handoff_to="specialist_b") + + def create_specialist_b() -> _RecordingAgent: + return _RecordingAgent(name="specialist_b") + + workflow = ( + HandoffBuilder( + participant_factories={ + "triage": create_triage, + "specialist_a": create_specialist_a, + "specialist_b": create_specialist_b, + } + ) + .set_coordinator("triage") + .add_handoff("triage", ["specialist_a", "specialist_b"]) + .add_handoff("specialist_a", "specialist_b") + .with_termination_condition(lambda conv: sum(1 for m in conv if m.role == Role.USER) >= 3) + .build() + ) + + # Start conversation - triage hands off to specialist_a + events = await _drain(workflow.run_stream("Initial request")) + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests + + # Verify specialist_a executor exists and was called + assert "specialist_a" in workflow.executors + + # Second user message - specialist_a hands off to specialist_b + events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "Need escalation"})) + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests + + # Verify specialist_b executor exists + assert "specialist_b" in workflow.executors + + +async def test_handoff_participant_factories_with_checkpointing(): + """Test checkpointing with participant_factories.""" + from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage + + storage = InMemoryCheckpointStorage() + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage", handoff_to="specialist") + + def create_specialist() -> _RecordingAgent: + return _RecordingAgent(name="specialist") + + workflow = ( + HandoffBuilder(participant_factories={"triage": create_triage, "specialist": 
create_specialist}) + .set_coordinator("triage") + .with_checkpointing(storage) + .with_termination_condition(lambda conv: sum(1 for m in conv if m.role == Role.USER) >= 2) + .build() + ) + + # Run workflow and capture output + events = await _drain(workflow.run_stream("checkpoint test")) + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests + + events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "follow up"})) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Should have workflow output after termination condition is met" + + # List checkpoints - just verify they were created + checkpoints = await storage.list_checkpoints() + assert checkpoints, "Checkpoints should be created during workflow execution" + + +def test_handoff_set_coordinator_with_factory_name(): + """Test that set_coordinator accepts factory name as string.""" + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage") + + def create_specialist() -> _RecordingAgent: + return _RecordingAgent(name="specialist") + + builder = HandoffBuilder( + participant_factories={"triage": create_triage, "specialist": create_specialist} + ).set_coordinator("triage") + + workflow = builder.build() + assert "triage" in workflow.executors + + +def test_handoff_add_handoff_with_factory_names(): + """Test that add_handoff accepts factory names as strings.""" + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage", handoff_to="specialist_a") + + def create_specialist_a() -> _RecordingAgent: + return _RecordingAgent(name="specialist_a") + + def create_specialist_b() -> _RecordingAgent: + return _RecordingAgent(name="specialist_b") + + builder = ( + HandoffBuilder( + participant_factories={ + "triage": create_triage, + "specialist_a": create_specialist_a, + "specialist_b": create_specialist_b, + } + ) + .set_coordinator("triage") + .add_handoff("triage", 
["specialist_a", "specialist_b"]) + ) + + workflow = builder.build() + assert "triage" in workflow.executors + assert "specialist_a" in workflow.executors + assert "specialist_b" in workflow.executors + + +async def test_handoff_participant_factories_autonomous_mode(): + """Test autonomous mode with participant_factories.""" + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage", handoff_to="specialist") + + def create_specialist() -> _RecordingAgent: + return _RecordingAgent(name="specialist") + + workflow = ( + HandoffBuilder(participant_factories={"triage": create_triage, "specialist": create_specialist}) + .set_coordinator("triage") + .with_interaction_mode("autonomous", autonomous_turn_limit=2) + .build() + ) + + events = await _drain(workflow.run_stream("Issue")) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs, "Autonomous mode should yield output" + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert not requests, "Autonomous mode should not request user input" + + +async def test_handoff_participant_factories_with_request_info(): + """Test that .with_request_info() works with participant_factories.""" + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage") + + def create_specialist() -> _RecordingAgent: + return _RecordingAgent(name="specialist") + + builder = ( + HandoffBuilder(participant_factories={"triage": create_triage, "specialist": create_specialist}) + .set_coordinator("triage") + .with_request_info(agents=["triage"]) + ) + + workflow = builder.build() + assert "triage" in workflow.executors + + +def test_handoff_participant_factories_invalid_coordinator_name(): + """Test that set_coordinator raises error for non-existent factory name.""" + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage") + + with pytest.raises( + ValueError, match="coordinator factory name 'nonexistent' is not part of the 
participant_factories list" + ): + (HandoffBuilder(participant_factories={"triage": create_triage}).set_coordinator("nonexistent").build()) + + +def test_handoff_participant_factories_invalid_handoff_target(): + """Test that add_handoff raises error for non-existent target factory name.""" + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage") + + def create_specialist() -> _RecordingAgent: + return _RecordingAgent(name="specialist") + + with pytest.raises(ValueError, match="Target factory name 'nonexistent' is not in the participant_factories list"): + ( + HandoffBuilder(participant_factories={"triage": create_triage, "specialist": create_specialist}) + .set_coordinator("triage") + .add_handoff("triage", "nonexistent") + .build() + ) + + +async def test_handoff_participant_factories_enable_return_to_previous(): + """Test return_to_previous works with participant_factories.""" + + def create_triage() -> _RecordingAgent: + return _RecordingAgent(name="triage", handoff_to="specialist_a") + + def create_specialist_a() -> _RecordingAgent: + return _RecordingAgent(name="specialist_a", handoff_to="specialist_b") + + def create_specialist_b() -> _RecordingAgent: + return _RecordingAgent(name="specialist_b") + + workflow = ( + HandoffBuilder( + participant_factories={ + "triage": create_triage, + "specialist_a": create_specialist_a, + "specialist_b": create_specialist_b, + } + ) + .set_coordinator("triage") + .add_handoff("triage", ["specialist_a", "specialist_b"]) + .add_handoff("specialist_a", "specialist_b") + .enable_return_to_previous(True) + .with_termination_condition(lambda conv: sum(1 for m in conv if m.role == Role.USER) >= 3) + .build() + ) + + # Start conversation - triage hands off to specialist_a + events = await _drain(workflow.run_stream("Initial request")) + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests + + # Second user message - specialist_a hands off to specialist_b + events = await 
_drain(workflow.send_responses_streaming({requests[-1].request_id: "Need escalation"})) + requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] + assert requests + + # Third user message - should route back to specialist_b (return to previous) + events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "Follow up"})) + outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] + assert outputs or [ev for ev in events if isinstance(ev, RequestInfoEvent)] + + +# endregion Participant Factory Tests + + async def test_handoff_user_input_request_checkpoint_excludes_conversation(): """Test that HandoffUserInputRequest serialization excludes conversation to prevent duplication. diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 0cfcd85cd2..4077c117a5 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -86,7 +86,6 @@ Once comfortable with these, explore the rest of the samples below. | ConcurrentBuilder Request Info | [human-in-the-loop/concurrent_request_info.py](./human-in-the-loop/concurrent_request_info.py) | Review concurrent agent outputs before aggregation using `.with_request_info()` on ConcurrentBuilder | | GroupChatBuilder Request Info | [human-in-the-loop/group_chat_request_info.py](./human-in-the-loop/group_chat_request_info.py) | Steer group discussions with periodic guidance using `.with_request_info()` on GroupChatBuilder | - ### tool-approval Tool approval samples demonstrate using `@ai_function(approval_mode="always_require")` to gate sensitive tool executions with human approval. These work with the high-level builder APIs. 
@@ -120,6 +119,7 @@ For additional observability samples in Agent Framework, see the [observability | Handoff (Specialist-to-Specialist) | [orchestration/handoff_specialist_to_specialist.py](./orchestration/handoff_specialist_to_specialist.py) | Multi-tier routing: specialists can hand off to other specialists using `.add_handoff()` fluent API | | Handoff (Return-to-Previous) | [orchestration/handoff_return_to_previous.py](./orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of coordinator using `.enable_return_to_previous()` | | Handoff (Autonomous) | [orchestration/handoff_autonomous.py](./orchestration/handoff_autonomous.py) | Autonomous mode: specialists iterate independently until invoking a handoff tool using `.with_interaction_mode("autonomous", autonomous_turn_limit=N)` | +| Handoff (Participant Factory) | [orchestration/handoff_participant_factory.py](./orchestration/handoff_participant_factory.py) | Use participant factories for state isolation between workflow instances | | Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming | | Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution | | Magentic + Human Stall Intervention | [orchestration/magentic_human_replan.py](./orchestration/magentic_human_replan.py) | Human intervenes when workflow stalls with `with_human_input_on_stall()` | diff --git a/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py b/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py index 68bf789083..154f768d09 100644 --- a/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py +++ b/python/samples/getting_started/workflows/orchestration/handoff_autonomous.py @@ 
-2,14 +2,15 @@ import asyncio import logging -from collections.abc import AsyncIterable from typing import cast from agent_framework import ( + AgentRunResponseUpdate, AgentRunUpdateEvent, ChatAgent, ChatMessage, HandoffBuilder, + HostedWebSearchTool, WorkflowEvent, WorkflowOutputEvent, ) @@ -44,31 +45,29 @@ def create_agents( """Create coordinator and specialists for autonomous iteration.""" coordinator = chat_client.create_agent( instructions=( - "You are a coordinator. Route user requests to either research_agent or summary_agent. " - "Always call exactly one handoff tool with a short routing acknowledgement. " - "If unsure, default to research_agent. Never request information yourself. " - "After a specialist hands off back to you, provide a concise final summary and stop." + "You are a coordinator. You break down a user query into a research task and a summary task. " + "Assign the two tasks to the appropriate specialists, one after the other." ), name="coordinator", ) research_agent = chat_client.create_agent( instructions=( - "You are a research specialist that explores topics thoroughly. " + "You are a research specialist that explores topics thoroughly on the Microsoft Learn Site." "When given a research task, break it down into multiple aspects and explore each one. " - "Continue your research across multiple responses - don't try to finish everything in one response. " - "After each response, think about what else needs to be explored. " - "When you have covered the topic comprehensively (at least 3-4 different aspects), " - "call the handoff tool to return to coordinator with your findings. " - "Keep each individual response focused on one aspect." + "Continue your research across multiple responses - don't try to finish everything in one " + "response. After each response, think about what else needs to be explored. When you have " + "covered the topic comprehensively (at least 3-4 different aspects), return control to the " + "coordinator. 
Keep each individual response focused on one aspect." ), name="research_agent", + tools=[HostedWebSearchTool()], ) summary_agent = chat_client.create_agent( instructions=( - "You summarize research findings. Provide a concise, well-organized summary. " - "When done, hand off to coordinator." + "You summarize research findings. Provide a concise, well-organized summary. When done, return " + "control to the coordinator." ), name="summary_agent", ) @@ -76,25 +75,29 @@ def create_agents( return coordinator, research_agent, summary_agent -async def _drain(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]: - """Collect all events from an async stream.""" - return [event async for event in stream] +last_response_id: str | None = None -def _print_conversation(events: list[WorkflowEvent]) -> None: +def _display_event(event: WorkflowEvent) -> None: """Print the final conversation snapshot from workflow output events.""" - for event in events: - if isinstance(event, AgentRunUpdateEvent): - print(event.data, flush=True, end="") - elif isinstance(event, WorkflowOutputEvent): - conversation = cast(list[ChatMessage], event.data) - print("\n=== Final Conversation (Autonomous with Iteration) ===") - for message in conversation: - speaker = message.author_name or message.role.value - text_preview = message.text[:200] + "..." 
if len(message.text) > 200 else message.text - print(f"- {speaker}: {text_preview}") - print(f"\nTotal messages: {len(conversation)}") - print("=====================================================") + if isinstance(event, AgentRunUpdateEvent) and event.data: + update: AgentRunResponseUpdate = event.data + if not update.text: + return + global last_response_id + if update.response_id != last_response_id: + last_response_id = update.response_id + print(f"\n- {update.author_name}: ", flush=True, end="") + print(event.data, flush=True, end="") + elif isinstance(event, WorkflowOutputEvent): + conversation = cast(list[ChatMessage], event.data) + print("\n=== Final Conversation (Autonomous with Iteration) ===") + for message in conversation: + speaker = message.author_name or message.role.value + text_preview = message.text[:200] + "..." if len(message.text) > 200 else message.text + print(f"- {speaker}: {text_preview}") + print(f"\nTotal messages: {len(conversation)}") + print("=====================================================") async def main() -> None: @@ -122,12 +125,10 @@ async def main() -> None: .build() ) - initial_request = "Research the key benefits and challenges of renewable energy adoption." - print("Initial request:", initial_request) - print("\nExpecting multiple iterations from the research agent...\n") - - events = await _drain(workflow.run_stream(initial_request)) - _print_conversation(events) + request = "Perform a comprehensive research on Microsoft Agent Framework." 
+ print("Request:", request) + async for event in workflow.run_stream(request): + _display_event(event) """ Expected behavior: diff --git a/python/samples/getting_started/workflows/orchestration/handoff_participant_factory.py b/python/samples/getting_started/workflows/orchestration/handoff_participant_factory.py new file mode 100644 index 0000000000..1b676c5ffd --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/handoff_participant_factory.py @@ -0,0 +1,265 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import logging +from collections.abc import AsyncIterable +from typing import cast + +from agent_framework import ( + ChatAgent, + ChatMessage, + HandoffBuilder, + HandoffUserInputRequest, + RequestInfoEvent, + Role, + Workflow, + WorkflowEvent, + WorkflowOutputEvent, + ai_function, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential +from typing import Annotated + +logging.basicConfig(level=logging.ERROR) + +"""Sample: Autonomous handoff workflow with agent factory. + +This sample demonstrates how to use participant factories in HandoffBuilder to create +agents dynamically. + +Using participant factories allows you to set up proper state isolation between workflow +instances created by the same builder. This is particularly useful when you need to handle +requests or tasks in parallel with stateful participants. + +Routing Pattern: + User -> Coordinator -> Specialist (iterates N times) -> Handoff -> Final Output + +Prerequisites: + - `az login` (Azure CLI authentication) + - Environment variables for AzureOpenAIChatClient (AZURE_OPENAI_ENDPOINT, etc.) 
+ +Key Concepts: + - Participant factories: create agents via factory functions for isolation +""" + + +@ai_function +def process_refund(order_number: Annotated[str, "Order number to process refund for"]) -> str: + """Simulated function to process a refund for a given order number.""" + return f"Refund processed successfully for order {order_number}." + + +@ai_function +def check_order_status(order_number: Annotated[str, "Order number to check status for"]) -> str: + """Simulated function to check the status of a given order number.""" + return f"Order {order_number} is currently being processed and will ship in 2 business days." + + +@ai_function +def process_return(order_number: Annotated[str, "Order number to process return for"]) -> str: + """Simulated function to process a return for a given order number.""" + return f"Return initiated successfully for order {order_number}. You will receive return instructions via email." + + +def create_triage_agent() -> ChatAgent: + """Factory function to create a triage agent instance.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You are frontline support triage. Route customer issues to the appropriate specialist agents " + "based on the problem described." 
+ ), + name="triage_agent", + ) + + +def create_refund_agent() -> ChatAgent: + """Factory function to create a refund agent instance.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions="You process refund requests.", + name="refund_agent", + # In a real application, an agent can have multiple tools; here we keep it simple + tools=[process_refund], + ) + + +def create_order_status_agent() -> ChatAgent: + """Factory function to create an order status agent instance.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions="You handle order and shipping inquiries.", + name="order_agent", + # In a real application, an agent can have multiple tools; here we keep it simple + tools=[check_order_status], + ) + + +def create_return_agent() -> ChatAgent: + """Factory function to create a return agent instance.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions="You manage product return requests.", + name="return_agent", + # In a real application, an agent can have multiple tools; here we keep it simple + tools=[process_return], + ) + + +async def _drain(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]: + """Collect all events from an async stream into a list. + + This helper drains the workflow's event stream so we can process events + synchronously after each workflow step completes. + + Args: + stream: Async iterable of WorkflowEvent + + Returns: + List of all events from the stream + """ + return [event async for event in stream] + + +def _handle_events(events: list[WorkflowEvent]) -> list[RequestInfoEvent]: + """Process workflow events and extract any pending user input requests. + + This function inspects each event type and: + - Prints workflow status changes (IDLE, IDLE_WITH_PENDING_REQUESTS, etc.) 
+ - Displays final conversation snapshots when workflow completes + - Prints user input request prompts + - Collects all RequestInfoEvent instances for response handling + + Args: + events: List of WorkflowEvent to process + + Returns: + List of RequestInfoEvent representing pending user input requests + """ + requests: list[RequestInfoEvent] = [] + + for event in events: + # WorkflowOutputEvent: Contains the final conversation when workflow terminates + if isinstance(event, WorkflowOutputEvent): + conversation = cast(list[ChatMessage], event.data) + if isinstance(conversation, list): + print("\n=== Final Conversation Snapshot ===") + for message in conversation: + speaker = message.author_name or message.role.value + print(f"- {speaker}: {message.text}") + print("===================================") + + # RequestInfoEvent: Workflow is requesting user input + elif isinstance(event, RequestInfoEvent): + if isinstance(event.data, HandoffUserInputRequest): + _print_agent_responses_since_last_user_message(event.data) + requests.append(event) + + return requests + + +def _print_agent_responses_since_last_user_message(request: HandoffUserInputRequest) -> None: + """Display agent responses since the last user message in a handoff request. + + The HandoffUserInputRequest contains the full conversation history so far, + allowing the user to see what's been discussed before providing their next input. 
+ + Args: + request: The user input request containing conversation and prompt + """ + if not request.conversation: + raise RuntimeError("HandoffUserInputRequest missing conversation history.") + + # Reverse iterate to collect agent responses since last user message + agent_responses: list[ChatMessage] = [] + for message in request.conversation[::-1]: + if message.role == Role.USER: + break + agent_responses.append(message) + + # Print agent responses in original order + agent_responses.reverse() + for message in agent_responses: + speaker = message.author_name or message.role.value + print(f"- {speaker}: {message.text}") + + +async def _run_Workflow(workflow: Workflow, user_inputs: list[str]) -> None: + """Run the workflow with the given user input and display events.""" + print(f"- User: {user_inputs[0]}") + events = await _drain(workflow.run_stream(user_inputs[0])) + pending_requests = _handle_events(events) + + # Process the request/response cycle + # The workflow will continue requesting input until: + # 1. The termination condition is met (4 user messages in this case), OR + # 2. 
We run out of scripted responses + while pending_requests and user_inputs[1:]: + # Get the next scripted response + user_response = user_inputs.pop(1) + print(f"\n- User: {user_response}") + + # Send response(s) to all pending requests + # In this demo, there's typically one request per cycle, but the API supports multiple + responses = {req.request_id: user_response for req in pending_requests} + + # Send responses and get new events + # We use send_responses_streaming() to get events as they occur, allowing us to + # display agent responses in real-time and handle new requests as they arrive + events = await _drain(workflow.send_responses_streaming(responses)) + pending_requests = _handle_events(events) + + +async def main() -> None: + """Run the autonomous handoff workflow with participant factories.""" + # Build the handoff workflow using participant factories + workflow_builder = ( + HandoffBuilder( + name="Autonomous Handoff with Participant Factories", + participant_factories={ + "triage": create_triage_agent, + "refund": create_refund_agent, + "order_status": create_order_status_agent, + "return": create_return_agent, + }, + ) + .set_coordinator("triage") + .with_termination_condition( + # Custom termination: Check if the triage agent has provided a closing message. + # This looks for the last message being from triage_agent and containing "welcome", + # which indicates the conversation has concluded naturally. + lambda conversation: len(conversation) > 0 + and conversation[-1].author_name == "triage_agent" + and "welcome" in conversation[-1].text.lower() + ) + ) + + # Scripted user responses for reproducible demo + # In a console application, replace this with: + # user_input = input("Your response: ") + # or integrate with a UI/chat interface + user_inputs = [ + "Hello, I need assistance with my recent purchase.", + "My order 1234 arrived damaged and the packaging was destroyed. 
I'd like to return it.", + "Is my return being processed?", + "Thanks for resolving this.", + ] + + workflow_a = workflow_builder.build() + print("=== Running workflow_a ===") + await _run_Workflow(workflow_a, list(user_inputs)) + + workflow_b = workflow_builder.build() + print("=== Running workflow_b ===") + # Only provide the last two inputs to workflow_b to demonstrate state isolation + # The agents in this workflow have no prior context thus should not have knowledge of + # order 1234 or previous interactions. + await _run_Workflow(workflow_b, user_inputs[2:]) + """ + Expected behavior: + - workflow_a and workflow_b maintain separate states for their participants. + - Each workflow processes its requests independently without interference. + - workflow_a will answer the follow-up request based on its own conversation history, + while workflow_b will provide a general answer without prior context. + """ + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/orchestration/handoff_simple.py b/python/samples/getting_started/workflows/orchestration/handoff_simple.py index 6092083266..84b6e0f243 100644 --- a/python/samples/getting_started/workflows/orchestration/handoff_simple.py +++ b/python/samples/getting_started/workflows/orchestration/handoff_simple.py @@ -2,7 +2,7 @@ import asyncio from collections.abc import AsyncIterable -from typing import cast +from typing import Annotated, cast from agent_framework import ( ChatAgent, @@ -10,10 +10,12 @@ HandoffBuilder, HandoffUserInputRequest, RequestInfoEvent, + Role, WorkflowEvent, WorkflowOutputEvent, WorkflowRunState, WorkflowStatusEvent, + ai_function, ) from agent_framework.azure import AzureOpenAIChatClient from azure.identity import AzureCliCredential @@ -22,10 +24,10 @@ This sample demonstrates the basic handoff pattern where only the triage agent can route to specialists. 
Specialists cannot hand off to other specialists - after any -specialist responds, control returns to the user for the next input. +specialist responds, control returns to the user (via the triage agent) for the next input. Routing Pattern: - User → Triage Agent → Specialist → Back to User → Triage Agent → ... + User → Triage Agent → Specialist → Triage Agent → User → Triage Agent → ... This is the simplest handoff configuration, suitable for straightforward support scenarios where a triage agent dispatches to domain specialists, and each specialist @@ -39,12 +41,31 @@ Key Concepts: - Single-tier routing: Only triage agent has handoff capabilities - - Auto-registered handoff tools: HandoffBuilder creates tools automatically + - Auto-registered handoff tools: HandoffBuilder automatically creates handoff tools + for each participant, allowing the coordinator to transfer control to specialists - Termination condition: Controls when the workflow stops requesting user input - Request/response cycle: Workflow requests input, user responds, cycle continues """ +@ai_function +def process_refund(order_number: Annotated[str, "Order number to process refund for"]) -> str: + """Simulated function to process a refund for a given order number.""" + return f"Refund processed successfully for order {order_number}." + + +@ai_function +def check_order_status(order_number: Annotated[str, "Order number to check status for"]) -> str: + """Simulated function to check the status of a given order number.""" + return f"Order {order_number} is currently being processed and will ship in 2 business days." + + +@ai_function +def process_return(order_number: Annotated[str, "Order number to process return for"]) -> str: + """Simulated function to process a return for a given order number.""" + return f"Return initiated successfully for order {order_number}. You will receive return instructions via email." 
+ + def create_agents(chat_client: AzureOpenAIChatClient) -> tuple[ChatAgent, ChatAgent, ChatAgent, ChatAgent]: """Create and configure the triage and specialist agents. @@ -54,51 +75,46 @@ def create_agents(chat_client: AzureOpenAIChatClient) -> tuple[ChatAgent, ChatAg - Signaling handoff by calling one of the explicit handoff tools exposed to it Specialist agents are invoked only when the triage agent explicitly hands off to them. - After a specialist responds, control returns to the triage agent. + After a specialist responds, control returns to the triage agent, which then prompts + the user for their next message. Returns: - Tuple of (triage_agent, refund_agent, order_agent, support_agent) + Tuple of (triage_agent, refund_agent, order_agent, return_agent) """ # Triage agent: Acts as the frontline dispatcher - # NOTE: The instructions explicitly tell it to call the correct handoff tool when routing. - # The HandoffBuilder intercepts these tool calls and routes to the matching specialist. - triage = chat_client.create_agent( + triage_agent = chat_client.create_agent( instructions=( - "You are frontline support triage. Read the latest user message and decide whether " - "to hand off to refund_agent, order_agent, or support_agent. Provide a brief natural-language " - "response for the user. When delegation is required, call the matching handoff tool " - "(`handoff_to_refund_agent`, `handoff_to_order_agent`, or `handoff_to_support_agent`)." + "You are frontline support triage. Route customer issues to the appropriate specialist agents " + "based on the problem described." ), name="triage_agent", ) # Refund specialist: Handles refund requests - refund = chat_client.create_agent( - instructions=( - "You handle refund workflows. Ask for any order identifiers you require and outline the refund steps." 
- ), + refund_agent = chat_client.create_agent( + instructions="You process refund requests.", name="refund_agent", + # In a real application, an agent can have multiple tools; here we keep it simple + tools=[process_refund], ) # Order/shipping specialist: Resolves delivery issues - order = chat_client.create_agent( - instructions=( - "You resolve shipping and fulfillment issues. Clarify the delivery problem and describe the actions " - "you will take to remedy it." - ), + order_agent = chat_client.create_agent( + instructions="You handle order and shipping inquiries.", name="order_agent", + # In a real application, an agent can have multiple tools; here we keep it simple + tools=[check_order_status], ) - # General support specialist: Fallback for other issues - support = chat_client.create_agent( - instructions=( - "You are a general support agent. Offer empathetic troubleshooting and gather missing details if the " - "issue does not match other specialists." - ), - name="support_agent", + # Return specialist: Handles return requests + return_agent = chat_client.create_agent( + instructions="You manage product return requests.", + name="return_agent", + # In a real application, an agent can have multiple tools; here we keep it simple + tools=[process_return], ) - return triage, refund, order, support + return triage_agent, refund_agent, order_agent, return_agent async def _drain(stream: AsyncIterable[WorkflowEvent]) -> list[WorkflowEvent]: @@ -139,7 +155,7 @@ def _handle_events(events: list[WorkflowEvent]) -> list[RequestInfoEvent]: WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, }: - print(f"[status] {event.state.name}") + print(f"\n[Workflow Status] {event.state.name}") # WorkflowOutputEvent: Contains the final conversation when workflow terminates elif isinstance(event, WorkflowOutputEvent): @@ -154,14 +170,14 @@ def _handle_events(events: list[WorkflowEvent]) -> list[RequestInfoEvent]: # RequestInfoEvent: Workflow is requesting user input 
elif isinstance(event, RequestInfoEvent): if isinstance(event.data, HandoffUserInputRequest): - _print_handoff_request(event.data) + _print_agent_responses_since_last_user_message(event.data) requests.append(event) return requests -def _print_handoff_request(request: HandoffUserInputRequest) -> None: - """Display a user input request prompt with conversation context. +def _print_agent_responses_since_last_user_message(request: HandoffUserInputRequest) -> None: + """Display agent responses since the last user message in a handoff request. The HandoffUserInputRequest contains the full conversation history so far, allowing the user to see what's been discussed before providing their next input. @@ -169,11 +185,21 @@ def _print_handoff_request(request: HandoffUserInputRequest) -> None: Args: request: The user input request containing conversation and prompt """ - print("\n=== User Input Requested ===") - for message in request.conversation: + if not request.conversation: + raise RuntimeError("HandoffUserInputRequest missing conversation history.") + + # Reverse iterate to collect agent responses since last user message + agent_responses: list[ChatMessage] = [] + for message in request.conversation[::-1]: + if message.role == Role.USER: + break + agent_responses.append(message) + + # Print agent responses in original order + agent_responses.reverse() + for message in agent_responses: speaker = message.author_name or message.role.value print(f"- {speaker}: {message.text}") - print("============================") async def main() -> None: @@ -196,20 +222,26 @@ async def main() -> None: triage, refund, order, support = create_agents(chat_client) # Build the handoff workflow - # - participants: All agents that can participate (triage MUST be first or explicitly set as set_coordinator) - # - set_coordinator: The triage agent receives all user input first - # - with_termination_condition: Custom logic to stop the request/response loop - # Default is 10 user messages; here we 
terminate after 4 to match our scripted demo + # - participants: All agents that can participate in the workflow + # - set_coordinator: The triage agent is designated as the coordinator, which means + # it receives all user input first and orchestrates handoffs to specialists + # - with_termination_condition: Custom logic to stop the request/response loop. + # Without this, the default behavior continues requesting user input until max_turns + # is reached. Here we use a custom condition that checks if the conversation has ended + # naturally (when triage agent says something like "you're welcome"). workflow = ( HandoffBuilder( name="customer_support_handoff", participants=[triage, refund, order, support], ) - .set_coordinator("triage_agent") + .set_coordinator(triage) .with_termination_condition( - # Terminate after 4 user messages (initial + 3 scripted responses) - # Count only USER role messages to avoid counting agent responses - lambda conv: sum(1 for msg in conv if msg.role.value == "user") >= 4 + # Custom termination: Check if the triage agent has provided a closing message. + # This looks for the last message being from triage_agent and containing "welcome", + # which indicates the conversation has concluded naturally. + lambda conversation: len(conversation) > 0 + and conversation[-1].author_name == "triage_agent" + and "welcome" in conversation[-1].text.lower() ) .build() ) @@ -219,15 +251,16 @@ async def main() -> None: # user_input = input("Your response: ") # or integrate with a UI/chat interface scripted_responses = [ - "My order 1234 arrived damaged and the packaging was destroyed.", - "Yes, I'd like a refund if that's possible.", + "My order 1234 arrived damaged and the packaging was destroyed. 
I'd like to return it.", "Thanks for resolving this.", ] # Start the workflow with the initial user message # run_stream() returns an async iterator of WorkflowEvent - print("\n[Starting workflow with initial user message...]") - events = await _drain(workflow.run_stream("Hello, I need assistance with my recent purchase.")) + print("[Starting workflow with initial user message...]\n") + initial_message = "Hello, I need assistance with my recent purchase." + print(f"- User: {initial_message}") + events = await _drain(workflow.run_stream(initial_message)) pending_requests = _handle_events(events) # Process the request/response cycle @@ -237,13 +270,15 @@ async def main() -> None: while pending_requests and scripted_responses: # Get the next scripted response user_response = scripted_responses.pop(0) - print(f"\n[User responding: {user_response}]") + print(f"\n- User: {user_response}") # Send response(s) to all pending requests # In this demo, there's typically one request per cycle, but the API supports multiple responses = {req.request_id: user_response for req in pending_requests} # Send responses and get new events + # We use send_responses_streaming() to get events as they occur, allowing us to + # display agent responses in real-time and handle new requests as they arrive events = await _drain(workflow.send_responses_streaming(responses)) pending_requests = _handle_events(events) @@ -252,84 +287,30 @@ async def main() -> None: [Starting workflow with initial user message...] - === User Input Requested === - - user: Hello, I need assistance with my recent purchase. - - triage_agent: I'd be happy to help you with your recent purchase. Could you please provide more details about the issue you're experiencing? - ============================ - [status] IDLE_WITH_PENDING_REQUESTS - - [User responding: My order 1234 arrived damaged and the packaging was destroyed.] - - === User Input Requested === - - user: Hello, I need assistance with my recent purchase. 
- - triage_agent: I'd be happy to help you with your recent purchase. Could you please provide more details about the issue you're experiencing? - - user: My order 1234 arrived damaged and the packaging was destroyed. - - triage_agent: I'm sorry to hear that your order arrived damaged and the packaging was destroyed. I will connect you with a specialist who can assist you further with this issue. + - User: Hello, I need assistance with my recent purchase. + - triage_agent: Could you please provide more details about the issue you're experiencing with your recent purchase? This will help me route you to the appropriate specialist. - Tool Call: handoff_to_support_agent (awaiting approval) - - support_agent: I'm so sorry to hear that your order arrived in such poor condition. I'll help you get this sorted out. + [Workflow Status] IDLE_WITH_PENDING_REQUESTS - To assist you better, could you please let me know: - - Which item(s) from order 1234 arrived damaged? - - Could you describe the damage, or provide photos if possible? - - Would you prefer a replacement or a refund? + - User: My order 1234 arrived damaged and the packaging was destroyed. I'd like to return it. + - triage_agent: I've directed your request to our return agent, who will assist you with returning the damaged order. Thank you for your patience! + - return_agent: The return for your order 1234 has been successfully initiated. You will receive return instructions via email shortly. If you have any other questions or need further assistance, feel free to ask! - Once I have this information, I can help resolve this for you as quickly as possible. - ============================ - [status] IDLE_WITH_PENDING_REQUESTS + [Workflow Status] IDLE_WITH_PENDING_REQUESTS - [User responding: Yes, I'd like a refund if that's possible.] + - User: Thanks for resolving this. - === User Input Requested === + === Final Conversation Snapshot === - user: Hello, I need assistance with my recent purchase. 
- - triage_agent: I'd be happy to help you with your recent purchase. Could you please provide more details about the issue you're experiencing? - - user: My order 1234 arrived damaged and the packaging was destroyed. - - triage_agent: I'm sorry to hear that your order arrived damaged and the packaging was destroyed. I will connect you with a specialist who can assist you further with this issue. - - Tool Call: handoff_to_support_agent (awaiting approval) - - support_agent: I'm so sorry to hear that your order arrived in such poor condition. I'll help you get this sorted out. - - To assist you better, could you please let me know: - - Which item(s) from order 1234 arrived damaged? - - Could you describe the damage, or provide photos if possible? - - Would you prefer a replacement or a refund? - - Once I have this information, I can help resolve this for you as quickly as possible. - - user: Yes, I'd like a refund if that's possible. - - triage_agent: Thank you for letting me know you'd prefer a refund. I'll connect you with a specialist who can process your refund request. - - Tool Call: handoff_to_refund_agent (awaiting approval) - - refund_agent: Thank you for confirming that you'd like a refund for order 1234. - - Here's what will happen next: - - ... - - Tool Call: handoff_to_refund_agent (awaiting approval) - - refund_agent: Thank you for confirming that you'd like a refund for order 1234. - - Here's what will happen next: - - **1. Verification:** - I will need to verify a few more details to proceed. - - Can you confirm the items in order 1234 that arrived damaged? - - Do you have any photos of the damaged items/packaging? (Photos help speed up the process.) - - **2. Refund Request Submission:** - - Once I have the details, I will submit your refund request for review. - - **3. Return Instructions (if needed):** - - In some cases, we may provide instructions on how to return the damaged items. - - You will receive a prepaid return label if necessary. - - **4. 
Refund Processing:** - - After your request is approved (and any returns are received if required), your refund will be processed. - - Refunds usually appear on your original payment method within 5-10 business days. - - Could you please reply with the specific item(s) damaged and, if possible, attach photos? This will help me get your refund started right away. + - triage_agent: Could you please provide more details about the issue you're experiencing with your recent purchase? This will help me route you to the appropriate specialist. + - user: My order 1234 arrived damaged and the packaging was destroyed. I'd like to return it. + - triage_agent: I've directed your request to our return agent, who will assist you with returning the damaged order. Thank you for your patience! + - return_agent: The return for your order 1234 has been successfully initiated. You will receive return instructions via email shortly. If you have any other questions or need further assistance, feel free to ask! - user: Thanks for resolving this. + - triage_agent: You're welcome! If you have any more questions or need assistance in the future, feel free to reach out. Have a great day! =================================== - [status] IDLE + + [Workflow Status] IDLE """ # noqa: E501 diff --git a/python/samples/getting_started/workflows/orchestration/handoff_with_code_interpreter_file.py b/python/samples/getting_started/workflows/orchestration/handoff_with_code_interpreter_file.py index cec49e8634..b1fd37302a 100644 --- a/python/samples/getting_started/workflows/orchestration/handoff_with_code_interpreter_file.py +++ b/python/samples/getting_started/workflows/orchestration/handoff_with_code_interpreter_file.py @@ -26,9 +26,8 @@ """ import asyncio -from collections.abc import AsyncIterable +from collections.abc import AsyncIterable, AsyncIterator from contextlib import asynccontextmanager -from collections.abc import AsyncIterator from agent_framework import ( AgentRunUpdateEvent,