Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions python/packages/core/agent_framework/_workflows/_sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
""" # noqa: E501

import logging
from collections.abc import Sequence
from collections.abc import Callable, Sequence
from typing import Any

from agent_framework import AgentProtocol, ChatMessage
Expand Down Expand Up @@ -72,11 +72,7 @@ async def from_message(self, message: ChatMessage, ctx: WorkflowContext[list[Cha
await ctx.send_message(normalize_messages_input(message))

@handler
async def from_messages(
self,
messages: list[str | ChatMessage],
ctx: WorkflowContext[list[ChatMessage]],
) -> None:
async def from_messages(self, messages: list[str | ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None:
await ctx.send_message(normalize_messages_input(messages))


Expand All @@ -102,7 +98,10 @@ async def end(self, conversation: list[ChatMessage], ctx: WorkflowContext[Any, l
class SequentialBuilder:
r"""High-level builder for sequential agent/executor workflows with shared context.

- `participants([...])` accepts a list of AgentProtocol (recommended) or Executor
- `participants([...])` accepts a list of AgentProtocol (recommended) or Executor instances
- `register_participants([...])` accepts a list of factories for AgentProtocol (recommended)
or Executor factories
- Executors must define a handler that consumes list[ChatMessage] and sends out a list[ChatMessage]
- The workflow wires participants in order, passing a list[ChatMessage] down the chain
- Agents append their assistant messages to the conversation
- Custom executors can transform/summarize and return a list[ChatMessage]
Expand All @@ -114,8 +113,14 @@ class SequentialBuilder:

from agent_framework import SequentialBuilder

# With agent instances
workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

# With agent factories
workflow = (
SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
)

# Enable checkpoint persistence
workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

Expand All @@ -133,16 +138,38 @@ class SequentialBuilder:

def __init__(self) -> None:
self._participants: list[AgentProtocol | Executor] = []
self._participant_factories: list[Callable[[], AgentProtocol | Executor]] = []
self._checkpoint_storage: CheckpointStorage | None = None
self._request_info_enabled: bool = False
self._request_info_filter: set[str] | None = None

def register_participants(
self,
participant_factories: Sequence[Callable[[], AgentProtocol | Executor]],
) -> "SequentialBuilder":
"""Register participant factories for this sequential workflow."""
if self._participants:
raise ValueError(
"Cannot mix .participants([...]) and .register_participants() in the same builder instance."
)

if not participant_factories:
raise ValueError("participant_factories cannot be empty")

self._participant_factories = list(participant_factories)
return self

def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "SequentialBuilder":
"""Define the ordered participants for this sequential workflow.

Accepts AgentProtocol instances (auto-wrapped as AgentExecutor) or Executor instances.
Raises if empty or duplicates are provided for clarity.
"""
if self._participant_factories:
raise ValueError(
"Cannot mix .participants([...]) and .register_participants() in the same builder instance."
)

if not participants:
raise ValueError("participants cannot be empty")

Expand Down Expand Up @@ -217,13 +244,22 @@ def build(self) -> Workflow:
- _InputToConversation normalizes the initial input into list[ChatMessage]
- For each participant in order:
- If Agent (or AgentExecutor): pass conversation to the agent, then optionally
route through human input interceptor, then convert response to conversation
route through a request info interceptor, then convert response to conversation
via _ResponseToConversation
- Else (custom Executor): pass conversation directly to the executor
- _EndWithConversation yields the final conversation and the workflow becomes idle
"""
if not self._participants:
raise ValueError("No participants provided. Call .participants([...]) first.")
if not self._participants and not self._participant_factories:
raise ValueError(
"No participants or participant factories provided to the builder. "
"Use .participants([...]) or .ss([...])."
)

if self._participants and self._participant_factories:
# Defensive strategy: this should never happen due to checks in respective methods
raise ValueError(
"Cannot mix .participants([...]) and .register_participants() in the same builder instance."
)

# Internal nodes
input_conv = _InputToConversation(id="input-conversation")
Expand All @@ -235,13 +271,17 @@ def build(self) -> Workflow:
# Start of the chain is the input normalizer
prior: Executor | AgentProtocol = input_conv

for p in self._participants:
# Agent-like branch: either explicitly an AgentExecutor or any non-AgentExecutor
if not (isinstance(p, Executor) and not isinstance(p, AgentExecutor)):
# input conversation -> [human_input_interceptor] -> (agent) -> response -> conversation
label: str
label = p.id if isinstance(p, Executor) else getattr(p, "name", None) or p.__class__.__name__
resp_to_conv = _ResponseToConversation(id=f"to-conversation:{label}")
participants: list[Executor | AgentProtocol] = []
if self._participant_factories:
for factory in self._participant_factories:
p = factory()
participants.append(p)
else:
participants = self._participants

for p in participants:
if isinstance(p, (AgentProtocol, AgentExecutor)):
label = p.id if isinstance(p, AgentExecutor) else p.display_name

if self._request_info_enabled:
# Insert request info interceptor BEFORE the agent
Expand All @@ -254,13 +294,15 @@ def build(self) -> Workflow:
else:
builder.add_edge(prior, p)

resp_to_conv = _ResponseToConversation(id=f"to-conversation:{label}")
builder.add_edge(p, resp_to_conv)
prior = resp_to_conv
elif isinstance(p, Executor):
# Custom executor operates on list[ChatMessage]
# If the executor doesn't handle list[ChatMessage] correctly, validation will fail
builder.add_edge(prior, p)
prior = p
else: # pragma: no cover - defensive
else:
raise TypeError(f"Unsupported participant type: {type(p).__name__}")

# Terminate with the final conversation
Expand Down
62 changes: 0 additions & 62 deletions python/packages/core/agent_framework/_workflows/_typing_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright (c) Microsoft. All rights reserved.

import logging
from dataclasses import fields, is_dataclass
from types import UnionType
from typing import Any, TypeVar, Union, cast, get_args, get_origin

Expand All @@ -10,67 +9,6 @@
T = TypeVar("T")


def _coerce_to_type(value: Any, target_type: type[T]) -> T | None:
"""Best-effort conversion of value into target_type.

Args:
value: The value to convert (can be dict, dataclass, or object with __dict__)
target_type: The target type to convert to

Returns:
Instance of target_type if conversion succeeds, None otherwise
"""
if isinstance(value, target_type):
return value # type: ignore[return-value]

# Convert dataclass instances or objects with __dict__ into dict first
value_as_dict: dict[str, Any]
if not isinstance(value, dict):
if is_dataclass(value):
value_as_dict = {f.name: getattr(value, f.name) for f in fields(value)}
else:
value_dict = getattr(value, "__dict__", None)
if isinstance(value_dict, dict):
value_as_dict = cast(dict[str, Any], value_dict)
else:
return None
else:
value_as_dict = cast(dict[str, Any], value)

# Try to construct the target type from the dict
ctor_kwargs: dict[str, Any] = dict(value_as_dict)

if is_dataclass(target_type):
field_names = {f.name for f in fields(target_type)}
ctor_kwargs = {k: v for k, v in value_as_dict.items() if k in field_names}

try:
return target_type(**ctor_kwargs) # type: ignore[call-arg,return-value]
except TypeError as exc:
logger.debug(f"_coerce_to_type could not call {target_type.__name__}(**..): {exc}")
except Exception as exc: # pragma: no cover - unexpected constructor failure
logger.warning(
f"_coerce_to_type encountered unexpected error calling {target_type.__name__} constructor: {exc}"
)

# Fallback: try to create instance without __init__ and set attributes
try:
instance = object.__new__(target_type)
except Exception as exc: # pragma: no cover - pathological type
logger.debug(f"_coerce_to_type could not allocate {target_type.__name__} without __init__: {exc}")
return None

for key, val in value_as_dict.items():
try:
setattr(instance, key, val)
except Exception as exc:
logger.debug(
f"_coerce_to_type could not set {target_type.__name__}.{key} during fallback assignment: {exc}"
)
continue
return instance # type: ignore[return-value]


def is_instance_of(data: Any, target_type: type | UnionType | Any) -> bool:
"""Check if the data is an instance of the target type.

Expand Down
Loading
Loading