Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 24 additions & 46 deletions python/packages/core/agent_framework/_workflows/_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import asyncio
import inspect
import logging
import uuid
from collections.abc import Callable, Sequence
from typing import Any

Expand All @@ -30,7 +29,8 @@
- a default aggregator that combines all agent conversations and completes the workflow

Notes:
- Participants should be AgentProtocol instances or Executors.
- Participants can be provided as AgentProtocol or Executor instances via `.participants()`,
or as factories returning AgentProtocol or Executor via `.register_participants()`.
- A custom aggregator can be provided as:
- an Executor instance (it should handle list[AgentExecutorResponse],
yield output), or
Expand Down Expand Up @@ -396,7 +396,7 @@ def with_aggregator(
| Callable[[list[AgentExecutorResponse]], Any]
| Callable[[list[AgentExecutorResponse], WorkflowContext[Never, Any]], Any],
) -> "ConcurrentBuilder":
r"""Override the default aggregator with an executor, an executor factory, or a callback.
r"""Override the default aggregator with an executor or a callback.

- Executor: must handle `list[AgentExecutorResponse]` and yield output using `ctx.yield_output(...)`
- Callback: sync or async callable with one of the signatures:
Expand Down Expand Up @@ -521,52 +521,30 @@ def build(self) -> Workflow:
)
)

builder = WorkflowBuilder()
participants: list[Executor | AgentProtocol] = []
if self._participant_factories:
# Register executors/agents to avoid warnings from the workflow builder
# if factories are provided instead of direct instances. This doesn't
# break the factory pattern since the concurrent builder still creates
# new instances per workflow build.
factory_names: list[str] = []
# Resolve the participant factories now. This doesn't break the factory pattern
# since the Concurrent builder still creates new instances per workflow build.
for factory in self._participant_factories:
factory_name = uuid.uuid4().hex
factory_names.append(factory_name)
instance = factory()
if isinstance(instance, Executor):
builder.register_executor(lambda executor=instance: executor, name=factory_name) # type: ignore[misc]
else:
builder.register_agent(lambda agent=instance: agent, name=factory_name) # type: ignore[misc]
# Register the dispatcher and the aggregator
builder.register_executor(lambda: dispatcher, name="dispatcher")
builder.register_executor(lambda: aggregator, name="aggregator")

builder.set_start_executor("dispatcher")
builder.add_fan_out_edges("dispatcher", factory_names)
if self._request_info_enabled:
# Insert interceptor between fan-in and aggregator
# participants -> fan-in -> interceptor -> aggregator
builder.register_executor(
lambda: RequestInfoInterceptor(executor_id="request_info"),
name="request_info_interceptor",
)
builder.add_fan_in_edges(factory_names, "request_info_interceptor")
builder.add_edge("request_info_interceptor", "aggregator")
else:
# Direct fan-in to aggregator
builder.add_fan_in_edges(factory_names, "aggregator")
p = factory()
participants.append(p)
else:
builder.set_start_executor(dispatcher)
builder.add_fan_out_edges(dispatcher, self._participants)

if self._request_info_enabled:
# Insert interceptor between fan-in and aggregator
# participants -> fan-in -> interceptor -> aggregator
request_info_interceptor = RequestInfoInterceptor(executor_id="request_info")
builder.add_fan_in_edges(self._participants, request_info_interceptor)
builder.add_edge(request_info_interceptor, aggregator)
else:
# Direct fan-in to aggregator
builder.add_fan_in_edges(self._participants, aggregator)
participants = self._participants

builder = WorkflowBuilder()
builder.set_start_executor(dispatcher)
builder.add_fan_out_edges(dispatcher, participants)

if self._request_info_enabled:
# Insert interceptor between fan-in and aggregator
# participants -> fan-in -> interceptor -> aggregator
request_info_interceptor = RequestInfoInterceptor(executor_id="request_info")
builder.add_fan_in_edges(participants, request_info_interceptor)
builder.add_edge(request_info_interceptor, aggregator)
else:
# Direct fan-in to aggregator
builder.add_fan_in_edges(participants, aggregator)

if self._checkpoint_storage is not None:
builder = builder.with_checkpointing(self._checkpoint_storage)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

This module provides a high-level, agent-focused API to assemble a sequential
workflow where:
- Participants are a sequence of AgentProtocol instances or Executors
- Participants can be provided as AgentProtocol or Executor instances via `.participants()`,
or as factories returning AgentProtocol or Executor via `.register_participants()`
- A shared conversation context (list[ChatMessage]) is passed along the chain
- Agents append their assistant messages to the context
- Custom executors can transform or summarize and return a refined context
Expand All @@ -15,7 +16,7 @@

Notes:
- Participants can mix AgentProtocol and Executor objects
- Agents are auto-wrapped by WorkflowBuilder as AgentExecutor
- Agents are auto-wrapped by WorkflowBuilder as AgentExecutor (unless already wrapped)
- AgentExecutor produces AgentExecutorResponse; _ResponseToConversation converts this to list[ChatMessage]
- Non-agent executors must define a handler that consumes `list[ChatMessage]` and sends back
the updated `list[ChatMessage]` via their workflow context
Expand Down Expand Up @@ -252,7 +253,7 @@ def build(self) -> Workflow:
if not self._participants and not self._participant_factories:
raise ValueError(
"No participants or participant factories provided to the builder. "
"Use .participants([...]) or .ss([...])."
"Use .participants([...]) or .register_participants([...])."
)

if self._participants and self._participant_factories:
Expand All @@ -273,6 +274,8 @@ def build(self) -> Workflow:

participants: list[Executor | AgentProtocol] = []
if self._participant_factories:
# Resolve the participant factories now. This doesn't break the factory pattern
# since the Sequential builder still creates new instances per workflow build.
for factory in self._participant_factories:
p = factory()
participants.append(p)
Expand Down
Loading
Loading