diff --git a/python/packages/core/agent_framework/_workflows/_edge.py b/python/packages/core/agent_framework/_workflows/_edge.py index 2d144657fe..87a6f7af2b 100644 --- a/python/packages/core/agent_framework/_workflows/_edge.py +++ b/python/packages/core/agent_framework/_workflows/_edge.py @@ -232,7 +232,7 @@ def __init__(self) -> None: """ condition: Callable[[Any], bool] - target: Executor + target: Executor | str @dataclass @@ -255,7 +255,7 @@ def __init__(self) -> None: assert fallback.target.id == "dead_letter" """ - target: Executor + target: Executor | str @dataclass(init=False) diff --git a/python/packages/core/agent_framework/_workflows/_validation.py b/python/packages/core/agent_framework/_workflows/_validation.py index 88e37a121a..fc59bb94e1 100644 --- a/python/packages/core/agent_framework/_workflows/_validation.py +++ b/python/packages/core/agent_framework/_workflows/_validation.py @@ -101,21 +101,20 @@ class WorkflowGraphValidator: def __init__(self) -> None: self._edges: list[Edge] = [] self._executors: dict[str, Executor] = {} - self._start_executor_ref: Executor | str | None = None # region Core Validation Methods def validate_workflow( self, edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], - start_executor: Executor | str, + start_executor: Executor, ) -> None: """Validate the entire workflow graph. 
Args: edge_groups: list of edge groups in the workflow executors: Map of executor IDs to executor instances - start_executor: The starting executor (can be instance or ID) + start_executor: The starting executor Raises: WorkflowValidationError: If any validation fails @@ -123,22 +122,20 @@ def validate_workflow( self._executors = executors self._edges = [edge for group in edge_groups for edge in group.edges] self._edge_groups = edge_groups - self._start_executor_ref = start_executor # If only the start executor exists, add it to the executor map # Handle the special case where the workflow consists of only a single executor and no edges. # In this scenario, the executor map will be empty because there are no edge groups to reference executors. # Adding the start executor to the map ensures that single-executor workflows (without any edges) are supported, # allowing validation and execution to proceed for workflows that do not require inter-executor communication. - if not self._executors and start_executor and isinstance(start_executor, Executor): + if not self._executors: self._executors[start_executor.id] = start_executor # Validate that start_executor exists in the graph # It should because we check for it in the WorkflowBuilder # but we do it here for completeness. 
- start_executor_id = start_executor.id if isinstance(start_executor, Executor) else start_executor - if start_executor_id not in self._executors: - raise GraphConnectivityError(f"Start executor '{start_executor_id}' is not present in the workflow graph") + if start_executor.id not in self._executors: + raise GraphConnectivityError(f"Start executor '{start_executor.id}' is not present in the workflow graph") # Additional presence verification: # A start executor that is only injected via the builder (present in the executors map) @@ -152,16 +149,16 @@ def validate_workflow( for e in self._edges: edge_executor_ids.add(e.source_id) edge_executor_ids.add(e.target_id) - if start_executor_id not in edge_executor_ids: + if start_executor.id not in edge_executor_ids: raise GraphConnectivityError( - f"Start executor '{start_executor_id}' is not present in the workflow graph" + f"Start executor '{start_executor.id}' is not present in the workflow graph" ) # Run all checks self._validate_edge_duplication() self._validate_handler_output_annotations() self._validate_type_compatibility() - self._validate_graph_connectivity(start_executor_id) + self._validate_graph_connectivity(start_executor.id) self._validate_self_loops() self._validate_dead_ends() @@ -398,7 +395,7 @@ def _validate_dead_ends(self) -> None: def validate_workflow_graph( edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], - start_executor: Executor | str, + start_executor: Executor, ) -> None: """Convenience function to validate a workflow graph. 
diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index eb22d7c330..caa60fbef6 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -180,7 +180,7 @@ def __init__( self, edge_groups: list[EdgeGroup], executors: dict[str, Executor], - start_executor: Executor | str, + start_executor: Executor, runner_context: RunnerContext, max_iterations: int = DEFAULT_MAX_ITERATIONS, name: str | None = None, @@ -192,19 +192,16 @@ def __init__( Args: edge_groups: A list of EdgeGroup instances that define the workflow edges. executors: A dictionary mapping executor IDs to Executor instances. - start_executor: The starting executor for the workflow, which can be an Executor instance or its ID. + start_executor: The starting executor for the workflow. runner_context: The RunnerContext instance to be used during workflow execution. max_iterations: The maximum number of iterations the workflow will run for convergence. name: Optional human-readable name for the workflow. description: Optional description of what the workflow does. kwargs: Additional keyword arguments. Unused in this implementation. 
""" - # Convert start_executor to string ID if it's an Executor instance - start_executor_id = start_executor.id if isinstance(start_executor, Executor) else start_executor - self.edge_groups = list(edge_groups) self.executors = dict(executors) - self.start_executor_id = start_executor_id + self.start_executor_id = start_executor.id self.max_iterations = max_iterations self.id = str(uuid.uuid4()) self.name = name diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 70f8747ec9..26cd0213e4 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -3,8 +3,13 @@ import logging import sys from collections.abc import Callable, Sequence +from dataclasses import dataclass from typing import Any +from typing_extensions import deprecated + +from agent_framework import AgentThread + from .._agents import AgentProtocol from ..observability import OtelAttr, capture_exception, create_workflow_span from ._agent_executor import AgentExecutor @@ -36,6 +41,76 @@ logger = logging.getLogger(__name__) +@dataclass +class _EdgeRegistration: + """A data class representing an edge registration in the workflow builder. + + Args: + source: The registered source name. + target: The registered target name. + condition: An optional condition function for the edge. + """ + + source: str + target: str + condition: Callable[[Any], bool] | None = None + + +@dataclass +class _FanOutEdgeRegistration: + """A data class representing a fan-out edge registration in the workflow builder. + + Args: + source: The registered source name. + targets: A list of registered target names. + """ + + source: str + targets: list[str] + + +@dataclass +class _FanInEdgeRegistration: + """A data class representing a fan-in edge registration in the workflow builder. 
+ + Args: + sources: A list of registered source names. + target: The registered target name. + """ + + sources: list[str] + target: str + + +@dataclass +class _SwitchCaseEdgeGroupRegistration: + """A data class representing a switch-case edge group registration in the workflow builder. + + Args: + source: The registered source name. + cases: A list of case objects that determine the target executor for each message. + """ + + source: str + cases: list[Case | Default] + + +@dataclass +class _MultiSelectionEdgeGroupRegistration: + """A data class representing a multi-selection edge group registration in the workflow builder. + + Args: + source: The registered source name. + targets: A list of registered target names. + selection_func: A function that selects target executors for messages. + Takes (message, list[registered target names]) and returns list[registered target names]. + """ + + source: str + targets: list[str] + selection_func: Callable[[Any, list[str]], list[str]] + + class WorkflowBuilder: """A builder class for constructing workflows. @@ -65,8 +140,10 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Build a workflow workflow = ( WorkflowBuilder() - .add_edge(UpperCaseExecutor(id="upper"), ReverseExecutor(id="reverse")) - .set_start_executor("upper") + .register_executor(lambda: UpperCaseExecutor(id="upper"), name="UpperCase") + .register_executor(lambda: ReverseExecutor(id="reverse"), name="Reverse") + .add_edge("UpperCase", "Reverse") + .set_start_executor("UpperCase") .build() ) @@ -101,6 +178,16 @@ def __init__( # the start node vs edge nodes and triggering a GraphConnectivityError during validation. 
self._agent_wrappers: dict[int, Executor] = {} + # Registrations for lazy initialization of executors + self._edge_registry: list[ + _EdgeRegistration + | _FanOutEdgeRegistration + | _SwitchCaseEdgeGroupRegistration + | _MultiSelectionEdgeGroupRegistration + | _FanInEdgeRegistration + ] = [] + self._executor_registry: dict[str, Callable[[], Executor]] = {} + # Agents auto-wrapped by builder now always stream incremental updates. def _add_executor(self, executor: Executor) -> str: @@ -173,6 +260,135 @@ def _maybe_wrap_agent( f"WorkflowBuilder expected an Executor or AgentProtocol instance; got {type(candidate).__name__}." ) + def register_executor(self, factory_func: Callable[[], Executor], name: str | list[str]) -> Self: + """Register an executor factory function for lazy initialization. + + This method allows you to register a factory function that creates an executor. + The executor will be instantiated only when the workflow is built, enabling + deferred initialization and potentially reducing startup time. + + Args: + factory_func: A callable that returns an Executor instance when called. + name: The name(s) of the registered executor factory. This doesn't have to match + the executor's ID, but it must be unique within the workflow. + + Example: + .. 
code-block:: python + from typing_extensions import Never + from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler + + + class UpperCaseExecutor(Executor): + @handler + async def process(self, text: str, ctx: WorkflowContext[str]) -> None: + await ctx.send_message(text.upper()) + + + class ReverseExecutor(Executor): + @handler + async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: + await ctx.yield_output(text[::-1]) + + + # Build a workflow + workflow = ( + WorkflowBuilder() + .register_executor(lambda: UpperCaseExecutor(id="upper"), name="UpperCase") + .register_executor(lambda: ReverseExecutor(id="reverse"), name="Reverse") + .set_start_executor("UpperCase") + .add_edge("UpperCase", "Reverse") + .build() + ) + + If multiple names are provided, the same factory function will be registered under each name. + + .. code-block:: python + from agent_framework import WorkflowBuilder, Executor, WorkflowContext, handler + + + class LoggerExecutor(Executor): + @handler + async def log(self, message: str, ctx: WorkflowContext) -> None: + print(f"Log: {message}") + + + # Register the same executor factory under multiple names + workflow = ( + WorkflowBuilder() + .register_executor(lambda: LoggerExecutor(id="logger"), name=["ExecutorA", "ExecutorB"]) + .set_start_executor("ExecutorA") + .add_edge("ExecutorA", "ExecutorB") + .build() + """ + names = [name] if isinstance(name, str) else name + + for n in names: + if n in self._executor_registry: + raise ValueError(f"An executor factory with the name '{n}' is already registered.") + + for n in names: + self._executor_registry[n] = factory_func + + return self + + def register_agent( + self, + factory_func: Callable[[], AgentProtocol], + name: str, + agent_thread: AgentThread | None = None, + output_response: bool = False, + ) -> Self: + """Register an agent factory function for lazy initialization. + + This method allows you to register a factory function that creates an agent.
+ The agent will be instantiated and wrapped in an AgentExecutor only when the workflow is built, + enabling deferred initialization and potentially reducing startup time. + + Args: + factory_func: A callable that returns an AgentProtocol instance when called. + name: The name of the registered agent factory. This doesn't have to match + the agent's internal name. But it must be unique within the workflow. + agent_thread: The thread to use for running the agent. If None, a new thread will be created when + the agent is instantiated. + output_response: Whether to yield an AgentRunResponse as a workflow output when the agent completes. + + Example: + .. code-block:: python + + from agent_framework import WorkflowBuilder + from agent_framework_anthropic import AnthropicAgent + + + # Build a workflow + workflow = ( + WorkflowBuilder() + .register_executor(lambda: ..., name="SomeOtherExecutor") + .register_agent( + lambda: AnthropicAgent(name="writer", model="claude-3-5-sonnet-20241022"), + name="WriterAgent", + output_response=True, + ) + .add_edge("SomeOtherExecutor", "WriterAgent") + .set_start_executor("SomeOtherExecutor") + .build() + ) + """ + if name in self._executor_registry: + raise ValueError(f"An executor factory with the name '{name}' is already registered.") + + def wrapped_factory() -> AgentExecutor: + agent = factory_func() + return AgentExecutor( + agent, + agent_thread=agent_thread, + output_response=output_response, + ) + + self._executor_registry[name] = wrapped_factory + + return self + + @deprecated("Use register_agent() for lazy initialization instead.") def add_agent( self, agent: AgentProtocol, @@ -214,6 +430,11 @@ def add_agent( # Add the agent to a workflow workflow = WorkflowBuilder().add_agent(agent, output_response=True).set_start_executor(agent).build() """ + logger.warning( + "Adding an agent instance directly to WorkflowBuilder is not recommended, " + "because workflow instances created from the builder will share the same agent 
instance. " + "Consider using register_agent() for lazy initialization instead." + ) executor = self._maybe_wrap_agent( agent, agent_thread=agent_thread, output_response=output_response, executor_id=id ) @@ -222,8 +443,8 @@ def add_agent( def add_edge( self, - source: Executor | AgentProtocol, - target: Executor | AgentProtocol, + source: Executor | AgentProtocol | str, + target: Executor | AgentProtocol | str, condition: Callable[[Any], bool] | None = None, ) -> Self: """Add a directed edge between two executors. @@ -232,8 +453,8 @@ def add_edge( Messages sent by the source executor will be routed to the target executor. Args: - source: The source executor of the edge. - target: The target executor of the edge. + source: The source executor or registered name of the source factory for the edge. + target: The target executor or registered name of the target factory for the edge. condition: An optional condition function that determines whether the edge should be traversed based on the message type. 
@@ -261,7 +482,12 @@ async def process(self, count: int, ctx: WorkflowContext[Never, str]) -> None: # Connect executors with an edge workflow = ( - WorkflowBuilder().add_edge(ProcessorA(id="a"), ProcessorB(id="b")).set_start_executor("a").build() + WorkflowBuilder() + .register_executor(lambda: ProcessorA(id="a"), name="ProcessorA") + .register_executor(lambda: ProcessorB(id="b"), name="ProcessorB") + .add_edge("ProcessorA", "ProcessorB") + .set_start_executor("ProcessorA") + .build() ) @@ -272,14 +498,33 @@ def only_large_numbers(msg: int) -> bool: workflow = ( WorkflowBuilder() - .add_edge(ProcessorA(id="a"), ProcessorB(id="b"), condition=only_large_numbers) - .set_start_executor("a") + .register_executor(lambda: ProcessorA(id="a"), name="ProcessorA") + .register_executor(lambda: ProcessorB(id="b"), name="ProcessorB") + .add_edge("ProcessorA", "ProcessorB", condition=only_large_numbers) + .set_start_executor("ProcessorA") .build() ) """ - # TODO(@taochen): Support executor factories for lazy initialization - source_exec = self._maybe_wrap_agent(source) - target_exec = self._maybe_wrap_agent(target) + if not isinstance(source, str) or not isinstance(target, str): + logger.warning( + "Adding an edge with Executor or AgentProtocol instances directly is not recommended, " + "because workflow instances created from the builder will share the same executor/agent instances. " + "Consider using a registered name for lazy initialization instead." 
+ ) + + if (isinstance(source, str) and not isinstance(target, str)) or ( + not isinstance(source, str) and isinstance(target, str) + ): + raise ValueError("Both source and target must be either names (str) or Executor/AgentProtocol instances.") + + if isinstance(source, str) and isinstance(target, str): + # Both are names; defer resolution to build time + self._edge_registry.append(_EdgeRegistration(source=source, target=target, condition=condition)) + return self + + # Both are Executor/AgentProtocol instances; wrap and add now + source_exec = self._maybe_wrap_agent(source) # type: ignore[arg-type] + target_exec = self._maybe_wrap_agent(target) # type: ignore[arg-type] source_id = self._add_executor(source_exec) target_id = self._add_executor(target_exec) self._edge_groups.append(SingleEdgeGroup(source_id, target_id, condition)) # type: ignore[call-arg] @@ -287,8 +532,8 @@ def only_large_numbers(msg: int) -> bool: def add_fan_out_edges( self, - source: Executor | AgentProtocol, - targets: Sequence[Executor | AgentProtocol], + source: Executor | AgentProtocol | str, + targets: Sequence[Executor | AgentProtocol | str], ) -> Self: """Add multiple edges to the workflow where messages from the source will be sent to all targets. @@ -296,8 +541,8 @@ def add_fan_out_edges( Messages from the source will be broadcast to all target executors concurrently. Args: - source: The source executor of the edges. - targets: A list of target executors for the edges. + source: The source executor or registered name of the source factory for the edges. + targets: A list of target executors or registered names of the target factories for the edges. Returns: Self: The WorkflowBuilder instance for method chaining. 
@@ -330,13 +575,34 @@ async def validate(self, data: str, ctx: WorkflowContext) -> None: # Broadcast to multiple validators workflow = ( WorkflowBuilder() - .add_fan_out_edges(DataSource(id="source"), [ValidatorA(id="val_a"), ValidatorB(id="val_b")]) - .set_start_executor("source") + .register_executor(lambda: DataSource(id="source"), name="DataSource") + .register_executor(lambda: ValidatorA(id="val_a"), name="ValidatorA") + .register_executor(lambda: ValidatorB(id="val_b"), name="ValidatorB") + .add_fan_out_edges("DataSource", ["ValidatorA", "ValidatorB"]) + .set_start_executor("DataSource") .build() ) """ - source_exec = self._maybe_wrap_agent(source) - target_execs = [self._maybe_wrap_agent(t) for t in targets] + if not isinstance(source, str) or any(not isinstance(t, str) for t in targets): + logger.warning( + "Adding fan-out edges with Executor or AgentProtocol instances directly is not recommended, " + "because workflow instances created from the builder will share the same executor/agent instances. " + "Consider using registered names for lazy initialization instead." 
+ ) + + if (isinstance(source, str) and not all(isinstance(t, str) for t in targets)) or ( + not isinstance(source, str) and any(isinstance(t, str) for t in targets) + ): + raise ValueError("Both source and targets must be either names (str) or Executor/AgentProtocol instances.") + + if isinstance(source, str) and all(isinstance(t, str) for t in targets): + # Both are names; defer resolution to build time + self._edge_registry.append(_FanOutEdgeRegistration(source=source, targets=list(targets))) # type: ignore + return self + + # Both are Executor/AgentProtocol instances; wrap and add now + source_exec = self._maybe_wrap_agent(source) # type: ignore[arg-type] + target_execs = [self._maybe_wrap_agent(t) for t in targets] # type: ignore[arg-type] source_id = self._add_executor(source_exec) target_ids = [self._add_executor(t) for t in target_execs] self._edge_groups.append(FanOutEdgeGroup(source_id, target_ids)) # type: ignore[call-arg] @@ -345,7 +611,7 @@ async def validate(self, data: str, ctx: WorkflowContext) -> None: def add_switch_case_edge_group( self, - source: Executor | AgentProtocol, + source: Executor | AgentProtocol | str, cases: Sequence[Case | Default], ) -> Self: """Add an edge group that represents a switch-case statement. @@ -362,7 +628,7 @@ def add_switch_case_edge_group( (i.e., no condition matched). Args: - source: The source executor of the edges. + source: The source executor or registered name of the source factory for the edge group. cases: A list of case objects that determine the target executor for each message. 
Returns: @@ -401,24 +667,47 @@ async def handle(self, result: Result, ctx: WorkflowContext) -> None: # Route based on score value workflow = ( WorkflowBuilder() + .register_executor(lambda: Evaluator(id="eval"), name="Evaluator") + .register_executor(lambda: HighScoreHandler(id="high"), name="HighScoreHandler") + .register_executor(lambda: LowScoreHandler(id="low"), name="LowScoreHandler") .add_switch_case_edge_group( - Evaluator(id="eval"), + "Evaluator", [ - Case(condition=lambda r: r.score > 10, target=HighScoreHandler(id="high")), - Default(target=LowScoreHandler(id="low")), + Case(condition=lambda r: r.score > 10, target="HighScoreHandler"), + Default(target="LowScoreHandler"), ], ) - .set_start_executor("eval") + .set_start_executor("Evaluator") .build() ) """ - source_exec = self._maybe_wrap_agent(source) + if not isinstance(source, str) or not all(isinstance(case.target, str) for case in cases): + logger.warning( + "Adding a switch-case edge group with Executor or AgentProtocol instances directly is not recommended, " + "because workflow instances created from the builder will share the same executor/agent instance. " + "Consider using a registered name for lazy initialization instead." + ) + + if (isinstance(source, str) and not all(isinstance(case.target, str) for case in cases)) or ( + not isinstance(source, str) and any(isinstance(case.target, str) for case in cases) + ): + raise ValueError( + "Both source and case targets must be either names (str) or Executor/AgentProtocol instances." 
+ ) + + if isinstance(source, str) and all(isinstance(case.target, str) for case in cases): + # Source is a name; defer resolution to build time + self._edge_registry.append(_SwitchCaseEdgeGroupRegistration(source=source, cases=list(cases))) # type: ignore + return self + + # Source is an Executor/AgentProtocol instance; wrap and add now + source_exec = self._maybe_wrap_agent(source) # type: ignore[arg-type] source_id = self._add_executor(source_exec) # Convert case data types to internal types that only uses target_id. internal_cases: list[SwitchCaseEdgeGroupCase | SwitchCaseEdgeGroupDefault] = [] for case in cases: # Allow case targets to be agents - case.target = self._maybe_wrap_agent(case.target) # type: ignore[attr-defined] + case.target = self._maybe_wrap_agent(case.target) # type: ignore[arg-type] self._add_executor(case.target) if isinstance(case, Default): internal_cases.append(SwitchCaseEdgeGroupDefault(target_id=case.target.id)) @@ -430,8 +719,8 @@ async def handle(self, result: Result, ctx: WorkflowContext) -> None: def add_multi_selection_edge_group( self, - source: Executor | AgentProtocol, - targets: Sequence[Executor | AgentProtocol], + source: Executor | AgentProtocol | str, + targets: Sequence[Executor | AgentProtocol | str], selection_func: Callable[[Any, list[str]], list[str]], ) -> Self: """Add an edge group that represents a multi-selection execution model. @@ -444,10 +733,11 @@ def add_multi_selection_edge_group( and return a list of executor IDs indicating which target executors should receive the message. Args: - source: The source executor of the edges. - targets: A list of target executors for the edges. + source: The source executor or registered name of the source factory for the edge group. + targets: A list of target executors or registered names of the target factories for the edges. selection_func: A function that selects target executors for messages. - Takes (message, list[executor_id]) and returns list[executor_id]. 
+ Takes (message, list[executor_id or registered target names]) and + returns list[executor_id or registered target names]. Returns: Self: The WorkflowBuilder instance for method chaining. @@ -485,25 +775,52 @@ async def process(self, task: Task, ctx: WorkflowContext) -> None: # Select workers based on task priority - def select_workers(task: Task, executor_ids: list[str]) -> list[str]: + def select_workers(task: Task, available: list[str]) -> list[str]: if task.priority == "high": - return executor_ids # Send to all workers - return [executor_ids[0]] # Send to first worker only + return available # Send to all workers + return [available[0]] # Send to first worker only workflow = ( WorkflowBuilder() + .register_executor(lambda: TaskDispatcher(id="dispatcher"), name="TaskDispatcher") + .register_executor(lambda: WorkerA(id="worker_a"), name="WorkerA") + .register_executor(lambda: WorkerB(id="worker_b"), name="WorkerB") .add_multi_selection_edge_group( - TaskDispatcher(id="dispatcher"), - [WorkerA(id="worker_a"), WorkerB(id="worker_b")], + "TaskDispatcher", + ["WorkerA", "WorkerB"], selection_func=select_workers, ) - .set_start_executor("dispatcher") + .set_start_executor("TaskDispatcher") .build() ) """ - source_exec = self._maybe_wrap_agent(source) - target_execs = [self._maybe_wrap_agent(t) for t in targets] + if not isinstance(source, str) or any(not isinstance(t, str) for t in targets): + logger.warning( + "Adding fan-out edges with Executor or AgentProtocol instances directly is not recommended, " + "because workflow instances created from the builder will share the same executor/agent instances. " + "Consider using registered names for lazy initialization instead." 
+ ) + + if (isinstance(source, str) and not all(isinstance(t, str) for t in targets)) or ( + not isinstance(source, str) and any(isinstance(t, str) for t in targets) + ): + raise ValueError("Both source and targets must be either names (str) or Executor/AgentProtocol instances.") + + if isinstance(source, str) and all(isinstance(t, str) for t in targets): + # Both are names; defer resolution to build time + self._edge_registry.append( + _MultiSelectionEdgeGroupRegistration( + source=source, + targets=list(targets), # type: ignore + selection_func=selection_func, + ) + ) + return self + + # Both are Executor/AgentProtocol instances; wrap and add now + source_exec = self._maybe_wrap_agent(source) # type: ignore + target_execs = [self._maybe_wrap_agent(t) for t in targets] # type: ignore source_id = self._add_executor(source_exec) target_ids = [self._add_executor(t) for t in target_execs] self._edge_groups.append(FanOutEdgeGroup(source_id, target_ids, selection_func)) # type: ignore[call-arg] @@ -512,8 +829,8 @@ def select_workers(task: Task, executor_ids: list[str]) -> list[str]: def add_fan_in_edges( self, - sources: Sequence[Executor | AgentProtocol], - target: Executor | AgentProtocol, + sources: Sequence[Executor | AgentProtocol | str], + target: Executor | AgentProtocol | str, ) -> Self: """Add multiple edges from sources to a single target executor. @@ -525,8 +842,8 @@ def add_fan_in_edges( types of the source executors. Args: - sources: A list of source executors for the edges. - target: The target executor for the edges. + sources: A list of source executors or registered names of the source factories for the edges. + target: The target executor or registered name of the target factory for the edges. Returns: Self: The WorkflowBuilder instance for method chaining. 
@@ -554,20 +871,41 @@ async def aggregate(self, results: list[str], ctx: WorkflowContext[Never, str]) # Collect results from multiple producers workflow = ( WorkflowBuilder() - .add_fan_in_edges([Producer(id="prod_1"), Producer(id="prod_2")], Aggregator(id="agg")) - .set_start_executor("prod_1") + .register_executor(lambda: Producer(id="prod_1"), name="Producer1") + .register_executor(lambda: Producer(id="prod_2"), name="Producer2") + .register_executor(lambda: Aggregator(id="agg"), name="Aggregator") + .add_fan_in_edges(["Producer1", "Producer2"], "Aggregator") + .set_start_executor("Producer1") .build() ) """ - source_execs = [self._maybe_wrap_agent(s) for s in sources] - target_exec = self._maybe_wrap_agent(target) + if not all(isinstance(s, str) for s in sources) or not isinstance(target, str): + logger.warning( + "Adding fan-in edges with Executor or AgentProtocol instances directly is not recommended, " + "because workflow instances created from the builder will share the same executor/agent instances. " + "Consider using registered names for lazy initialization instead." 
+ ) + + if (all(isinstance(s, str) for s in sources) and not isinstance(target, str)) or ( + not all(isinstance(s, str) for s in sources) and isinstance(target, str) + ): + raise ValueError("Both sources and target must be either names (str) or Executor/AgentProtocol instances.") + + if all(isinstance(s, str) for s in sources) and isinstance(target, str): + # Both are names; defer resolution to build time + self._edge_registry.append(_FanInEdgeRegistration(sources=list(sources), target=target)) # type: ignore + return self + + # Both are Executor/AgentProtocol instances; wrap and add now + source_execs = [self._maybe_wrap_agent(s) for s in sources] # type: ignore + target_exec = self._maybe_wrap_agent(target) # type: ignore source_ids = [self._add_executor(s) for s in source_execs] target_id = self._add_executor(target_exec) self._edge_groups.append(FanInEdgeGroup(source_ids, target_id)) # type: ignore[call-arg] return self - def add_chain(self, executors: Sequence[Executor | AgentProtocol]) -> Self: + def add_chain(self, executors: Sequence[Executor | AgentProtocol | str]) -> Self: """Add a chain of executors to the workflow. The output of each executor in the chain will be sent to the next executor in the chain. @@ -576,7 +914,7 @@ def add_chain(self, executors: Sequence[Executor | AgentProtocol]) -> Self: Circles in the chain are not allowed, meaning the chain cannot have two executors with the same ID. Args: - executors: A list of executors to be added to the chain. + executors: A list of executors or registered names of the executor factories to chain together. Returns: Self: The WorkflowBuilder instance for method chaining. 
@@ -609,13 +947,38 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Chain executors in sequence workflow = ( WorkflowBuilder() - .add_chain([Step1(id="step1"), Step2(id="step2"), Step3(id="step3")]) + .register_executor(lambda: Step1(id="step1"), name="step1") + .register_executor(lambda: Step2(id="step2"), name="step2") + .register_executor(lambda: Step3(id="step3"), name="step3") + .add_chain(["step1", "step2", "step3"]) .set_start_executor("step1") .build() ) """ + if len(executors) < 2: + raise ValueError("At least two executors are required to form a chain.") + + if not all(isinstance(e, str) for e in executors): + logger.warning( + "Adding a chain with Executor or AgentProtocol instances directly is not recommended, " + "because workflow instances created from the builder will share the same executor/agent instances. " + "Consider using registered names for lazy initialization instead." + ) + + if not all(isinstance(e, str) for e in executors) and any(isinstance(e, str) for e in executors): + raise ValueError( + "All executors in the chain must be either names (str) or Executor/AgentProtocol instances." 
+            )
+
+        if all(isinstance(e, str) for e in executors):
+            # All are names; defer resolution to build time
+            for i in range(len(executors) - 1):
+                self.add_edge(executors[i], executors[i + 1])
+            return self
+
+        # All are Executor/AgentProtocol instances; wrap and add now
         # Wrap each candidate first to ensure stable IDs before adding edges
-        wrapped: list[Executor] = [self._maybe_wrap_agent(e) for e in executors]
+        wrapped: list[Executor] = [self._maybe_wrap_agent(e) for e in executors]  # type: ignore[arg-type]
         for i in range(len(wrapped) - 1):
             self.add_edge(wrapped[i], wrapped[i + 1])
         return self
@@ -628,7 +991,7 @@ def set_start_executor(self, executor: Executor | AgentProtocol | str) -> Self:
 
         Args:
             executor: The starting executor, which can be an Executor instance, AgentProtocol instance,
-                or the string ID of an executor previously added to the workflow.
+                or the name of a registered executor factory.
 
         Returns:
             Self: The WorkflowBuilder instance for method chaining.
@@ -652,18 +1015,19 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
 
                     await ctx.yield_output(text)
 
-            # Set by executor instance
-            entry = EntryPoint(id="entry")
-            workflow = WorkflowBuilder().add_edge(entry, Processor(id="proc")).set_start_executor(entry).build()
-
-            # Set by executor ID string
             workflow = (
                 WorkflowBuilder()
-                .add_edge(EntryPoint(id="entry"), Processor(id="proc"))
-                .set_start_executor("entry")
+                .register_executor(lambda: EntryPoint(id="entry"), name="EntryPoint")
+                .register_executor(lambda: Processor(id="proc"), name="Processor")
+                .add_edge("EntryPoint", "Processor")
+                .set_start_executor("EntryPoint")
                 .build()
             )
         """
+        if self._start_executor is not None:
+            start_id = self._start_executor if isinstance(self._start_executor, str) else self._start_executor.id
+            logger.warning(f"Overwriting existing start executor: {start_id} for the workflow.")
+
         if isinstance(executor, str):
             self._start_executor = executor
         else:
@@ -711,9 +1075,11 @@ async def 
process(self, count: int, ctx: WorkflowContext[int]) -> None: workflow = ( WorkflowBuilder() .set_max_iterations(500) - .add_edge(StepA(id="step_a"), StepB(id="step_b")) - .add_edge(StepB(id="step_b"), StepA(id="step_a")) # Cycle - .set_start_executor("step_a") + .register_executor(lambda: StepA(id="step_a"), name="StepA") + .register_executor(lambda: StepB(id="step_b"), name="StepB") + .add_edge("StepA", "StepB") + .add_edge("StepB", "StepA") # Cycle + .set_start_executor("StepA") .build() ) """ @@ -759,8 +1125,10 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: storage = FileCheckpointStorage("./checkpoints") workflow = ( WorkflowBuilder() - .add_edge(ProcessorA(id="proc_a"), ProcessorB(id="proc_b")) - .set_start_executor("proc_a") + .register_executor(lambda: ProcessorA(id="proc_a"), name="ProcessorA") + .register_executor(lambda: ProcessorB(id="proc_b"), name="ProcessorB") + .add_edge("ProcessorA", "ProcessorB") + .set_start_executor("ProcessorA") .with_checkpointing(storage) .build() ) @@ -771,6 +1139,70 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: self._checkpoint_storage = checkpoint_storage return self + def _resolve_edge_registry(self) -> tuple[Executor, list[Executor], list[EdgeGroup]]: + """Resolve deferred edge registrations into executors and edge groups.""" + if not self._start_executor: + raise ValueError("Starting executor must be set using set_start_executor before building the workflow.") + + start_executor: Executor | None = None + if isinstance(self._start_executor, Executor): + start_executor = self._start_executor + + executors: dict[str, Executor] = {} + deferred_edge_groups: list[EdgeGroup] = [] + for name, exec_factory in self._executor_registry.items(): + instance = exec_factory() + if isinstance(self._start_executor, str) and name == self._start_executor: + start_executor = instance + # All executors will get their own internal edge group for receiving system messages + 
deferred_edge_groups.append(InternalEdgeGroup(instance.id)) # type: ignore[call-arg] + executors[name] = instance + + def _get_executor(name: str) -> Executor: + """Helper to get executor by the registered name. Raises if not found.""" + if name not in executors: + raise ValueError(f"Executor with name '{name}' has not been registered.") + return executors[name] + + for registration in self._edge_registry: + match registration: + case _EdgeRegistration(source, target, condition): + source_exec: Executor = _get_executor(source) + target_exec: Executor = _get_executor(target) + deferred_edge_groups.append(SingleEdgeGroup(source_exec.id, target_exec.id, condition)) # type: ignore[call-arg] + case _FanOutEdgeRegistration(source, targets): + source_exec = _get_executor(source) + target_execs = [_get_executor(t) for t in targets] + deferred_edge_groups.append(FanOutEdgeGroup(source_exec.id, [t.id for t in target_execs])) # type: ignore[call-arg] + case _SwitchCaseEdgeGroupRegistration(source, cases): + source_exec = _get_executor(source) + cases_converted: list[SwitchCaseEdgeGroupCase | SwitchCaseEdgeGroupDefault] = [] + for case in cases: + if not isinstance(case.target, str): + raise ValueError("Switch case target must be a registered executor name (str) if deferred.") + target_exec = _get_executor(case.target) + if isinstance(case, Default): + cases_converted.append(SwitchCaseEdgeGroupDefault(target_id=target_exec.id)) + else: + cases_converted.append( + SwitchCaseEdgeGroupCase(condition=case.condition, target_id=target_exec.id) + ) + deferred_edge_groups.append(SwitchCaseEdgeGroup(source_exec.id, cases_converted)) # type: ignore[call-arg] + case _MultiSelectionEdgeGroupRegistration(source, targets, selection_func): + source_exec = _get_executor(source) + target_execs = [_get_executor(t) for t in targets] + deferred_edge_groups.append( + FanOutEdgeGroup(source_exec.id, [t.id for t in target_execs], selection_func) # type: ignore[call-arg] + ) + case 
_FanInEdgeRegistration(sources, target): + source_execs = [_get_executor(s) for s in sources] + target_exec = _get_executor(target) + deferred_edge_groups.append(FanInEdgeGroup([s.id for s in source_execs], target_exec.id)) # type: ignore[call-arg] + if start_executor is None: + raise ValueError("Failed to resolve starting executor from registered factories.") + + return start_executor, list(executors.values()), deferred_edge_groups + def build(self) -> Workflow: """Build and return the constructed workflow. @@ -802,7 +1234,12 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Build and execute a workflow - workflow = WorkflowBuilder().set_start_executor(MyExecutor(id="executor")).build() + workflow = ( + WorkflowBuilder() + .register_executor(lambda: MyExecutor(id="executor"), name="MyExecutor") + .set_start_executor("MyExecutor") + .build() + ) # The workflow is now immutable and ready to run events = await workflow.run("hello") @@ -818,16 +1255,16 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Add workflow build started event span.add_event(OtelAttr.BUILD_STARTED) - if not self._start_executor: - raise ValueError( - "Starting executor must be set using set_start_executor before building the workflow." 
- ) + # Resolve lazy edge registrations + start_executor, deferred_executors, deferred_edge_groups = self._resolve_edge_registry() + executors = self._executors | {exe.id: exe for exe in deferred_executors} + edge_groups = self._edge_groups + deferred_edge_groups # Perform validation before creating the workflow validate_workflow_graph( - self._edge_groups, - self._executors, - self._start_executor, + edge_groups, + executors, + start_executor, ) # Add validation completed event @@ -837,9 +1274,9 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None: # Create workflow instance after validation workflow = Workflow( - self._edge_groups, - self._executors, - self._start_executor, + edge_groups, + executors, + start_executor, context, self._max_iterations, name=self._name, diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 8d2f9826ab..7ec778b8d2 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -20,7 +20,7 @@ if TYPE_CHECKING: # pragma: no cover from azure.core.credentials import TokenCredential - from opentelemetry.sdk._logs._internal.export import LogRecordExporter + from opentelemetry.sdk._logs.export import LogRecordExporter from opentelemetry.sdk.metrics.export import MetricExporter from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace.export import SpanExporter @@ -268,9 +268,9 @@ def _get_otlp_exporters(endpoints: list[str]) -> list["LogRecordExporter | SpanE exporters: list["LogRecordExporter | SpanExporter | MetricExporter"] = [] for endpoint in endpoints: - exporters.append(OTLPLogExporter(endpoint=endpoint)) # type: ignore[arg-type] - exporters.append(OTLPSpanExporter(endpoint=endpoint)) # type: ignore[arg-type] - exporters.append(OTLPMetricExporter(endpoint=endpoint)) # type: ignore[arg-type] + exporters.append(OTLPLogExporter(endpoint=endpoint)) + 
exporters.append(OTLPSpanExporter(endpoint=endpoint)) + exporters.append(OTLPMetricExporter(endpoint=endpoint)) return exporters @@ -493,8 +493,7 @@ def _configure_providers(self, exporters: list["LogRecordExporter | MetricExport """Configure tracing, logging, events and metrics with the provided exporters.""" from opentelemetry._logs import set_logger_provider from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler - from opentelemetry.sdk._logs._internal.export import LogRecordExporter - from opentelemetry.sdk._logs.export import BatchLogRecordProcessor + from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogRecordExporter from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import MetricExporter, PeriodicExportingMetricReader from opentelemetry.sdk.metrics.view import DropAggregation, View @@ -522,7 +521,7 @@ def _configure_providers(self, exporters: list["LogRecordExporter | MetricExport logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) should_add_console_exporter = False if should_add_console_exporter: - from opentelemetry.sdk._logs._internal.export import ConsoleLogExporter + from opentelemetry.sdk._logs.export import ConsoleLogExporter logger_provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter())) diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index b88271ed26..b85ca5d787 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -208,3 +208,318 @@ def test_add_agent_duplicate_id_raises_error(): # Adding second agent with same name should raise ValueError with pytest.raises(ValueError, match="Duplicate executor ID"): builder.add_agent(agent2) + + +# Tests for new executor registration patterns + + +def test_register_executor_basic(): + """Test basic executor registration 
with lazy initialization.""" + builder = WorkflowBuilder() + + # Register an executor factory - ID must match the registered name + result = builder.register_executor(lambda: MockExecutor(id="TestExecutor"), name="TestExecutor") + + # Verify that register returns the builder for chaining + assert result is builder + + # Build workflow and verify executor is instantiated + workflow = builder.set_start_executor("TestExecutor").build() + assert "TestExecutor" in workflow.executors + assert isinstance(workflow.executors["TestExecutor"], MockExecutor) + + +def test_register_multiple_executors(): + """Test registering multiple executors and connecting them with edges.""" + builder = WorkflowBuilder() + + # Register multiple executors - IDs must match registered names + builder.register_executor(lambda: MockExecutor(id="ExecutorA"), name="ExecutorA") + builder.register_executor(lambda: MockExecutor(id="ExecutorB"), name="ExecutorB") + builder.register_executor(lambda: MockExecutor(id="ExecutorC"), name="ExecutorC") + + # Build workflow with edges using registered names + workflow = ( + builder.set_start_executor("ExecutorA") + .add_edge("ExecutorA", "ExecutorB") + .add_edge("ExecutorB", "ExecutorC") + .build() + ) + + # Verify all executors are present + assert "ExecutorA" in workflow.executors + assert "ExecutorB" in workflow.executors + assert "ExecutorC" in workflow.executors + assert workflow.start_executor_id == "ExecutorA" + + +def test_register_with_multiple_names(): + """Test registering the same factory function under multiple names.""" + builder = WorkflowBuilder() + + # Register same executor factory under multiple names + # Note: Each call creates a new instance, so IDs won't conflict + counter = {"val": 0} + + def make_executor(): + counter["val"] += 1 + return MockExecutor(id="ExecutorA" if counter["val"] == 1 else "ExecutorB") + + builder.register_executor(make_executor, name=["ExecutorA", "ExecutorB"]) + + # Set up workflow + workflow = 
builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build() + + # Verify both executors are present + assert "ExecutorA" in workflow.executors + assert "ExecutorB" in workflow.executors + assert workflow.start_executor_id == "ExecutorA" + + +def test_register_duplicate_name_raises_error(): + """Test that registering duplicate names raises an error.""" + builder = WorkflowBuilder() + + # Register first executor + builder.register_executor(lambda: MockExecutor(id="executor_1"), name="MyExecutor") + + # Registering second executor with same name should raise ValueError + with pytest.raises(ValueError, match="already registered"): + builder.register_executor(lambda: MockExecutor(id="executor_2"), name="MyExecutor") + + +def test_register_agent_basic(): + """Test basic agent registration with lazy initialization.""" + builder = WorkflowBuilder() + + # Register an agent factory + result = builder.register_agent( + lambda: DummyAgent(id="agent_test", name="test_agent"), name="TestAgent", output_response=True + ) + + # Verify that register_agent returns the builder for chaining + assert result is builder + + # Build workflow and verify agent is wrapped in AgentExecutor + workflow = builder.set_start_executor("TestAgent").build() + assert "test_agent" in workflow.executors + assert isinstance(workflow.executors["test_agent"], AgentExecutor) + assert workflow.executors["test_agent"]._output_response is True # type: ignore + + +def test_register_agent_with_thread(): + """Test registering an agent with a custom thread.""" + builder = WorkflowBuilder() + custom_thread = AgentThread() + + # Register agent with custom thread + builder.register_agent( + lambda: DummyAgent(id="agent_with_thread", name="threaded_agent"), + name="ThreadedAgent", + agent_thread=custom_thread, + output_response=False, + ) + + # Build workflow and verify agent executor configuration + workflow = builder.set_start_executor("ThreadedAgent").build() + executor = 
workflow.executors["threaded_agent"] + + assert isinstance(executor, AgentExecutor) + assert executor.id == "threaded_agent" + assert executor._output_response is False # type: ignore + assert executor._agent_thread is custom_thread # type: ignore + + +def test_register_agent_duplicate_name_raises_error(): + """Test that registering agents with duplicate names raises an error.""" + builder = WorkflowBuilder() + + # Register first agent + builder.register_agent(lambda: DummyAgent(id="agent1", name="first"), name="MyAgent") + + # Registering second agent with same name should raise ValueError + with pytest.raises(ValueError, match="already registered"): + builder.register_agent(lambda: DummyAgent(id="agent2", name="second"), name="MyAgent") + + +def test_register_and_add_edge_with_strings(): + """Test that registered executors can be connected using string names.""" + builder = WorkflowBuilder() + + # Register executors + builder.register_executor(lambda: MockExecutor(id="source"), name="Source") + builder.register_executor(lambda: MockExecutor(id="target"), name="Target") + + # Add edge using string names + workflow = builder.set_start_executor("Source").add_edge("Source", "Target").build() + + # Verify edge is created correctly + assert workflow.start_executor_id == "source" + assert "source" in workflow.executors + assert "target" in workflow.executors + + +def test_register_agent_and_add_edge_with_strings(): + """Test that registered agents can be connected using string names.""" + builder = WorkflowBuilder() + + # Register agents + builder.register_agent(lambda: DummyAgent(id="writer_id", name="writer"), name="Writer") + builder.register_agent(lambda: DummyAgent(id="reviewer_id", name="reviewer"), name="Reviewer") + + # Add edge using string names + workflow = builder.set_start_executor("Writer").add_edge("Writer", "Reviewer").build() + + # Verify edge is created correctly + assert workflow.start_executor_id == "writer" + assert "writer" in workflow.executors + 
assert "reviewer" in workflow.executors + assert all(isinstance(e, AgentExecutor) for e in workflow.executors.values()) + + +def test_register_with_fan_out_edges(): + """Test using registered names with fan-out edge groups.""" + builder = WorkflowBuilder() + + # Register executors - IDs must match registered names + builder.register_executor(lambda: MockExecutor(id="Source"), name="Source") + builder.register_executor(lambda: MockExecutor(id="Target1"), name="Target1") + builder.register_executor(lambda: MockExecutor(id="Target2"), name="Target2") + + # Add fan-out edges using registered names + workflow = builder.set_start_executor("Source").add_fan_out_edges("Source", ["Target1", "Target2"]).build() + + # Verify all executors are present + assert "Source" in workflow.executors + assert "Target1" in workflow.executors + assert "Target2" in workflow.executors + + +def test_register_with_fan_in_edges(): + """Test using registered names with fan-in edge groups.""" + builder = WorkflowBuilder() + + # Register executors - IDs must match registered names + builder.register_executor(lambda: MockExecutor(id="Source1"), name="Source1") + builder.register_executor(lambda: MockExecutor(id="Source2"), name="Source2") + builder.register_executor(lambda: MockAggregator(id="Aggregator"), name="Aggregator") + + # Add fan-in edges using registered names + # Both Source1 and Source2 need to be reachable, so connect Source1 to Source2 + workflow = ( + builder.set_start_executor("Source1") + .add_edge("Source1", "Source2") + .add_fan_in_edges(["Source1", "Source2"], "Aggregator") + .build() + ) + + # Verify all executors are present + assert "Source1" in workflow.executors + assert "Source2" in workflow.executors + assert "Aggregator" in workflow.executors + + +def test_register_with_chain(): + """Test using registered names with add_chain.""" + builder = WorkflowBuilder() + + # Register executors - IDs must match registered names + builder.register_executor(lambda: 
MockExecutor(id="Step1"), name="Step1") + builder.register_executor(lambda: MockExecutor(id="Step2"), name="Step2") + builder.register_executor(lambda: MockExecutor(id="Step3"), name="Step3") + + # Add chain using registered names + workflow = builder.add_chain(["Step1", "Step2", "Step3"]).set_start_executor("Step1").build() + + # Verify all executors are present + assert "Step1" in workflow.executors + assert "Step2" in workflow.executors + assert "Step3" in workflow.executors + assert workflow.start_executor_id == "Step1" + + +def test_register_factory_called_only_once(): + """Test that registered factory functions are called only during build.""" + call_count = 0 + + def factory(): + nonlocal call_count + call_count += 1 + return MockExecutor(id="Test") + + builder = WorkflowBuilder() + builder.register_executor(factory, name="Test") + + # Factory should not be called yet + assert call_count == 0 + + # Add edge without building + builder.set_start_executor("Test") + + # Factory should still not be called + assert call_count == 0 + + # Build workflow + workflow = builder.build() + + # Factory should now be called exactly once + assert call_count == 1 + assert "Test" in workflow.executors + + +def test_mixing_eager_and_lazy_initialization_error(): + """Test that mixing eager executor instances with lazy string names raises appropriate error.""" + builder = WorkflowBuilder() + + # Create an eager executor instance + eager_executor = MockExecutor(id="eager") + + # Register a lazy executor + builder.register_executor(lambda: MockExecutor(id="Lazy"), name="Lazy") + + # Mixing eager and lazy should raise an error during add_edge + with pytest.raises(ValueError, match="Both source and target must be either names"): + builder.add_edge(eager_executor, "Lazy") + + +def test_register_with_condition(): + """Test adding edges with conditions using registered names.""" + builder = WorkflowBuilder() + + def condition_func(msg: MockMessage) -> bool: + return msg.data > 0 + + # 
Register executors - IDs must match registered names + builder.register_executor(lambda: MockExecutor(id="Source"), name="Source") + builder.register_executor(lambda: MockExecutor(id="Target"), name="Target") + + # Add edge with condition + workflow = builder.set_start_executor("Source").add_edge("Source", "Target", condition=condition_func).build() + + # Verify workflow is built correctly + assert "Source" in workflow.executors + assert "Target" in workflow.executors + + +def test_register_agent_creates_unique_instances(): + """Test that registered agent factories create new instances on each build.""" + instance_ids: list[int] = [] + + def agent_factory() -> DummyAgent: + agent = DummyAgent(id=f"agent_{len(instance_ids)}", name="test") + instance_ids.append(id(agent)) + return agent + + # Build first workflow + builder1 = WorkflowBuilder() + builder1.register_agent(agent_factory, name="Agent") + _ = builder1.set_start_executor("Agent").build() + + # Build second workflow + builder2 = WorkflowBuilder() + builder2.register_agent(agent_factory, name="Agent") + _ = builder2.set_start_executor("Agent").build() + + # Verify that two different agent instances were created + assert len(instance_ids) == 2 + assert instance_ids[0] != instance_ids[1] diff --git a/python/samples/getting_started/workflows/_start-here/step4_using_factories.py b/python/samples/getting_started/workflows/_start-here/step4_using_factories.py new file mode 100644 index 0000000000..1c9dd5b1e6 --- /dev/null +++ b/python/samples/getting_started/workflows/_start-here/step4_using_factories.py @@ -0,0 +1,104 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +import asyncio + +from agent_framework import ( + AgentRunResponse, + ChatAgent, + Executor, + WorkflowBuilder, + WorkflowContext, + WorkflowOutputEvent, + executor, + handler, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +""" +Step 4: Using Factories to Define Executors and Agents + +What this example shows +- Defining custom executors using both class-based and function-based approaches. +- Registering executor and agent factories with WorkflowBuilder for lazy instantiation. +- Building a simple workflow that transforms input text through multiple steps. + +Benefits of using factories +- Decouples executor and agent creation from workflow definition. +- Isolated instances are created for workflow builder build, allowing for cleaner state management + and handling parallel workflow runs. + +It is recommended to use factories when defining executors and agents for production workflows. + +Prerequisites +- No external services required. +""" + + +class UpperCase(Executor): + def __init__(self, id: str): + super().__init__(id=id) + + @handler + async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None: + """Convert the input to uppercase and forward it to the next node.""" + result = text.upper() + + # Send the result to the next executor in the workflow. + await ctx.send_message(result) + + +@executor(id="reverse_text_executor") +async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None: + """Reverse the input string and send it downstream.""" + result = text[::-1] + + # Send the result to the next executor in the workflow. + await ctx.send_message(result) + + +def create_agent() -> ChatAgent: + """Factory function to create a Writer agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=("You decode messages. 
Try to reconstruct the original message."), + name="decoder", + ) + + +async def main(): + """Build and run a simple 2-step workflow using the fluent builder API.""" + # Build the workflow using a fluent pattern: + # 1) register_executor(factory, name) registers an executor factory + # 2) register_agent(factory, name) registers an agent factory + # 3) add_chain([node_names]) adds a sequence of nodes to the workflow + # 4) set_start_executor(node) declares the entry point + # 5) build() finalizes and returns an immutable Workflow object + workflow = ( + WorkflowBuilder() + .register_executor(lambda: UpperCase(id="upper_case_executor"), name="UpperCase") + .register_executor(lambda: reverse_text, name="ReverseText") + .register_agent(create_agent, name="DecoderAgent", output_response=True) + .add_chain(["UpperCase", "ReverseText", "DecoderAgent"]) + .set_start_executor("UpperCase") + .build() + ) + + output: AgentRunResponse | None = None + async for event in workflow.run_stream("hello world"): + if isinstance(event, WorkflowOutputEvent) and isinstance(event.data, AgentRunResponse): + output = event.data + + if output: + print(f"Decoded output: {output.text}") + else: + print("No output received.") + + """ + Sample Output: + + HELLO WORLD + """ + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/agents/azure_ai_agents_streaming.py b/python/samples/getting_started/workflows/agents/azure_ai_agents_streaming.py index 12b4cda4c1..9494c700e5 100644 --- a/python/samples/getting_started/workflows/agents/azure_ai_agents_streaming.py +++ b/python/samples/getting_started/workflows/agents/azure_ai_agents_streaming.py @@ -1,11 +1,8 @@ # Copyright (c) Microsoft. All rights reserved. 
import asyncio -from collections.abc import Awaitable, Callable -from contextlib import AsyncExitStack -from typing import Any -from agent_framework import AgentRunUpdateEvent, WorkflowBuilder, WorkflowOutputEvent +from agent_framework import AgentRunUpdateEvent, ChatAgent, WorkflowBuilder, WorkflowOutputEvent from agent_framework.azure import AzureAIAgentClient from azure.identity.aio import AzureCliCredential @@ -29,48 +26,36 @@ """ -async def create_azure_ai_agent() -> tuple[Callable[..., Awaitable[Any]], Callable[[], Awaitable[None]]]: - """Helper method to create a Azure AI agent factory and a close function. +def create_writer_agent(client: AzureAIAgentClient) -> ChatAgent: + return client.create_agent( + name="Writer", + instructions=( + "You are an excellent content writer. You create new content and edit contents based on the feedback." + ), + ) - This makes sure the async context managers are properly handled. - """ - stack = AsyncExitStack() - cred = await stack.enter_async_context(AzureCliCredential()) - client = await stack.enter_async_context(AzureAIAgentClient(async_credential=cred)) - - async def agent(**kwargs: Any) -> Any: - return await stack.enter_async_context(client.create_agent(**kwargs)) - - async def close() -> None: - await stack.aclose() - - return agent, close +def create_reviewer_agent(client: AzureAIAgentClient) -> ChatAgent: + return client.create_agent( + name="Reviewer", + instructions=( + "You are an excellent content reviewer. " + "Provide actionable feedback to the writer about the provided content. " + "Provide the feedback in the most concise manner possible." + ), + ) async def main() -> None: - agent, close = await create_azure_ai_agent() - try: - writer = await agent( - name="Writer", - instructions=( - "You are an excellent content writer. You create new content and edit contents based on the feedback." - ), - ) - reviewer = await agent( - name="Reviewer", - instructions=( - "You are an excellent content reviewer. 
" - "Provide actionable feedback to the writer about the provided content. " - "Provide the feedback in the most concise manner possible." - ), - ) + async with AzureCliCredential() as cred, AzureAIAgentClient(async_credential=cred) as client: # Build the workflow by adding agents directly as edges. # Agents adapt to workflow mode: run_stream() for incremental updates, run() for complete responses. workflow = ( WorkflowBuilder() - .set_start_executor(writer) - .add_edge(writer, reviewer) + .register_agent(lambda: create_writer_agent(client), name="writer") + .register_agent(lambda: create_reviewer_agent(client), name="reviewer", output_response=True) + .set_start_executor("writer") + .add_edge("writer", "reviewer") .build() ) @@ -89,8 +74,6 @@ async def main() -> None: elif isinstance(event, WorkflowOutputEvent): print("\n===== Final output =====") print(event.data) - finally: - await close() if __name__ == "__main__": diff --git a/python/samples/getting_started/workflows/agents/azure_chat_agents_function_bridge.py b/python/samples/getting_started/workflows/agents/azure_chat_agents_function_bridge.py index a78231444b..90b1919b08 100644 --- a/python/samples/getting_started/workflows/agents/azure_chat_agents_function_bridge.py +++ b/python/samples/getting_started/workflows/agents/azure_chat_agents_function_bridge.py @@ -86,18 +86,17 @@ async def enrich_with_references( await ctx.send_message(AgentExecutorRequest(messages=conversation)) -async def main() -> None: - """Run the workflow and stream combined updates from both agents.""" - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - - research_agent = chat_client.create_agent( +def create_research_agent(): + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( name="research_agent", instructions=( "Produce a short, bullet-style briefing with two actionable ideas. Label the section as 'Initial Draft'." 
), ) - final_editor_agent = chat_client.create_agent( + +def create_final_editor_agent(): + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( name="final_editor_agent", instructions=( "Use all conversation context (including external notes) to produce the final answer. " @@ -105,11 +104,17 @@ async def main() -> None: ), ) + +async def main() -> None: + """Run the workflow and stream combined updates from both agents.""" workflow = ( WorkflowBuilder() - .set_start_executor(research_agent) - .add_edge(research_agent, enrich_with_references) - .add_edge(enrich_with_references, final_editor_agent) + .register_agent(create_research_agent, name="research_agent") + .register_agent(create_final_editor_agent, name="final_editor_agent") + .register_executor(lambda: enrich_with_references, name="enrich_with_references") + .set_start_executor("research_agent") + .add_edge("research_agent", "enrich_with_references") + .add_edge("enrich_with_references", "final_editor_agent") .build() ) diff --git a/python/samples/getting_started/workflows/agents/azure_chat_agents_streaming.py b/python/samples/getting_started/workflows/agents/azure_chat_agents_streaming.py index f811e8460d..97820dffc1 100644 --- a/python/samples/getting_started/workflows/agents/azure_chat_agents_streaming.py +++ b/python/samples/getting_started/workflows/agents/azure_chat_agents_streaming.py @@ -26,35 +26,37 @@ """ -async def main(): - """Build and run a simple two node agent workflow: Writer then Reviewer.""" - # Create the Azure chat client. AzureCliCredential uses your current az login. - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - - # Define two domain specific chat agents. - writer_agent = chat_client.create_agent( +def create_writer_agent(): + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( instructions=( "You are an excellent content writer. You create new content and edit contents based on the feedback." 
), - name="writer_agent", + name="writer", ) - reviewer_agent = chat_client.create_agent( + +def create_reviewer_agent(): + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( instructions=( "You are an excellent content reviewer." "Provide actionable feedback to the writer about the provided content." "Provide the feedback in the most concise manner possible." ), - name="reviewer_agent", + name="reviewer", ) + +async def main(): + """Build and run a simple two node agent workflow: Writer then Reviewer.""" # Build the workflow using the fluent builder. # Set the start node and connect an edge from writer to reviewer. # Agents adapt to workflow mode: run_stream() for incremental updates, run() for complete responses. workflow = ( WorkflowBuilder() - .set_start_executor(writer_agent) - .add_edge(writer_agent, reviewer_agent) + .register_agent(create_writer_agent, name="writer") + .register_agent(create_reviewer_agent, name="reviewer", output_response=True) + .set_start_executor("writer") + .add_edge("writer", "reviewer") .build() ) diff --git a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py index f5cb8e99e8..b0fbb7eea1 100644 --- a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py +++ b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py @@ -10,6 +10,7 @@ AgentExecutorResponse, AgentRunResponse, AgentRunUpdateEvent, + ChatAgent, ChatMessage, Executor, FunctionCallContent, @@ -166,6 +167,31 @@ async def on_human_feedback( ) +def create_writer_agent() -> ChatAgent: + """Creates a writer agent with tools.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + name="writer_agent", + instructions=( + "You are a marketing writer. Call the available tools before drafting copy so you are precise. 
" + "Always call both tools once before drafting. Summarize tool outputs as bullet points, then " + "produce a 3-sentence draft." + ), + tools=[fetch_product_brief, get_brand_voice_profile], + tool_choice=ToolMode.REQUIRED_ANY, + ) + + +def create_final_editor_agent() -> ChatAgent: + """Creates a final editor agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + name="final_editor_agent", + instructions=( + "You are an editor who polishes marketing copy after human approval. " + "Correct any legal or factual issues. Return the final version even if no changes are made. " + ), + ) + + def display_agent_run_update(event: AgentRunUpdateEvent, last_executor: str | None) -> None: """Display an AgentRunUpdateEvent in a readable format.""" printed_tool_calls: set[str] = set() @@ -211,42 +237,25 @@ def display_agent_run_update(event: AgentRunUpdateEvent, last_executor: str | No async def main() -> None: """Run the workflow and bridge human feedback between two agents.""" - # Create agents with tools and instructions. - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - - writer_agent = chat_client.create_agent( - name="writer_agent", - instructions=( - "You are a marketing writer. Call the available tools before drafting copy so you are precise. " - "Always call both tools once before drafting. Summarize tool outputs as bullet points, then " - "produce a 3-sentence draft." - ), - tools=[fetch_product_brief, get_brand_voice_profile], - tool_choice=ToolMode.REQUIRED_ANY, - ) - - final_editor_agent = chat_client.create_agent( - name="final_editor_agent", - instructions=( - "You are an editor who polishes marketing copy after human approval. " - "Correct any legal or factual issues. Return the final version even if no changes are made. " - ), - ) - - coordinator = Coordinator( - id="coordinator", - writer_id="writer_agent", - final_editor_id="final_editor_agent", - ) # Build the workflow. 
workflow = ( WorkflowBuilder() - .set_start_executor(writer_agent) - .add_edge(writer_agent, coordinator) - .add_edge(coordinator, writer_agent) - .add_edge(final_editor_agent, coordinator) - .add_edge(coordinator, final_editor_agent) + .register_agent(create_writer_agent, name="writer_agent") + .register_agent(create_final_editor_agent, name="final_editor_agent") + .register_executor( + lambda: Coordinator( + id="coordinator", + writer_id="writer_agent", + final_editor_id="final_editor_agent", + ), + name="coordinator", + ) + .set_start_executor("writer_agent") + .add_edge("writer_agent", "coordinator") + .add_edge("coordinator", "writer_agent") + .add_edge("final_editor_agent", "coordinator") + .add_edge("coordinator", "final_editor_agent") .build() ) diff --git a/python/samples/getting_started/workflows/agents/custom_agent_executors.py b/python/samples/getting_started/workflows/agents/custom_agent_executors.py index 91d2ceb2a8..8791305392 100644 --- a/python/samples/getting_started/workflows/agents/custom_agent_executors.py +++ b/python/samples/getting_started/workflows/agents/custom_agent_executors.py @@ -41,9 +41,9 @@ class Writer(Executor): agent: ChatAgent - def __init__(self, chat_client: AzureOpenAIChatClient, id: str = "writer"): + def __init__(self, id: str = "writer"): # Create a domain specific agent using your configured AzureOpenAIChatClient. - self.agent = chat_client.create_agent( + self.agent = AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( instructions=( "You are an excellent content writer. You create new content and edit contents based on the feedback." ), @@ -83,9 +83,9 @@ class Reviewer(Executor): agent: ChatAgent - def __init__(self, chat_client: AzureOpenAIChatClient, id: str = "reviewer"): + def __init__(self, id: str = "reviewer"): # Create a domain specific agent that evaluates and refines content. 
- self.agent = chat_client.create_agent( + self.agent = AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( instructions=( "You are an excellent content reviewer. You review the content and provide feedback to the writer." ), @@ -105,16 +105,17 @@ async def handle(self, messages: list[ChatMessage], ctx: WorkflowContext[list[Ch async def main(): """Build and run a simple two node agent workflow: Writer then Reviewer.""" - # Create the Azure chat client. AzureCliCredential uses your current az login. - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - - # Instantiate the two agent backed executors. - writer = Writer(chat_client) - reviewer = Reviewer(chat_client) # Build the workflow using the fluent builder. # Set the start node and connect an edge from writer to reviewer. - workflow = WorkflowBuilder().set_start_executor(writer).add_edge(writer, reviewer).build() + workflow = ( + WorkflowBuilder() + .register_executor(Writer, name="writer") + .register_executor(Reviewer, name="reviewer") + .set_start_executor("writer") + .add_edge("writer", "reviewer") + .build() + ) # Run the workflow with the user's initial message. # For foundational clarity, use run (non streaming) and print the workflow output. 
diff --git a/python/samples/getting_started/workflows/agents/mixed_agents_and_executors.py b/python/samples/getting_started/workflows/agents/mixed_agents_and_executors.py index cb71ba72e6..ddda154be4 100644 --- a/python/samples/getting_started/workflows/agents/mixed_agents_and_executors.py +++ b/python/samples/getting_started/workflows/agents/mixed_agents_and_executors.py @@ -5,6 +5,7 @@ from agent_framework import ( AgentExecutorResponse, + ChatAgent, Executor, HostedCodeInterpreterTool, WorkflowBuilder, @@ -70,21 +71,39 @@ async def handle(self, message: AgentExecutorResponse, ctx: WorkflowContext[Neve await ctx.yield_output(f"Correctness: {correctness}, Consumption: {consumption}") +def create_coding_agent(client: AzureAIAgentClient) -> ChatAgent: + """Create an AI agent with code interpretation capabilities. + + This agent can generate and execute Python code to solve problems. + + Args: + client: The AzureAIAgentClient used to create the agent + + Returns: + A ChatAgent configured with coding instructions and tools + """ + return client.create_agent( + name="CodingAgent", + instructions=("You are a helpful assistant that can write and execute Python code to solve problems."), + tools=HostedCodeInterpreterTool(), + ) + + async def main(): async with ( AzureCliCredential() as credential, AzureAIAgentClient(async_credential=credential) as chat_client, ): - # Create an agent with code interpretation capabilities - agent = chat_client.create_agent( - name="CodingAgent", - instructions=("You are a helpful assistant that can write and execute Python code to solve problems."), - tools=HostedCodeInterpreterTool(), - ) - # Build a workflow: Agent generates code -> Evaluator assesses results # The agent will be wrapped in a special agent executor which produces AgentExecutorResponse - workflow = WorkflowBuilder().set_start_executor(agent).add_edge(agent, Evaluator(id="evaluator")).build() + workflow = ( + WorkflowBuilder() + .register_agent(lambda: 
create_coding_agent(chat_client), name="coding_agent") + .register_executor(lambda: Evaluator(id="evaluator"), name="evaluator") + .set_start_executor("coding_agent") + .add_edge("coding_agent", "evaluator") + .build() + ) # Execute the workflow with a specific coding task results = await workflow.run( diff --git a/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py b/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py index 9935339709..3850cf74e7 100644 --- a/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py +++ b/python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py @@ -81,7 +81,7 @@ async def review(self, request: ReviewRequest, ctx: WorkflowContext) -> None: @response_handler async def accept_human_review( self, - original_request: ReviewRequest, + original_request: HumanReviewRequest, response: ReviewResponse, ctx: WorkflowContext[ReviewResponse], ) -> None: @@ -97,20 +97,25 @@ async def main() -> None: print("Starting Workflow Agent with Human-in-the-Loop Demo") print("=" * 50) - # Create executors for the workflow. - print("Creating chat client and executors...") - mini_chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - worker = Worker(id="sub-worker", chat_client=mini_chat_client) - reviewer = ReviewerWithHumanInTheLoop(worker_id=worker.id) - print("Building workflow with Worker-Reviewer cycle...") # Build a workflow with bidirectional communication between Worker and Reviewer, # and escalation paths for human review. 
agent = ( WorkflowBuilder() - .add_edge(worker, reviewer) # Worker sends requests to Reviewer - .add_edge(reviewer, worker) # Reviewer sends feedback to Worker - .set_start_executor(worker) + .register_executor( + lambda: Worker( + id="sub-worker", + chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), + ), + name="worker", + ) + .register_executor( + lambda: ReviewerWithHumanInTheLoop(worker_id="sub-worker"), + name="reviewer", + ) + .add_edge("worker", "reviewer") # Worker sends requests to Reviewer + .add_edge("reviewer", "worker") # Reviewer sends feedback to Worker + .set_start_executor("worker") .build() .as_agent() # Convert workflow into an agent interface ) diff --git a/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py b/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py index f8840845ac..85003239db 100644 --- a/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py +++ b/python/samples/getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py @@ -195,19 +195,20 @@ async def main() -> None: print("Starting Workflow Agent Demo") print("=" * 50) - # Initialize chat clients and executors. 
- print("Creating chat client and executors...") - mini_chat_client = OpenAIChatClient(model_id="gpt-4.1-nano") - chat_client = OpenAIChatClient(model_id="gpt-4.1") - reviewer = Reviewer(id="reviewer", chat_client=chat_client) - worker = Worker(id="worker", chat_client=mini_chat_client) - print("Building workflow with Worker โ†” Reviewer cycle...") agent = ( WorkflowBuilder() - .add_edge(worker, reviewer) # Worker sends responses to Reviewer - .add_edge(reviewer, worker) # Reviewer provides feedback to Worker - .set_start_executor(worker) + .register_executor( + lambda: Worker(id="worker", chat_client=OpenAIChatClient(model_id="gpt-4.1-nano")), + name="worker", + ) + .register_executor( + lambda: Reviewer(id="reviewer", chat_client=OpenAIChatClient(model_id="gpt-4.1")), + name="reviewer", + ) + .add_edge("worker", "reviewer") # Worker sends responses to Reviewer + .add_edge("reviewer", "worker") # Reviewer provides feedback to Worker + .set_start_executor("worker") .build() .as_agent() # Wrap workflow as an agent ) diff --git a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py index 7effcdd28e..7b8d08a1af 100644 --- a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py +++ b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py @@ -10,7 +10,6 @@ # `agent_framework.builtin` chat client or mock the writer executor. We keep the # concrete import here so readers can see an end-to-end configuration. 
from agent_framework import ( - AgentExecutor, AgentExecutorRequest, AgentExecutorResponse, ChatMessage, @@ -173,25 +172,25 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None: def create_workflow(checkpoint_storage: FileCheckpointStorage) -> Workflow: """Assemble the workflow graph used by both the initial run and resume.""" - - # The Azure client is created once so our agent executor can issue calls to the hosted - # model. The agent id is stable across runs which keeps checkpoints deterministic. - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - agent = chat_client.create_agent(instructions="Write concise, warm release notes that sound human and helpful.") - - writer = AgentExecutor(agent, id="writer") - gateway = ReviewGateway(id="review_gateway", writer_id=writer.id) - prepare = BriefPreparer(id="prepare_brief", agent_id=writer.id) - # Wire the workflow DAG. Edges mirror the numbered steps described in the # module docstring. Because `WorkflowBuilder` is declarative, reading these # edges is often the quickest way to understand execution order. workflow_builder = ( WorkflowBuilder(max_iterations=6) - .set_start_executor(prepare) - .add_edge(prepare, writer) - .add_edge(writer, gateway) - .add_edge(gateway, writer) # revisions loop + .register_agent( + lambda: AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions="Write concise, warm release notes that sound human and helpful.", + # The agent name is stable across runs which keeps checkpoints deterministic. 
+ name="writer", + ), + name="writer", + ) + .register_executor(lambda: ReviewGateway(id="review_gateway", writer_id="writer"), name="review_gateway") + .register_executor(lambda: BriefPreparer(id="prepare_brief", agent_id="writer"), name="prepare_brief") + .set_start_executor("prepare_brief") + .add_edge("prepare_brief", "writer") + .add_edge("writer", "review_gateway") + .add_edge("review_gateway", "writer") # revisions loop .with_checkpointing(checkpoint_storage=checkpoint_storage) ) diff --git a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py index 0b7e874a1a..a6f0a2431b 100644 --- a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py @@ -99,16 +99,14 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None: async def main(): - # Create workflow executors - start_executor = StartExecutor(id="start") - worker_executor = WorkerExecutor(id="worker") - # Build workflow with checkpointing enabled workflow_builder = ( WorkflowBuilder() - .set_start_executor(start_executor) - .add_edge(start_executor, worker_executor) - .add_edge(worker_executor, worker_executor) # Self-loop for iterative processing + .register_executor(lambda: StartExecutor(id="start"), name="start") + .register_executor(lambda: WorkerExecutor(id="worker"), name="worker") + .set_start_executor("start") + .add_edge("start", "worker") + .add_edge("worker", "worker") # Self-loop for iterative processing ) checkpoint_storage = InMemoryCheckpointStorage() workflow_builder = workflow_builder.with_checkpointing(checkpoint_storage=checkpoint_storage) diff --git a/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py b/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py index 2d90db8c2c..24dec9fb3e 100644 --- 
a/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py +++ b/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py @@ -292,16 +292,16 @@ async def on_checkpoint_restore(self, state: dict[str, Any]) -> None: def build_sub_workflow() -> WorkflowExecutor: - writer = DraftWriter() - router = DraftReviewRouter() - finaliser = DraftFinaliser() - + """Assemble the sub-workflow used by the parent workflow executor.""" sub_workflow = ( WorkflowBuilder() - .set_start_executor(writer) - .add_edge(writer, router) - .add_edge(router, finaliser) - .add_edge(finaliser, writer) # permits revision loops + .register_executor(DraftWriter, name="writer") + .register_executor(DraftReviewRouter, name="router") + .register_executor(DraftFinaliser, name="finaliser") + .set_start_executor("writer") + .add_edge("writer", "router") + .add_edge("router", "finaliser") + .add_edge("finaliser", "writer") # permits revision loops .build() ) @@ -309,14 +309,14 @@ def build_sub_workflow() -> WorkflowExecutor: def build_parent_workflow(storage: FileCheckpointStorage) -> Workflow: - coordinator = LaunchCoordinator() - sub_executor = build_sub_workflow() - + """Assemble the parent workflow that embeds the sub-workflow.""" return ( WorkflowBuilder() - .set_start_executor(coordinator) - .add_edge(coordinator, sub_executor) - .add_edge(sub_executor, coordinator) + .register_executor(LaunchCoordinator, name="coordinator") + .register_executor(build_sub_workflow, name="sub_executor") + .set_start_executor("coordinator") + .add_edge("coordinator", "sub_executor") + .add_edge("sub_executor", "coordinator") .with_checkpointing(storage) .build() ) diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_basics.py b/python/samples/getting_started/workflows/composition/sub_workflow_basics.py index 683104f21a..826425a0ae 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_basics.py +++ 
b/python/samples/getting_started/workflows/composition/sub_workflow_basics.py @@ -8,7 +8,6 @@ Executor, WorkflowBuilder, WorkflowContext, - WorkflowEvent, WorkflowExecutor, handler, ) @@ -46,13 +45,6 @@ class TextProcessingResult: char_count: int -class AllTasksCompleted(WorkflowEvent): - """Event triggered when all processing tasks are complete.""" - - def __init__(self, results: list[TextProcessingResult]): - super().__init__(results) - - # Sub-workflow executor class TextProcessor(Executor): """Processes text strings - counts words and characters.""" @@ -113,7 +105,11 @@ async def start_processing(self, texts: list[str], ctx: WorkflowContext[TextProc await ctx.send_message(request, target_id="text_processor_workflow") @handler - async def collect_result(self, result: TextProcessingResult, ctx: WorkflowContext) -> None: + async def collect_result( + self, + result: TextProcessingResult, + ctx: WorkflowContext[Never, list[TextProcessingResult]], + ) -> None: """Collect results from sub-workflows.""" print(f"๐Ÿ“ฅ Collected result from {result.task_id}") self.results.append(result) @@ -121,48 +117,54 @@ async def collect_result(self, result: TextProcessingResult, ctx: WorkflowContex # Check if all results are collected if len(self.results) == self.expected_count: print("\n๐ŸŽ‰ All tasks completed!") - await ctx.add_event(AllTasksCompleted(self.results)) + await ctx.yield_output(self.results) - def get_summary(self) -> dict[str, Any]: - """Get a summary of all processing results.""" - total_words = sum(result.word_count for result in self.results) - total_chars = sum(result.char_count for result in self.results) - avg_words = total_words / len(self.results) if self.results else 0 - avg_chars = total_chars / len(self.results) if self.results else 0 - return { - "total_texts": len(self.results), - "total_words": total_words, - "total_characters": total_chars, - "average_words_per_text": round(avg_words, 2), - "average_characters_per_text": round(avg_chars, 2), - } +def 
get_result_summary(results: list[TextProcessingResult]) -> dict[str, Any]: + """Get a summary of all processing results.""" + total_words = sum(result.word_count for result in results) + total_chars = sum(result.char_count for result in results) + avg_words = total_words / len(results) if results else 0 + avg_chars = total_chars / len(results) if results else 0 + return { + "total_texts": len(results), + "total_words": total_words, + "total_characters": total_chars, + "average_words_per_text": round(avg_words, 2), + "average_characters_per_text": round(avg_chars, 2), + } -async def main(): - """Main function to run the basic sub-workflow example.""" - print("๐Ÿš€ Setting up sub-workflow...") - # Step 1: Create the text processing sub-workflow - text_processor = TextProcessor() +def create_sub_workflow() -> WorkflowExecutor: + """Create the text processing sub-workflow.""" + print("๐Ÿš€ Setting up sub-workflow...") - processing_workflow = WorkflowBuilder().set_start_executor(text_processor).build() + processing_workflow = ( + WorkflowBuilder() + .register_executor(TextProcessor, name="text_processor") + .set_start_executor("text_processor") + .build() + ) - print("๐Ÿ”ง Setting up parent workflow...") + return WorkflowExecutor(processing_workflow, id="text_processor_workflow") - # Step 2: Create the parent workflow - orchestrator = TextProcessingOrchestrator() - workflow_executor = WorkflowExecutor(processing_workflow, id="text_processor_workflow") +async def main(): + """Main function to run the basic sub-workflow example.""" + print("๐Ÿ”ง Setting up parent workflow...") + # Step 1: Create the parent workflow main_workflow = ( WorkflowBuilder() - .set_start_executor(orchestrator) - .add_edge(orchestrator, workflow_executor) - .add_edge(workflow_executor, orchestrator) + .register_executor(TextProcessingOrchestrator, name="text_orchestrator") + .register_executor(create_sub_workflow, name="text_processor_workflow") + .set_start_executor("text_orchestrator") + 
.add_edge("text_orchestrator", "text_processor_workflow") + .add_edge("text_processor_workflow", "text_orchestrator") .build() ) - # Step 3: Test data - various text strings + # Step 2: Test data - various text strings test_texts = [ "Hello world! This is a simple test.", "Python is a powerful programming language used for many applications.", @@ -175,15 +177,17 @@ async def main(): print(f"\n๐Ÿงช Testing with {len(test_texts)} text strings") print("=" * 60) - # Step 4: Run the workflow - await main_workflow.run(test_texts) + # Step 3: Run the workflow + result = await main_workflow.run(test_texts) - # Step 5: Display results + # Step 4: Display results print("\n๐Ÿ“Š Processing Results:") print("=" * 60) # Sort results by task_id for consistent display - sorted_results = sorted(orchestrator.results, key=lambda r: r.task_id) + task_results = result.get_outputs() + assert len(task_results) == 1 + sorted_results = sorted(task_results[0], key=lambda r: r.task_id) for result in sorted_results: preview = result.text[:30] + "..." 
if len(result.text) > 30 else result.text @@ -191,7 +195,7 @@ async def main(): print(f"โœ… {result.task_id}: '{preview}' -> {result.word_count} words, {result.char_count} chars") # Step 6: Display summary - summary = orchestrator.get_summary() + summary = get_result_summary(sorted_results) print("\n๐Ÿ“ˆ Summary:") print("=" * 60) print(f"๐Ÿ“„ Total texts processed: {summary['total_texts']}") diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py b/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py index b33a24b8b5..0959f591f0 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py @@ -169,19 +169,18 @@ async def collect(self, response: ResourceResponse | PolicyResponse, ctx: Workfl elif len(self._responses) > self._request_count: raise ValueError("Received more responses than expected") - orchestrator = RequestDistribution("orchestrator") - resource_requester = ResourceRequester("resource_requester") - policy_checker = PolicyChecker("policy_checker") - result_collector = ResultCollector("result_collector") - return ( WorkflowBuilder() - .set_start_executor(orchestrator) - .add_edge(orchestrator, resource_requester) - .add_edge(orchestrator, policy_checker) - .add_edge(resource_requester, result_collector) - .add_edge(policy_checker, result_collector) - .add_edge(orchestrator, result_collector) # For request count + .register_executor(lambda: RequestDistribution("orchestrator"), name="orchestrator") + .register_executor(lambda: ResourceRequester("resource_requester"), name="resource_requester") + .register_executor(lambda: PolicyChecker("policy_checker"), name="policy_checker") + .register_executor(lambda: ResultCollector("result_collector"), name="result_collector") + .set_start_executor("orchestrator") + .add_edge("orchestrator", 
"resource_requester") + .add_edge("orchestrator", "policy_checker") + .add_edge("resource_requester", "result_collector") + .add_edge("policy_checker", "result_collector") + .add_edge("orchestrator", "result_collector") # For request count .build() ) @@ -288,29 +287,27 @@ async def handle_external_response( async def main() -> None: - # Create executors in the main workflow - sub_workflow = build_resource_request_distribution_workflow() - resource_allocator = ResourceAllocator("resource_allocator") - policy_engine = PolicyEngine("policy_engine") - - # Create the WorkflowExecutor for the sub-workflow - # Setting allow_direct_output=True to let the sub-workflow output directly. - # This is because the sub-workflow is the both the entry point and the exit - # point of the main workflow. - sub_workflow_executor = WorkflowExecutor( - sub_workflow, - "sub_workflow_executor", - allow_direct_output=True, - ) - # Build the main workflow main_workflow = ( WorkflowBuilder() - .set_start_executor(sub_workflow_executor) - .add_edge(sub_workflow_executor, resource_allocator) - .add_edge(resource_allocator, sub_workflow_executor) - .add_edge(sub_workflow_executor, policy_engine) - .add_edge(policy_engine, sub_workflow_executor) + .register_executor(lambda: ResourceAllocator("resource_allocator"), name="resource_allocator") + .register_executor(lambda: PolicyEngine("policy_engine"), name="policy_engine") + .register_executor( + lambda: WorkflowExecutor( + build_resource_request_distribution_workflow(), + "sub_workflow_executor", + # Setting allow_direct_output=True to let the sub-workflow output directly. + # This is because the sub-workflow is the both the entry point and the exit + # point of the main workflow. 
+ allow_direct_output=True, + ), + name="sub_workflow_executor", + ) + .set_start_executor("sub_workflow_executor") + .add_edge("sub_workflow_executor", "resource_allocator") + .add_edge("resource_allocator", "sub_workflow_executor") + .add_edge("sub_workflow_executor", "policy_engine") + .add_edge("policy_engine", "sub_workflow_executor") .build() ) diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py b/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py index 241b87e18c..167ae2e950 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py @@ -154,15 +154,14 @@ async def handle_domain_validation_response( ) # Build the workflow - sanitizer = EmailSanitizer(id="email_sanitizer") - format_validator = EmailFormatValidator(id="email_format_validator") - domain_validator = DomainValidator(id="domain_validator") - return ( WorkflowBuilder() - .set_start_executor(sanitizer) - .add_edge(sanitizer, format_validator) - .add_edge(format_validator, domain_validator) + .register_executor(lambda: EmailSanitizer(id="email_sanitizer"), name="email_sanitizer") + .register_executor(lambda: EmailFormatValidator(id="email_format_validator"), name="email_format_validator") + .register_executor(lambda: DomainValidator(id="domain_validator"), name="domain_validator") + .set_start_executor("email_sanitizer") + .add_edge("email_sanitizer", "email_format_validator") + .add_edge("email_format_validator", "domain_validator") .build() ) @@ -270,21 +269,22 @@ async def main() -> None: # A list of approved domains approved_domains = {"example.com", "company.com"} - # Create executors in the main workflow - orchestrator = SmartEmailOrchestrator(id="smart_email_orchestrator", approved_domains=approved_domains) - email_delivery = EmailDelivery(id="email_delivery") - - # 
Create the sub-workflow for email address validation - validation_workflow = build_email_address_validation_workflow() - validation_workflow_executor = WorkflowExecutor(validation_workflow, id="email_validation_workflow") - # Build the main workflow workflow = ( WorkflowBuilder() - .set_start_executor(orchestrator) - .add_edge(orchestrator, validation_workflow_executor) - .add_edge(validation_workflow_executor, orchestrator) - .add_edge(orchestrator, email_delivery) + .register_executor( + lambda: SmartEmailOrchestrator(id="smart_email_orchestrator", approved_domains=approved_domains), + name="smart_email_orchestrator", + ) + .register_executor(lambda: EmailDelivery(id="email_delivery"), name="email_delivery") + .register_executor( + lambda: WorkflowExecutor(build_email_address_validation_workflow(), id="email_validation_workflow"), + name="email_validation_workflow", + ) + .set_start_executor("smart_email_orchestrator") + .add_edge("smart_email_orchestrator", "email_validation_workflow") + .add_edge("email_validation_workflow", "smart_email_orchestrator") + .add_edge("smart_email_orchestrator", "email_delivery") .build() ) diff --git a/python/samples/getting_started/workflows/control-flow/edge_condition.py b/python/samples/getting_started/workflows/control-flow/edge_condition.py index 6fedc0a86c..0dff43a58a 100644 --- a/python/samples/getting_started/workflows/control-flow/edge_condition.py +++ b/python/samples/getting_started/workflows/control-flow/edge_condition.py @@ -5,9 +5,9 @@ from typing import Any from agent_framework import ( # Core chat primitives used to build requests - AgentExecutor, # Wraps an LLM agent that can be invoked inside a workflow AgentExecutorRequest, # Input message bundle for an AgentExecutor - AgentExecutorResponse, # Output from an AgentExecutor + AgentExecutorResponse, + ChatAgent, # Output from an AgentExecutor ChatMessage, Role, WorkflowBuilder, # Fluent builder for wiring executors and edges @@ -128,38 +128,35 @@ async def 
to_email_assistant_request( await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True)) -async def main() -> None: - # Create agents +def create_spam_detector_agent() -> ChatAgent: + """Helper to create a spam detection agent.""" # AzureCliCredential uses your current az login. This avoids embedding secrets in code. - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - - # Agent 1. Classifies spam and returns a DetectionResult object. - # response_format enforces that the LLM returns parsable JSON for the Pydantic model. - spam_detection_agent = AgentExecutor( - chat_client.create_agent( - instructions=( - "You are a spam detection assistant that identifies spam emails. " - "Always return JSON with fields is_spam (bool), reason (string), and email_content (string). " - "Include the original email content in email_content." - ), - response_format=DetectionResult, + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You are a spam detection assistant that identifies spam emails. " + "Always return JSON with fields is_spam (bool), reason (string), and email_content (string). " + "Include the original email content in email_content." ), - id="spam_detection_agent", + name="spam_detection_agent", + response_format=DetectionResult, ) - # Agent 2. Drafts a professional reply. Also uses structured JSON output for reliability. - email_assistant_agent = AgentExecutor( - chat_client.create_agent( - instructions=( - "You are an email assistant that helps users draft professional responses to emails. " - "Your input may be a JSON object that includes 'email_content'; base your reply on that content. " - "Return JSON with a single field 'response' containing the drafted reply." - ), - response_format=EmailResponse, + +def create_email_assistant_agent() -> ChatAgent: + """Helper to create an email assistant agent.""" + # AzureCliCredential uses your current az login. 
This avoids embedding secrets in code. + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You are an email assistant that helps users draft professional responses to emails. " + "Your input may be a JSON object that includes 'email_content'; base your reply on that content. " + "Return JSON with a single field 'response' containing the drafted reply." ), - id="email_assistant_agent", + name="email_assistant_agent", + response_format=EmailResponse, ) + +async def main() -> None: # Build the workflow graph. # Start at the spam detector. # If not spam, hop to a transformer that creates a new AgentExecutorRequest, @@ -167,13 +164,18 @@ async def main() -> None: # If spam, go directly to the spam handler and finalize. workflow = ( WorkflowBuilder() - .set_start_executor(spam_detection_agent) + .register_agent(create_spam_detector_agent, name="spam_detection_agent") + .register_agent(create_email_assistant_agent, name="email_assistant_agent") + .register_executor(lambda: to_email_assistant_request, name="to_email_assistant_request") + .register_executor(lambda: handle_email_response, name="send_email") + .register_executor(lambda: handle_spam_classifier_response, name="handle_spam") + .set_start_executor("spam_detection_agent") # Not spam path: transform response -> request for assistant -> assistant -> send email - .add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False)) - .add_edge(to_email_assistant_request, email_assistant_agent) - .add_edge(email_assistant_agent, handle_email_response) + .add_edge("spam_detection_agent", "to_email_assistant_request", condition=get_condition(False)) + .add_edge("to_email_assistant_request", "email_assistant_agent") + .add_edge("email_assistant_agent", "send_email") # Spam path: send to spam handler - .add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True)) + .add_edge("spam_detection_agent", "handle_spam", 
condition=get_condition(True)) .build() ) diff --git a/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py b/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py index 9be33befc3..b7935a5e75 100644 --- a/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py +++ b/python/samples/getting_started/workflows/control-flow/multi_selection_edge_group.py @@ -9,9 +9,9 @@ from uuid import uuid4 from agent_framework import ( - AgentExecutor, AgentExecutorRequest, AgentExecutorResponse, + ChatAgent, ChatMessage, Role, WorkflowBuilder, @@ -181,40 +181,38 @@ async def database_access(analysis: AnalysisResult, ctx: WorkflowContext[Never, await ctx.add_event(DatabaseEvent(f"Email {analysis.email_id} saved to database.")) -async def main() -> None: - # Agents - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - - email_analysis_agent = AgentExecutor( - chat_client.create_agent( - instructions=( - "You are a spam detection assistant that identifies spam emails. " - "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) " - "and 'reason' (string)." - ), - response_format=AnalysisResultAgent, +def create_email_analysis_agent() -> ChatAgent: + """Creates the email analysis agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You are a spam detection assistant that identifies spam emails. " + "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) " + "and 'reason' (string)." ), - id="email_analysis_agent", + name="email_analysis_agent", + response_format=AnalysisResultAgent, ) - email_assistant_agent = AgentExecutor( - chat_client.create_agent( - instructions=( - "You are an email assistant that helps users draft responses to emails with professionalism." 
- ), - response_format=EmailResponse, - ), - id="email_assistant_agent", + +def create_email_assistant_agent() -> ChatAgent: + """Creates the email assistant agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=("You are an email assistant that helps users draft responses to emails with professionalism."), + name="email_assistant_agent", + response_format=EmailResponse, ) - email_summary_agent = AgentExecutor( - chat_client.create_agent( - instructions=("You are an assistant that helps users summarize emails."), - response_format=EmailSummaryModel, - ), - id="email_summary_agent", + +def create_email_summary_agent() -> ChatAgent: + """Creates the email summary agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=("You are an assistant that helps users summarize emails."), + name="email_summary_agent", + response_format=EmailSummaryModel, ) + +async def main() -> None: # Build the workflow def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str]: # Order: [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain] @@ -228,24 +226,39 @@ def select_targets(analysis: AnalysisResult, target_ids: list[str]) -> list[str] return targets return [handle_uncertain_id] - workflow = ( + workflow_builder = ( WorkflowBuilder() - .set_start_executor(store_email) - .add_edge(store_email, email_analysis_agent) - .add_edge(email_analysis_agent, to_analysis_result) + .register_agent(create_email_analysis_agent, name="email_analysis_agent") + .register_agent(create_email_assistant_agent, name="email_assistant_agent") + .register_agent(create_email_summary_agent, name="email_summary_agent") + .register_executor(lambda: store_email, name="store_email") + .register_executor(lambda: to_analysis_result, name="to_analysis_result") + .register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant") + .register_executor(lambda: 
finalize_and_send, name="finalize_and_send") + .register_executor(lambda: summarize_email, name="summarize_email") + .register_executor(lambda: merge_summary, name="merge_summary") + .register_executor(lambda: handle_spam, name="handle_spam") + .register_executor(lambda: handle_uncertain, name="handle_uncertain") + .register_executor(lambda: database_access, name="database_access") + ) + + workflow = ( + workflow_builder.set_start_executor("store_email") + .add_edge("store_email", "email_analysis_agent") + .add_edge("email_analysis_agent", "to_analysis_result") .add_multi_selection_edge_group( - to_analysis_result, - [handle_spam, submit_to_email_assistant, summarize_email, handle_uncertain], + "to_analysis_result", + ["handle_spam", "submit_to_email_assistant", "summarize_email", "handle_uncertain"], selection_func=select_targets, ) - .add_edge(submit_to_email_assistant, email_assistant_agent) - .add_edge(email_assistant_agent, finalize_and_send) - .add_edge(summarize_email, email_summary_agent) - .add_edge(email_summary_agent, merge_summary) + .add_edge("submit_to_email_assistant", "email_assistant_agent") + .add_edge("email_assistant_agent", "finalize_and_send") + .add_edge("summarize_email", "email_summary_agent") + .add_edge("email_summary_agent", "merge_summary") # Save to DB if short (no summary path) - .add_edge(to_analysis_result, database_access, condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD) + .add_edge("to_analysis_result", "database_access", condition=lambda r: r.email_length <= LONG_EMAIL_THRESHOLD) # Save to DB with summary when long - .add_edge(merge_summary, database_access) + .add_edge("merge_summary", "database_access") .build() ) diff --git a/python/samples/getting_started/workflows/control-flow/sequential_executors.py b/python/samples/getting_started/workflows/control-flow/sequential_executors.py index 81d911e4fb..e422009766 100644 --- a/python/samples/getting_started/workflows/control-flow/sequential_executors.py +++ 
b/python/samples/getting_started/workflows/control-flow/sequential_executors.py @@ -61,20 +61,18 @@ async def reverse_text(self, text: str, ctx: WorkflowContext[Never, str]) -> Non async def main() -> None: """Build a two step sequential workflow and run it with streaming to observe events.""" - # Step 1: Create executor instances. - upper_case_executor = UpperCaseExecutor(id="upper_case_executor") - reverse_text_executor = ReverseTextExecutor(id="reverse_text_executor") - - # Step 2: Build the workflow graph. + # Step 1: Build the workflow graph. # Order matters. We connect upper_case_executor -> reverse_text_executor and set the start. workflow = ( WorkflowBuilder() - .add_edge(upper_case_executor, reverse_text_executor) - .set_start_executor(upper_case_executor) + .register_executor(lambda: UpperCaseExecutor(id="upper_case_executor"), name="upper_case_executor") + .register_executor(lambda: ReverseTextExecutor(id="reverse_text_executor"), name="reverse_text_executor") + .add_edge("upper_case_executor", "reverse_text_executor") + .set_start_executor("upper_case_executor") .build() ) - # Step 3: Stream events for a single input. + # Step 2: Stream events for a single input. # The stream will include executor invoke and completion events, plus workflow outputs. outputs: list[str] = [] async for event in workflow.run_stream("hello world"): diff --git a/python/samples/getting_started/workflows/control-flow/sequential_streaming.py b/python/samples/getting_started/workflows/control-flow/sequential_streaming.py index 91515d82f0..3030d4ff44 100644 --- a/python/samples/getting_started/workflows/control-flow/sequential_streaming.py +++ b/python/samples/getting_started/workflows/control-flow/sequential_streaming.py @@ -52,11 +52,18 @@ async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None: async def main(): """Build a two-step sequential workflow and run it with streaming to observe events.""" - # Step 2: Build the workflow with the defined edges. 
+ # Step 1: Build the workflow with the defined edges. # Order matters. upper_case_executor runs first, then reverse_text_executor. - workflow = WorkflowBuilder().add_edge(to_upper_case, reverse_text).set_start_executor(to_upper_case).build() - - # Step 3: Run the workflow and stream events in real time. + workflow = ( + WorkflowBuilder() + .register_executor(lambda: to_upper_case, name="upper_case_executor") + .register_executor(lambda: reverse_text, name="reverse_text_executor") + .add_edge("upper_case_executor", "reverse_text_executor") + .set_start_executor("upper_case_executor") + .build() + ) + + # Step 2: Run the workflow and stream events in real time. async for event in workflow.run_stream("hello world"): # You will see executor invoke and completion events as the workflow progresses. print(f"Event: {event}") diff --git a/python/samples/getting_started/workflows/control-flow/simple_loop.py b/python/samples/getting_started/workflows/control-flow/simple_loop.py index 53d65331ae..7bb3389a08 100644 --- a/python/samples/getting_started/workflows/control-flow/simple_loop.py +++ b/python/samples/getting_started/workflows/control-flow/simple_loop.py @@ -4,16 +4,15 @@ from enum import Enum from agent_framework import ( - AgentExecutor, AgentExecutorRequest, AgentExecutorResponse, + ChatAgent, ChatMessage, Executor, ExecutorCompletedEvent, Role, WorkflowBuilder, WorkflowContext, - WorkflowOutputEvent, handler, ) from agent_framework.azure import AzureOpenAIChatClient @@ -49,9 +48,9 @@ class NumberSignal(Enum): class GuessNumberExecutor(Executor): """An executor that guesses a number.""" - def __init__(self, bound: tuple[int, int], id: str | None = None): + def __init__(self, bound: tuple[int, int], id: str): """Initialize the executor with a target number.""" - super().__init__(id=id or "guess_number") + super().__init__(id=id) self._lower = bound[0] self._upper = bound[1] @@ -116,43 +115,37 @@ async def parse(self, response: AgentExecutorResponse, ctx: 
WorkflowContext[Numb await ctx.send_message(NumberSignal.BELOW) -async def main(): - """Main function to run the workflow.""" - # Step 1: Create the executors. - guess_number_executor = GuessNumberExecutor((1, 100)) - - # Agent judge setup - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - judge_agent = AgentExecutor( - chat_client.create_agent( - instructions=( - "You strictly respond with one of: MATCHED, ABOVE, BELOW based on the given target and guess." - ) - ), - id="judge_agent", +def create_judge_agent() -> ChatAgent: + """Create a judge agent that evaluates guesses.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=("You strictly respond with one of: MATCHED, ABOVE, BELOW based on the given target and guess."), + name="judge_agent", ) - submit_to_judge = SubmitToJudgeAgent(judge_agent_id=judge_agent.id, target=30, id="submit_judge") - parse_judge = ParseJudgeResponse(id="parse_judge") - # Step 2: Build the workflow with the defined edges. + +async def main(): + """Main function to run the workflow.""" + # Step 1: Build the workflow with the defined edges. # This time we are creating a loop in the workflow. 
workflow = ( WorkflowBuilder() - .add_edge(guess_number_executor, submit_to_judge) - .add_edge(submit_to_judge, judge_agent) - .add_edge(judge_agent, parse_judge) - .add_edge(parse_judge, guess_number_executor) - .set_start_executor(guess_number_executor) + .register_executor(lambda: GuessNumberExecutor((1, 100), "guess_number"), name="guess_number") + .register_agent(create_judge_agent, name="judge_agent") + .register_executor(lambda: SubmitToJudgeAgent(judge_agent_id="judge_agent", target=30), name="submit_judge") + .register_executor(lambda: ParseJudgeResponse(id="parse_judge"), name="parse_judge") + .add_edge("guess_number", "submit_judge") + .add_edge("submit_judge", "judge_agent") + .add_edge("judge_agent", "parse_judge") + .add_edge("parse_judge", "guess_number") + .set_start_executor("guess_number") .build() ) - # Step 3: Run the workflow and print the events. + # Step 2: Run the workflow and print the events. iterations = 0 async for event in workflow.run_stream(NumberSignal.INIT): - if isinstance(event, ExecutorCompletedEvent) and event.executor_id == guess_number_executor.id: + if isinstance(event, ExecutorCompletedEvent) and event.executor_id == "guess_number": iterations += 1 - elif isinstance(event, WorkflowOutputEvent): - print(f"Final result: {event.data}") print(f"Event: {event}") # This is essentially a binary search, so the number of iterations should be logarithmic. 
diff --git a/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py b/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py index 97e038c8ee..c325d74d7f 100644 --- a/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py +++ b/python/samples/getting_started/workflows/control-flow/switch_case_edge_group.py @@ -7,10 +7,10 @@ from uuid import uuid4 from agent_framework import ( # Core chat primitives used to form LLM requests - AgentExecutor, # Wraps an agent so it can run inside a workflow AgentExecutorRequest, # Message bundle sent to an AgentExecutor AgentExecutorResponse, # Result returned by an AgentExecutor - Case, # Case entry for a switch-case edge group + Case, + ChatAgent, # Case entry for a switch-case edge group ChatMessage, Default, # Default branch when no cases match Role, @@ -152,51 +152,56 @@ async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Neve raise RuntimeError("This executor should only handle Uncertain messages.") -async def main(): - """Main function to run the workflow.""" - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - - # Agents. response_format enforces that the LLM returns JSON that Pydantic can validate. - spam_detection_agent = AgentExecutor( - chat_client.create_agent( - instructions=( - "You are a spam detection assistant that identifies spam emails. " - "Be less confident in your assessments. " - "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) " - "and 'reason' (string)." - ), - response_format=DetectionResultAgent, +def create_spam_detection_agent() -> ChatAgent: + """Create and return the spam detection agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You are a spam detection assistant that identifies spam emails. " + "Be less confident in your assessments. 
" + "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) " + "and 'reason' (string)." ), - id="spam_detection_agent", + name="spam_detection_agent", + response_format=DetectionResultAgent, ) - email_assistant_agent = AgentExecutor( - chat_client.create_agent( - instructions=( - "You are an email assistant that helps users draft responses to emails with professionalism." - ), - response_format=EmailResponse, - ), - id="email_assistant_agent", + +def create_email_assistant_agent() -> ChatAgent: + """Create and return the email assistant agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=("You are an email assistant that helps users draft responses to emails with professionalism."), + name="email_assistant_agent", + response_format=EmailResponse, ) + +async def main(): + """Main function to run the workflow.""" # Build workflow: store -> detection agent -> to_detection_result -> switch (NotSpam or Spam or Default). # The switch-case group evaluates cases in order, then falls back to Default when none match. 
workflow = ( WorkflowBuilder() - .set_start_executor(store_email) - .add_edge(store_email, spam_detection_agent) - .add_edge(spam_detection_agent, to_detection_result) + .register_agent(create_spam_detection_agent, name="spam_detection_agent") + .register_agent(create_email_assistant_agent, name="email_assistant_agent") + .register_executor(lambda: store_email, name="store_email") + .register_executor(lambda: to_detection_result, name="to_detection_result") + .register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant") + .register_executor(lambda: finalize_and_send, name="finalize_and_send") + .register_executor(lambda: handle_spam, name="handle_spam") + .register_executor(lambda: handle_uncertain, name="handle_uncertain") + .set_start_executor("store_email") + .add_edge("store_email", "spam_detection_agent") + .add_edge("spam_detection_agent", "to_detection_result") .add_switch_case_edge_group( - to_detection_result, + "to_detection_result", [ - Case(condition=get_case("NotSpam"), target=submit_to_email_assistant), - Case(condition=get_case("Spam"), target=handle_spam), - Default(target=handle_uncertain), + Case(condition=get_case("NotSpam"), target="submit_to_email_assistant"), + Case(condition=get_case("Spam"), target="handle_spam"), + Default(target="handle_uncertain"), ], ) - .add_edge(submit_to_email_assistant, email_assistant_agent) - .add_edge(email_assistant_agent, finalize_and_send) + .add_edge("submit_to_email_assistant", "email_assistant_agent") + .add_edge("email_assistant_agent", "finalize_and_send") .build() ) diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py index a51088e886..6f5370edf8 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py +++ 
b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -7,6 +7,7 @@ from agent_framework import ( AgentExecutorResponse, + ChatAgent, ChatMessage, Executor, FunctionApprovalRequestContent, @@ -210,10 +211,9 @@ async def conclude_workflow( await ctx.yield_output(email_response.agent_run_response.text) -async def main() -> None: - # Create the agent and executors - chat_client = OpenAIChatClient() - email_writer = chat_client.create_agent( +def create_email_writer_agent() -> ChatAgent: + """Create the Email Writer agent with tools that require approval.""" + return OpenAIChatClient().create_agent( name="Email Writer", instructions=("You are an excellent email assistant. You respond to incoming emails."), # tools with `approval_mode="always_require"` will trigger approval requests @@ -225,14 +225,21 @@ async def main() -> None: get_my_information, ], ) - email_preprocessor = EmailPreprocessor(special_email_addresses={"mike@contoso.com"}) + +async def main() -> None: # Build the workflow workflow = ( WorkflowBuilder() - .set_start_executor(email_preprocessor) - .add_edge(email_preprocessor, email_writer) - .add_edge(email_writer, conclude_workflow) + .register_agent(create_email_writer_agent, name="email_writer") + .register_executor( + lambda: EmailPreprocessor(special_email_addresses={"mike@contoso.com"}), + name="email_preprocessor", + ) + .register_executor(lambda: conclude_workflow, name="conclude_workflow") + .set_start_executor("email_preprocessor") + .add_edge("email_preprocessor", "email_writer") + .add_edge("email_writer", "conclude_workflow") .build() ) diff --git a/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py b/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py index 6904edffea..d711861502 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py +++ 
b/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py @@ -5,7 +5,8 @@ from agent_framework import ( AgentExecutorRequest, # Message bundle sent to an AgentExecutor - AgentExecutorResponse, # Result returned by an AgentExecutor + AgentExecutorResponse, + ChatAgent, # Result returned by an AgentExecutor ChatMessage, # Chat message structure Executor, # Base class for workflow executors RequestInfoEvent, # Event emitted when human input is requested @@ -142,11 +143,9 @@ async def on_human_feedback( await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True)) -async def main() -> None: - # Create the chat agent and wrap it in an AgentExecutor. - # response_format enforces that the model produces JSON compatible with GuessOutput. - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - agent = chat_client.create_agent( +def create_guessing_agent() -> ChatAgent: + """Create the guessing agent with instructions to guess a number between 1 and 10.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( name="GuessingAgent", instructions=( "You guess a number between 1 and 10. " @@ -154,19 +153,22 @@ async def main() -> None: 'You MUST return ONLY a JSON object exactly matching this schema: {"guess": }. ' "No explanations or additional text." ), - # Structured output enforced via Pydantic model. + # response_format enforces that the model produces JSON compatible with GuessOutput. response_format=GuessOutput, ) - # TurnManager coordinates and gathers human replies while AgentExecutor runs the model. - turn_manager = TurnManager(id="turn_manager") + +async def main() -> None: + """Run the human-in-the-loop guessing game workflow.""" # Build a simple loop: TurnManager <-> AgentExecutor. 
workflow = ( WorkflowBuilder() - .set_start_executor(turn_manager) - .add_edge(turn_manager, agent) # Ask agent to make/adjust a guess - .add_edge(agent, turn_manager) # Agent's response comes back to coordinator + .register_agent(create_guessing_agent, name="guessing_agent") + .register_executor(lambda: TurnManager(id="turn_manager"), name="turn_manager") + .set_start_executor("turn_manager") + .add_edge("turn_manager", "guessing_agent") # Ask agent to make/adjust a guess + .add_edge("guessing_agent", "turn_manager") # Agent's response comes back to coordinator ).build() # Human in the loop run: alternate between invoking the workflow and supplying collected responses. diff --git a/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py b/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py index f718397cbd..f59b1ea0c8 100644 --- a/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py +++ b/python/samples/getting_started/workflows/parallelism/aggregate_results_of_different_types.py @@ -72,22 +72,20 @@ async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, l async def main() -> None: - # 1) Create the executors - dispatcher = Dispatcher(id="dispatcher") - average = Average(id="average") - summation = Sum(id="summation") - aggregator = Aggregator(id="aggregator") - - # 2) Build a simple fan out and fan in workflow + # 1) Build a simple fan out and fan in workflow workflow = ( WorkflowBuilder() - .set_start_executor(dispatcher) - .add_fan_out_edges(dispatcher, [average, summation]) - .add_fan_in_edges([average, summation], aggregator) + .register_executor(lambda: Dispatcher(id="dispatcher"), name="dispatcher") + .register_executor(lambda: Average(id="average"), name="average") + .register_executor(lambda: Sum(id="summation"), name="summation") + .register_executor(lambda: Aggregator(id="aggregator"), name="aggregator") + 
.set_start_executor("dispatcher") + .add_fan_out_edges("dispatcher", ["average", "summation"]) + .add_fan_in_edges(["average", "summation"], "aggregator") .build() ) - # 3) Run the workflow + # 2) Run the workflow output: list[int | float] | None = None async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]): if isinstance(event, WorkflowOutputEvent): diff --git a/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py b/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py index 20a3932275..c6f9cad496 100644 --- a/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py +++ b/python/samples/getting_started/workflows/parallelism/fan_out_fan_in_edges.py @@ -4,10 +4,10 @@ from dataclasses import dataclass from agent_framework import ( # Core chat primitives to build LLM requests - AgentExecutor, # Wraps an LLM agent for use inside a workflow AgentExecutorRequest, # The message bundle sent to an AgentExecutor AgentExecutorResponse, # The structured result returned by an AgentExecutor - AgentRunEvent, # Tracing event for agent execution steps + AgentRunEvent, + ChatAgent, # Tracing event for agent execution steps ChatMessage, # Chat message structure Executor, # Base class for custom Python executors Role, # Enum of chat roles (user, assistant, system) @@ -16,7 +16,7 @@ WorkflowOutputEvent, # Event emitted when workflow yields output handler, # Decorator to mark an Executor method as invokable ) -from agent_framework.azure import AzureOpenAIChatClient # Client wrapper for Azure OpenAI chat models +from agent_framework.azure import AzureOpenAIChatClient from azure.identity import AzureCliCredential # Uses your az CLI login for credentials from typing_extensions import Never @@ -42,20 +42,11 @@ class DispatchToExperts(Executor): """Dispatches the incoming prompt to all expert agent executors for parallel processing (fan out).""" - def __init__(self, expert_ids: list[str], id: str 
| None = None): - super().__init__(id=id or "dispatch_to_experts") - self._expert_ids = expert_ids - @handler async def dispatch(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None: # Wrap the incoming prompt as a user message for each expert and request a response. - # Each send_message targets a different AgentExecutor by id so that branches run in parallel. initial_message = ChatMessage(Role.USER, text=prompt) - for expert_id in self._expert_ids: - await ctx.send_message( - AgentExecutorRequest(messages=[initial_message], should_respond=True), - target_id=expert_id, - ) + await ctx.send_message(AgentExecutorRequest(messages=[initial_message], should_respond=True)) @dataclass @@ -70,10 +61,6 @@ class AggregatedInsights: class AggregateInsights(Executor): """Aggregates expert agent responses into a single consolidated result (fan in).""" - def __init__(self, expert_ids: list[str], id: str | None = None): - super().__init__(id=id or "aggregate_insights") - self._expert_ids = expert_ids - @handler async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: # Map responses to text by executor id for a simple, predictable demo. @@ -104,49 +91,51 @@ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowCon await ctx.yield_output(consolidated) -async def main() -> None: - # 1) Create agent executors for domain experts - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - - researcher = AgentExecutor( - chat_client.create_agent( - instructions=( - "You're an expert market and product researcher. Given a prompt, provide concise, factual insights," - " opportunities, and risks." - ), +def create_researcher_agent() -> ChatAgent: + """Creates a research domain expert agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You're an expert market and product researcher. 
Given a prompt, provide concise, factual insights," + " opportunities, and risks." ), - id="researcher", + name="researcher", ) - marketer = AgentExecutor( - chat_client.create_agent( - instructions=( - "You're a creative marketing strategist. Craft compelling value propositions and target messaging" - " aligned to the prompt." - ), + + +def create_marketer_agent() -> ChatAgent: + """Creates a marketing domain expert agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You're a creative marketing strategist. Craft compelling value propositions and target messaging" + " aligned to the prompt." ), - id="marketer", + name="marketer", ) - legal = AgentExecutor( - chat_client.create_agent( - instructions=( - "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns" - " based on the prompt." - ), + + +def create_legal_agent() -> ChatAgent: + """Creates a legal/compliance domain expert agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns" + " based on the prompt." 
), - id="legal", + name="legal", ) - expert_ids = [researcher.id, marketer.id, legal.id] - dispatcher = DispatchToExperts(expert_ids=expert_ids, id="dispatcher") - aggregator = AggregateInsights(expert_ids=expert_ids, id="aggregator") - - # 2) Build a simple fan out and fan in workflow +async def main() -> None: + # 1) Build a simple fan out and fan in workflow workflow = ( WorkflowBuilder() - .set_start_executor(dispatcher) - .add_fan_out_edges(dispatcher, [researcher, marketer, legal]) # Parallel branches - .add_fan_in_edges([researcher, marketer, legal], aggregator) # Join at the aggregator + .register_agent(create_researcher_agent, name="researcher") + .register_agent(create_marketer_agent, name="marketer") + .register_agent(create_legal_agent, name="legal") + .register_executor(lambda: DispatchToExperts(id="dispatcher"), name="dispatcher") + .register_executor(lambda: AggregateInsights(id="aggregator"), name="aggregator") + .set_start_executor("dispatcher") + .add_fan_out_edges("dispatcher", ["researcher", "marketer", "legal"]) # Parallel branches + .add_fan_in_edges(["researcher", "marketer", "legal"], "aggregator") # Join at the aggregator .build() ) diff --git a/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py b/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py index 035d90e5a9..e443df0354 100644 --- a/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py +++ b/python/samples/getting_started/workflows/parallelism/map_reduce_and_visualization.py @@ -259,27 +259,50 @@ async def complete(self, data: list[ReduceCompleted], ctx: WorkflowContext[Never async def main(): """Construct the map reduce workflow, visualize it, then run it over a sample file.""" - # Step 1: Create the executors. 
- map_operations = [Map(id=f"map_executor_{i}") for i in range(3)] - split_operation = Split( - [map_operation.id for map_operation in map_operations], - id="split_data_executor", - ) - reduce_operations = [Reduce(id=f"reduce_executor_{i}") for i in range(4)] - shuffle_operation = Shuffle( - [reduce_operation.id for reduce_operation in reduce_operations], - id="shuffle_executor", + + # Step 1: Create the workflow builder and register executors. + workflow_builder = ( + WorkflowBuilder() + .register_executor(lambda: Map(id="map_executor_0"), name="map_executor_0") + .register_executor(lambda: Map(id="map_executor_1"), name="map_executor_1") + .register_executor(lambda: Map(id="map_executor_2"), name="map_executor_2") + .register_executor( + lambda: Split(["map_executor_0", "map_executor_1", "map_executor_2"], id="split_data_executor"), + name="split_data_executor", + ) + .register_executor(lambda: Reduce(id="reduce_executor_0"), name="reduce_executor_0") + .register_executor(lambda: Reduce(id="reduce_executor_1"), name="reduce_executor_1") + .register_executor(lambda: Reduce(id="reduce_executor_2"), name="reduce_executor_2") + .register_executor(lambda: Reduce(id="reduce_executor_3"), name="reduce_executor_3") + .register_executor( + lambda: Shuffle( + ["reduce_executor_0", "reduce_executor_1", "reduce_executor_2", "reduce_executor_3"], + id="shuffle_executor", + ), + name="shuffle_executor", + ) + .register_executor(lambda: CompletionExecutor(id="completion_executor"), name="completion_executor") ) - completion_executor = CompletionExecutor(id="completion_executor") # Step 2: Build the workflow graph using fan out and fan in edges. 
workflow = ( - WorkflowBuilder() - .set_start_executor(split_operation) - .add_fan_out_edges(split_operation, map_operations) # Split -> many mappers - .add_fan_in_edges(map_operations, shuffle_operation) # All mappers -> shuffle - .add_fan_out_edges(shuffle_operation, reduce_operations) # Shuffle -> many reducers - .add_fan_in_edges(reduce_operations, completion_executor) # All reducers -> completion + workflow_builder.set_start_executor("split_data_executor") + .add_fan_out_edges( + "split_data_executor", + ["map_executor_0", "map_executor_1", "map_executor_2"], + ) # Split -> many mappers + .add_fan_in_edges( + ["map_executor_0", "map_executor_1", "map_executor_2"], + "shuffle_executor", + ) # All mappers -> shuffle + .add_fan_out_edges( + "shuffle_executor", + ["reduce_executor_0", "reduce_executor_1", "reduce_executor_2", "reduce_executor_3"], + ) # Shuffle -> many reducers + .add_fan_in_edges( + ["reduce_executor_0", "reduce_executor_1", "reduce_executor_2", "reduce_executor_3"], + "completion_executor", + ) # All reducers -> completion .build() ) diff --git a/python/samples/getting_started/workflows/state-management/shared_states_with_agents.py b/python/samples/getting_started/workflows/state-management/shared_states_with_agents.py index ea5bcc3195..e9098f996e 100644 --- a/python/samples/getting_started/workflows/state-management/shared_states_with_agents.py +++ b/python/samples/getting_started/workflows/state-management/shared_states_with_agents.py @@ -1,14 +1,15 @@ # Copyright (c) Microsoft. All rights reserved. 
import asyncio -import os from dataclasses import dataclass +from pathlib import Path from typing import Any from uuid import uuid4 from agent_framework import ( AgentExecutorRequest, AgentExecutorResponse, + ChatAgent, ChatMessage, Role, WorkflowBuilder, @@ -154,28 +155,35 @@ async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, st raise RuntimeError("This executor should only handle spam messages.") -async def main() -> None: - # Create chat client and agents. response_format enforces structured JSON from each agent. - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - - spam_detection_agent = chat_client.create_agent( +def create_spam_detection_agent() -> ChatAgent: + """Creates a spam detection agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( instructions=( "You are a spam detection assistant that identifies spam emails. " "Always return JSON with fields is_spam (bool) and reason (string)." ), response_format=DetectionResultAgent, + # response_format enforces structured JSON from each agent. name="spam_detection_agent", ) - email_assistant_agent = chat_client.create_agent( + +def create_email_assistant_agent() -> ChatAgent: + """Creates an email assistant agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( instructions=( "You are an email assistant that helps users draft responses to emails with professionalism. " "Return JSON with a single field 'response' containing the drafted reply." ), + # response_format enforces structured JSON from each agent. response_format=EmailResponse, name="email_assistant_agent", ) + +async def main() -> None: + """Build and run the shared state with agents and conditional routing workflow.""" + # Build the workflow graph with conditional edges. 
# Flow: # store_email -> spam_detection_agent -> to_detection_result -> branch: @@ -183,25 +191,28 @@ async def main() -> None: # True -> handle_spam workflow = ( WorkflowBuilder() - .set_start_executor(store_email) - .add_edge(store_email, spam_detection_agent) - .add_edge(spam_detection_agent, to_detection_result) - .add_edge(to_detection_result, submit_to_email_assistant, condition=get_condition(False)) - .add_edge(to_detection_result, handle_spam, condition=get_condition(True)) - .add_edge(submit_to_email_assistant, email_assistant_agent) - .add_edge(email_assistant_agent, finalize_and_send) + .register_agent(create_spam_detection_agent, name="spam_detection_agent") + .register_agent(create_email_assistant_agent, name="email_assistant_agent") + .register_executor(lambda: store_email, name="store_email") + .register_executor(lambda: to_detection_result, name="to_detection_result") + .register_executor(lambda: submit_to_email_assistant, name="submit_to_email_assistant") + .register_executor(lambda: finalize_and_send, name="finalize_and_send") + .register_executor(lambda: handle_spam, name="handle_spam") + .set_start_executor("store_email") + .add_edge("store_email", "spam_detection_agent") + .add_edge("spam_detection_agent", "to_detection_result") + .add_edge("to_detection_result", "submit_to_email_assistant", condition=get_condition(False)) + .add_edge("to_detection_result", "handle_spam", condition=get_condition(True)) + .add_edge("submit_to_email_assistant", "email_assistant_agent") + .add_edge("email_assistant_agent", "finalize_and_send") .build() ) # Read an email from resources/spam.txt if available; otherwise use a default sample. 
- resources_path = os.path.join( - os.path.dirname(os.path.dirname(os.path.realpath(__file__))), - "resources", - "spam.txt", - ) - if os.path.exists(resources_path): - with open(resources_path, encoding="utf-8") as f: # noqa: ASYNC230 - email = f.read() + current_file = Path(__file__) + resources_path = current_file.parent.parent / "resources" / "spam.txt" + if resources_path.exists(): + email = resources_path.read_text(encoding="utf-8") else: print("Unable to find resource file, using default text.") email = "You are a WINNER! Click here for a free lottery offer!!!" diff --git a/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py b/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py index c7c66d9342..21a9ff4b08 100644 --- a/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py +++ b/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py @@ -4,10 +4,10 @@ from dataclasses import dataclass from agent_framework import ( - AgentExecutor, AgentExecutorRequest, AgentExecutorResponse, AgentRunEvent, + ChatAgent, ChatMessage, Executor, Role, @@ -39,19 +39,11 @@ class DispatchToExperts(Executor): """Dispatches the incoming prompt to all expert agent executors (fan-out).""" - def __init__(self, expert_ids: list[str], id: str | None = None): - super().__init__(id=id or "dispatch_to_experts") - self._expert_ids = expert_ids - @handler async def dispatch(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None: # Wrap the incoming prompt as a user message for each expert and request a response. 
initial_message = ChatMessage(Role.USER, text=prompt) - for expert_id in self._expert_ids: - await ctx.send_message( - AgentExecutorRequest(messages=[initial_message], should_respond=True), - target_id=expert_id, - ) + await ctx.send_message(AgentExecutorRequest(messages=[initial_message], should_respond=True)) @dataclass @@ -66,10 +58,6 @@ class AggregatedInsights: class AggregateInsights(Executor): """Aggregates expert agent responses into a single consolidated result (fan-in).""" - def __init__(self, expert_ids: list[str], id: str | None = None): - super().__init__(id=id or "aggregate_insights") - self._expert_ids = expert_ids - @handler async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: # Map responses to text by executor id for a simple, predictable demo. @@ -100,53 +88,57 @@ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowCon await ctx.yield_output(consolidated) -async def main() -> None: - # 1) Create agent executors for domain experts - chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) - - researcher = AgentExecutor( - chat_client.create_agent( - instructions=( - "You're an expert market and product researcher. Given a prompt, provide concise, factual insights," - " opportunities, and risks." - ), +def create_researcher_agent() -> ChatAgent: + """Creates a research domain expert agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You're an expert market and product researcher. Given a prompt, provide concise, factual insights," + " opportunities, and risks." ), - id="researcher", + name="researcher", ) - marketer = AgentExecutor( - chat_client.create_agent( - instructions=( - "You're a creative marketing strategist. Craft compelling value propositions and target messaging" - " aligned to the prompt." 
- ), + + +def create_marketer_agent() -> ChatAgent: + """Creates a marketing domain expert agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You're a creative marketing strategist. Craft compelling value propositions and target messaging" + " aligned to the prompt." ), - id="marketer", + name="marketer", ) - legal = AgentExecutor( - chat_client.create_agent( - instructions=( - "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns" - " based on the prompt." - ), + + +def create_legal_agent() -> ChatAgent: + """Creates a legal domain expert agent.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns" + " based on the prompt." ), - id="legal", + name="legal", ) - expert_ids = [researcher.id, marketer.id, legal.id] - dispatcher = DispatchToExperts(expert_ids=expert_ids, id="dispatcher") - aggregator = AggregateInsights(expert_ids=expert_ids, id="aggregator") +async def main() -> None: + """Build and run the concurrent workflow with visualization.""" - # 2) Build a simple fan-out/fan-in workflow + # 1) Build a simple fan-out/fan-in workflow workflow = ( WorkflowBuilder() - .set_start_executor(dispatcher) - .add_fan_out_edges(dispatcher, [researcher, marketer, legal]) - .add_fan_in_edges([researcher, marketer, legal], aggregator) + .register_agent(create_researcher_agent, name="researcher") + .register_agent(create_marketer_agent, name="marketer") + .register_agent(create_legal_agent, name="legal") + .register_executor(lambda: DispatchToExperts(id="dispatcher"), name="dispatcher") + .register_executor(lambda: AggregateInsights(id="aggregator"), name="aggregator") + .set_start_executor("dispatcher") + .add_fan_out_edges("dispatcher", ["researcher", "marketer", "legal"]) + .add_fan_in_edges(["researcher", "marketer", 
"legal"], "aggregator") .build() ) - # 2.5) Generate workflow visualization + # 1.5) Generate workflow visualization print("Generating workflow visualization...") viz = WorkflowViz(workflow) # Print out the mermaid string. @@ -162,7 +154,7 @@ async def main() -> None: svg_file = viz.export(format="svg") print(f"SVG file saved to: {svg_file}") - # 3) Run with a single prompt + # 2) Run with a single prompt async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."): if isinstance(event, AgentRunEvent): # Show which agent ran and what step completed.