diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py
index a6fcaa1a3e..2900254126 100644
--- a/python/packages/core/agent_framework/_workflows/_concurrent.py
+++ b/python/packages/core/agent_framework/_workflows/_concurrent.py
@@ -3,7 +3,6 @@
 import asyncio
 import inspect
 import logging
-import uuid
 from collections.abc import Callable, Sequence
 from typing import Any
 
@@ -30,7 +29,8 @@
 - a default aggregator that combines all agent conversations and completes the workflow
 
 Notes:
-- Participants should be AgentProtocol instances or Executors.
+- Participants can be provided as AgentProtocol or Executor instances via `.participants()`,
+  or as factories returning AgentProtocol or Executor via `.register_participants()`.
 - A custom aggregator can be provided as:
   - an Executor instance (it should handle list[AgentExecutorResponse], yield output), or
@@ -396,7 +396,7 @@ def with_aggregator(
         | Callable[[list[AgentExecutorResponse]], Any]
         | Callable[[list[AgentExecutorResponse], WorkflowContext[Never, Any]], Any],
    ) -> "ConcurrentBuilder":
-        r"""Override the default aggregator with an executor or a callback.
+        r"""Override the default aggregator with an executor, an executor factory, or a callback.
 
         - Executor: must handle `list[AgentExecutorResponse]` and yield output using `ctx.yield_output(...)`
         - Callback: sync or async callable with one of the signatures:
@@ -521,52 +521,30 @@ def build(self) -> Workflow:
                 )
             )
 
-        builder = WorkflowBuilder()
+        participants: list[Executor | AgentProtocol] = []
         if self._participant_factories:
-            # Register executors/agents to avoid warnings from the workflow builder
-            # if factories are provided instead of direct instances. This doesn't
-            # break the factory pattern since the concurrent builder still creates
-            # new instances per workflow build.
-            factory_names: list[str] = []
+            # Resolve the participant factories now. This doesn't break the factory pattern
+            # since the Concurrent builder still creates new instances per workflow build.
             for factory in self._participant_factories:
-                factory_name = uuid.uuid4().hex
-                factory_names.append(factory_name)
-                instance = factory()
-                if isinstance(instance, Executor):
-                    builder.register_executor(lambda executor=instance: executor, name=factory_name)  # type: ignore[misc]
-                else:
-                    builder.register_agent(lambda agent=instance: agent, name=factory_name)  # type: ignore[misc]
-            # Register the dispatcher and the aggregator
-            builder.register_executor(lambda: dispatcher, name="dispatcher")
-            builder.register_executor(lambda: aggregator, name="aggregator")
-
-            builder.set_start_executor("dispatcher")
-            builder.add_fan_out_edges("dispatcher", factory_names)
-            if self._request_info_enabled:
-                # Insert interceptor between fan-in and aggregator
-                # participants -> fan-in -> interceptor -> aggregator
-                builder.register_executor(
-                    lambda: RequestInfoInterceptor(executor_id="request_info"),
-                    name="request_info_interceptor",
-                )
-                builder.add_fan_in_edges(factory_names, "request_info_interceptor")
-                builder.add_edge("request_info_interceptor", "aggregator")
-            else:
-                # Direct fan-in to aggregator
-                builder.add_fan_in_edges(factory_names, "aggregator")
+                p = factory()
+                participants.append(p)
         else:
-            builder.set_start_executor(dispatcher)
-            builder.add_fan_out_edges(dispatcher, self._participants)
-
-            if self._request_info_enabled:
-                # Insert interceptor between fan-in and aggregator
-                # participants -> fan-in -> interceptor -> aggregator
-                request_info_interceptor = RequestInfoInterceptor(executor_id="request_info")
-                builder.add_fan_in_edges(self._participants, request_info_interceptor)
-                builder.add_edge(request_info_interceptor, aggregator)
-            else:
-                # Direct fan-in to aggregator
-                builder.add_fan_in_edges(self._participants, aggregator)
+            participants = self._participants
+
+        builder = WorkflowBuilder()
+        builder.set_start_executor(dispatcher)
+        builder.add_fan_out_edges(dispatcher, participants)
+
+        if self._request_info_enabled:
+            # Insert interceptor between fan-in and aggregator
+            # participants -> fan-in -> interceptor -> aggregator
+            request_info_interceptor = RequestInfoInterceptor(executor_id="request_info")
+            builder.add_fan_in_edges(participants, request_info_interceptor)
+            builder.add_edge(request_info_interceptor, aggregator)
+        else:
+            # Direct fan-in to aggregator
+            builder.add_fan_in_edges(participants, aggregator)
+
         if self._checkpoint_storage is not None:
             builder = builder.with_checkpointing(self._checkpoint_storage)
diff --git a/python/packages/core/agent_framework/_workflows/_sequential.py b/python/packages/core/agent_framework/_workflows/_sequential.py
index b6a49ecab8..0f849926b6 100644
--- a/python/packages/core/agent_framework/_workflows/_sequential.py
+++ b/python/packages/core/agent_framework/_workflows/_sequential.py
@@ -4,7 +4,8 @@
 
 This module provides a high-level, agent-focused API to assemble a sequential workflow where:
 
-- Participants are a sequence of AgentProtocol instances or Executors
+- Participants can be provided as AgentProtocol or Executor instances via `.participants()`,
+  or as factories returning AgentProtocol or Executor via `.register_participants()`
 - A shared conversation context (list[ChatMessage]) is passed along the chain
 - Agents append their assistant messages to the context
 - Custom executors can transform or summarize and return a refined context
@@ -15,7 +16,7 @@
 
 Notes:
 - Participants can mix AgentProtocol and Executor objects
-- Agents are auto-wrapped by WorkflowBuilder as AgentExecutor
+- Agents are auto-wrapped by WorkflowBuilder as AgentExecutor (unless already wrapped)
 - AgentExecutor produces AgentExecutorResponse; _ResponseToConversation converts this to list[ChatMessage]
 - Non-agent executors must define a handler that consumes `list[ChatMessage]` and sends back the updated `list[ChatMessage]` via their workflow context
@@ -252,7 +253,7 @@ def build(self) -> Workflow:
         if not self._participants and not self._participant_factories:
             raise ValueError(
                 "No participants or participant factories provided to the builder. "
-                "Use .participants([...]) or .ss([...])."
+                "Use .participants([...]) or .register_participants([...])."
             )
 
         if self._participants and self._participant_factories:
@@ -273,6 +274,8 @@ def build(self) -> Workflow:
 
         participants: list[Executor | AgentProtocol] = []
         if self._participant_factories:
+            # Resolve the participant factories now. This doesn't break the factory pattern
+            # since the Sequential builder still creates new instances per workflow build.
             for factory in self._participant_factories:
                 p = factory()
                 participants.append(p)
diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py
index 5bf36b6ccd..60c959823f 100644
--- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py
+++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py
@@ -302,7 +302,8 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
 
         If multiple names are provided, the same factory function will be registered under each name.
 
-        ...code-block:: python
+        .. code-block:: python
+
             from agent_framework import WorkflowBuilder, Executor, WorkflowContext, handler
 
@@ -315,7 +316,7 @@ async def log(self, message: str, ctx: WorkflowContext) -> None:
             # Register the same executor factory under multiple names
             workflow = (
                 WorkflowBuilder()
-                .register_executor(lambda: CustomExecutor(id="logger"), name=["ExecutorA", "ExecutorB"])
+                .register_executor(lambda: LoggerExecutor(id="logger"), name=["ExecutorA", "ExecutorB"])
                 .set_start_executor("ExecutorA")
                 .add_edge("ExecutorA", "ExecutorB")
                 .build()
             )
@@ -456,7 +457,12 @@ def add_edge(
             source: The source executor or registered name of the source factory for the edge.
             target: The target executor or registered name of the target factory for the edge.
             condition: An optional condition function that determines whether the edge
-                should be traversed based on the message type.
+                should be traversed based on the message.
+
+        Note: If instances are provided for both source and target, they will be shared across
+            all workflow instances created from the built Workflow. To avoid this, consider
+            registering the executors and agents using `register_executor` and `register_agent`
+            and referencing them by factory name for lazy initialization instead.
 
         Returns:
             Self: The WorkflowBuilder instance for method chaining.
@@ -505,17 +511,13 @@ def only_large_numbers(msg: int) -> bool:
                 .build()
             )
         """
-        if not isinstance(source, str) or not isinstance(target, str):
-            logger.warning(
-                "Adding an edge with Executor or AgentProtocol instances directly is not recommended, "
-                "because workflow instances created from the builder will share the same executor/agent instances. "
-                "Consider using a registered name for lazy initialization instead."
-            )
-
         if (isinstance(source, str) and not isinstance(target, str)) or (
             not isinstance(source, str) and isinstance(target, str)
         ):
-            raise ValueError("Both source and target must be either names (str) or Executor/AgentProtocol instances.")
+            raise ValueError(
+                "Both source and target must be either registered factory names (str) "
+                "or Executor/AgentProtocol instances."
+            )
 
         if isinstance(source, str) and isinstance(target, str):
             # Both are names; defer resolution to build time
@@ -547,6 +549,11 @@ def add_fan_out_edges(
         Returns:
             Self: The WorkflowBuilder instance for method chaining.
 
+        Note: If instances are provided for source and targets, they will be shared across
+            all workflow instances created from the built Workflow. To avoid this, consider
+            registering the executors and agents using `register_executor` and `register_agent`
+            and referencing them by factory name for lazy initialization instead.
+
         Example:
             .. code-block:: python
@@ -583,17 +590,13 @@ async def validate(self, data: str, ctx: WorkflowContext) -> None:
                 .build()
             )
         """
-        if not isinstance(source, str) or any(not isinstance(t, str) for t in targets):
-            logger.warning(
-                "Adding fan-out edges with Executor or AgentProtocol instances directly is not recommended, "
-                "because workflow instances created from the builder will share the same executor/agent instances. "
-                "Consider using registered names for lazy initialization instead."
-            )
-
         if (isinstance(source, str) and not all(isinstance(t, str) for t in targets)) or (
             not isinstance(source, str) and any(isinstance(t, str) for t in targets)
         ):
-            raise ValueError("Both source and targets must be either names (str) or Executor/AgentProtocol instances.")
+            raise ValueError(
+                "Both source and targets must be either registered factory names (str) "
+                "or Executor/AgentProtocol instances."
+            )
 
         if isinstance(source, str) and all(isinstance(t, str) for t in targets):
             # Both are names; defer resolution to build time
@@ -624,7 +627,7 @@ def add_switch_case_edge_group(
         Each condition function will be evaluated in order, and the first one that returns True
         will determine which target executor receives the message.
 
-        The last case (the default case) will receive messages that fall through all conditions
+        The default case (if provided) will receive messages that fall through all conditions
         (i.e., no condition matched).
 
         Args:
@@ -634,6 +637,11 @@
         Returns:
             Self: The WorkflowBuilder instance for method chaining.
 
+        Note: If instances are provided for source and case targets, they will be shared across
+            all workflow instances created from the built Workflow. To avoid this, consider
+            registering the executors and agents using `register_executor` and `register_agent`
+            and referencing them by factory name for lazy initialization instead.
+
         Example:
             .. code-block:: python
@@ -681,18 +689,12 @@ async def handle(self, result: Result, ctx: WorkflowContext) -> None:
                 .build()
             )
         """
-        if not isinstance(source, str) or not all(isinstance(case.target, str) for case in cases):
-            logger.warning(
-                "Adding a switch-case edge group with Executor or AgentProtocol instances directly is not recommended, "
-                "because workflow instances created from the builder will share the same executor/agent instance. "
-                "Consider using a registered name for lazy initialization instead."
-            )
-
         if (isinstance(source, str) and not all(isinstance(case.target, str) for case in cases)) or (
             not isinstance(source, str) and any(isinstance(case.target, str) for case in cases)
         ):
             raise ValueError(
-                "Both source and case targets must be either names (str) or Executor/AgentProtocol instances."
+                "Both source and case targets must be either registered factory names (str) "
+                "or Executor/AgentProtocol instances."
             )
 
         if isinstance(source, str) and all(isinstance(case.target, str) for case in cases):
@@ -736,12 +738,16 @@ def add_multi_selection_edge_group(
             source: The source executor or registered name of the source factory for the edge group.
             targets: A list of target executors or registered names of the target factories for the edges.
             selection_func: A function that selects target executors for messages.
-                Takes (message, list[executor_id or registered target names]) and
-                returns list[executor_id or registered target names].
+                Takes (message, list[executor_id]) and returns list[executor_id].
 
         Returns:
             Self: The WorkflowBuilder instance for method chaining.
 
+        Note: If instances are provided for source and targets, they will be shared across
+            all workflow instances created from the built Workflow. To avoid this, consider
+            registering the executors and agents using `register_executor` and `register_agent`
+            and referencing them by factory name for lazy initialization instead.
+
         Example:
             .. code-block:: python
@@ -795,17 +801,13 @@ def select_workers(task: Task, available: list[str]) -> list[str]:
                 .build()
            )
         """
-        if not isinstance(source, str) or any(not isinstance(t, str) for t in targets):
-            logger.warning(
-                "Adding fan-out edges with Executor or AgentProtocol instances directly is not recommended, "
-                "because workflow instances created from the builder will share the same executor/agent instances. "
-                "Consider using registered names for lazy initialization instead."
-            )
-
         if (isinstance(source, str) and not all(isinstance(t, str) for t in targets)) or (
             not isinstance(source, str) and any(isinstance(t, str) for t in targets)
         ):
-            raise ValueError("Both source and targets must be either names (str) or Executor/AgentProtocol instances.")
+            raise ValueError(
+                "Both source and targets must be either registered factory names (str) "
+                "or Executor/AgentProtocol instances."
+            )
 
         if isinstance(source, str) and all(isinstance(t, str) for t in targets):
             # Both are names; defer resolution to build time
@@ -848,6 +850,11 @@ def add_fan_in_edges(
         Returns:
             Self: The WorkflowBuilder instance for method chaining.
 
+        Note: If instances are provided for sources and target, they will be shared across
+            all workflow instances created from the built Workflow. To avoid this, consider
+            registering the executors and agents using `register_executor` and `register_agent`
+            and referencing them by factory name for lazy initialization instead.
+
         Example:
             .. code-block:: python
@@ -879,17 +886,13 @@ async def aggregate(self, results: list[str], ctx: WorkflowContext[Never, str])
                 .build()
            )
         """
-        if not all(isinstance(s, str) for s in sources) or not isinstance(target, str):
-            logger.warning(
-                "Adding fan-in edges with Executor or AgentProtocol instances directly is not recommended, "
-                "because workflow instances created from the builder will share the same executor/agent instances. "
-                "Consider using registered names for lazy initialization instead."
-            )
-
         if (all(isinstance(s, str) for s in sources) and not isinstance(target, str)) or (
             not all(isinstance(s, str) for s in sources) and isinstance(target, str)
         ):
-            raise ValueError("Both sources and target must be either names (str) or Executor/AgentProtocol instances.")
+            raise ValueError(
+                "Both sources and target must be either registered factory names (str) "
+                "or Executor/AgentProtocol instances."
+            )
 
         if all(isinstance(s, str) for s in sources) and isinstance(target, str):
             # Both are names; defer resolution to build time
@@ -911,7 +914,7 @@ def add_chain(self, executors: Sequence[Executor | AgentProtocol | str]) -> Self
         The output of each executor in the chain will be sent to the next executor in the chain.
         The input types of each executor must be compatible with the output types of the previous executor.
 
-        Circles in the chain are not allowed, meaning the chain cannot have two executors with the same ID.
+        Cycles in the chain are not allowed, meaning an executor cannot appear more than once in the chain.
 
         Args:
             executors: A list of executors or registered names of the executor factories to chain together.
@@ -919,6 +922,11 @@
         Returns:
             Self: The WorkflowBuilder instance for method chaining.
 
+        Note: If executor instances are provided, they will be shared across all workflow instances created
+            from the built Workflow. To avoid this, consider registering the executors and agents using
+            `register_executor` and `register_agent` and referencing them by factory name for lazy
+            initialization instead.
+
         Example:
             .. code-block:: python
@@ -958,16 +966,10 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
         if len(executors) < 2:
             raise ValueError("At least two executors are required to form a chain.")
 
-        if not all(isinstance(e, str) for e in executors):
-            logger.warning(
-                "Adding a chain with Executor or AgentProtocol instances directly is not recommended, "
-                "because workflow instances created from the builder will share the same executor/agent instances. "
-                "Consider using registered names for lazy initialization instead."
-            )
-
         if not all(isinstance(e, str) for e in executors) and any(isinstance(e, str) for e in executors):
             raise ValueError(
-                "All executors in the chain must be either names (str) or Executor/AgentProtocol instances."
+                "All executors in the chain must be either registered factory names (str) "
+                "or Executor/AgentProtocol instances."
             )
 
         if all(isinstance(e, str) for e in executors):
@@ -976,7 +978,7 @@ async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
                 self.add_edge(executors[i], executors[i + 1])
             return self
 
-        # Both are Executor/AgentProtocol instances; wrap and add now
+        # All are Executor/AgentProtocol instances; wrap and add now
         # Wrap each candidate first to ensure stable IDs before adding edges
         wrapped: list[Executor] = [self._maybe_wrap_agent(e) for e in executors]  # type: ignore[arg-type]
         for i in range(len(wrapped) - 1):
diff --git a/python/packages/core/tests/workflow/test_concurrent.py b/python/packages/core/tests/workflow/test_concurrent.py
index 66cc8cfc68..57810b8f59 100644
--- a/python/packages/core/tests/workflow/test_concurrent.py
+++ b/python/packages/core/tests/workflow/test_concurrent.py
@@ -63,7 +63,7 @@ def create_dup2() -> Executor:
         return _FakeAgentExec("dup", "B")  # same executor id
 
     builder = ConcurrentBuilder().register_participants([create_dup1, create_dup2])
-    with pytest.raises(ValueError, match="Executor with ID 'dup' has already been created."):
+    with pytest.raises(ValueError, match="Duplicate executor ID 'dup' detected in workflow."):
         builder.build()
diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py
index b281edee34..83c9d41c22 100644
--- a/python/packages/core/tests/workflow/test_workflow_builder.py
+++ b/python/packages/core/tests/workflow/test_workflow_builder.py
@@ -497,7 +497,13 @@ def test_mixing_eager_and_lazy_initialization_error():
     builder.register_executor(lambda: MockExecutor(id="Lazy"), name="Lazy")
 
     # Mixing eager and lazy should raise an error during add_edge
-    with pytest.raises(ValueError, match="Both source and target must be either names"):
+    with pytest.raises(
+        ValueError,
+        match=(
+            r"Both source and target must be either registered factory names \(str\) "
+            r"or Executor/AgentProtocol instances\."
+        ),
+    ):
         builder.add_edge(eager_executor, "Lazy")
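Usage sketch (illustrative, not part of the patch): the snippet below contrasts the two wiring styles referenced by the new Note blocks and error messages, passing Executor instances directly versus registering factories under names for lazy initialization. It assumes the `agent_framework` imports shown in the `register_executor` docstring example above; `UpperCaseExecutor` and the `ctx.send_message(...)` call are hypothetical stand-ins rather than APIs confirmed by this diff.

```python
# Illustrative sketch only: instance-based wiring vs. factory-name registration.
# Assumes the imports from the register_executor docstring example; UpperCaseExecutor
# and ctx.send_message are hypothetical stand-ins for a real executor implementation.
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler


class UpperCaseExecutor(Executor):
    """Hypothetical executor that upper-cases incoming text and forwards it."""

    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        # Assumed API: forward the transformed message to downstream executors.
        await ctx.send_message(text.upper())


class LoggerExecutor(Executor):
    """Mirrors the docstring example above: prints whatever message it receives."""

    @handler
    async def log(self, message: str, ctx: WorkflowContext) -> None:
        print(message)


# Eager wiring: these exact instances are shared by every workflow built from the builder.
upper = UpperCaseExecutor(id="upper")
printer = LoggerExecutor(id="printer")
eager_workflow = WorkflowBuilder().set_start_executor(upper).add_edge(upper, printer).build()

# Lazy wiring (recommended by the new Note blocks): register factories under names so
# each build() resolves fresh instances.
lazy_workflow = (
    WorkflowBuilder()
    .register_executor(lambda: UpperCaseExecutor(id="upper"), name="Upper")
    .register_executor(lambda: LoggerExecutor(id="printer"), name="Printer")
    .set_start_executor("Upper")
    .add_edge("Upper", "Printer")
    .build()
)
```

The reworded ValueError messages in this diff cover the remaining failure mode: mixing registered factory names and instances within a single edge, fan-out, fan-in, switch-case, multi-selection, or chain call.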