Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
5a5133c
Fix orchestration outputs so as_agent() returns the final answer only…
moonbox3 Apr 16, 2026
240e307
Merge remote-tracking branch 'upstream/main' into improve-orchestrati…
Apr 16, 2026
5080f57
Fix orchestration output issues from review comments
Apr 16, 2026
3af110e
Add tests for sequential workflow with_request_info and intermediate_…
Apr 16, 2026
100093e
Fix pyright type errors from AgentResponse output refactor (#5301)
Apr 16, 2026
675afe0
Fix pyright reportUnknownVariableType in _agent.py (#5301)
Apr 16, 2026
9d37c08
Fix pyright reportMissingImports for orjson in file history samples (…
Apr 16, 2026
28cf71f
Address review feedback for #5301: review comment fixes
Apr 16, 2026
e3057e1
Merge remote-tracking branch 'upstream/main' into improve-orchestrati…
Apr 16, 2026
09a12fe
Address review feedback for #5301: review comment fixes
Apr 16, 2026
cec1993
Revert sequential_workflow_as_agent sample to FoundryChatClient
Apr 16, 2026
12fe4bb
Address ultrareview feedback: emit_data_events rename + WorkflowAgent…
moonbox3 Apr 17, 2026
b6a6d60
Fix pyright: widen event.data to Any to avoid partial-unknown narrowing
moonbox3 Apr 17, 2026
ddfe656
Merge branch 'main' into improve-orchestration-outputs
moonbox3 Apr 20, 2026
f2e4d53
Clean up design
moonbox3 Apr 23, 2026
1a4c975
Scope to agent output semantics only
moonbox3 Apr 28, 2026
96cc455
yield AgentResponseUpdate streaming, AgentResponse non-streaming
moonbox3 Apr 28, 2026
e13fbfe
Merge remote-tracking branch 'upstream/main' into improve-orchestrati…
moonbox3 Apr 28, 2026
d97a693
Fix mypy/pyright: widen cast types at GroupChat callsites
moonbox3 Apr 28, 2026
85c6b78
Merge branch 'main' into improve-orchestration-outputs
moonbox3 Apr 28, 2026
ec2f085
Python: skip flaky Foundry / Foundry Hosting integration tests (#5553)
moonbox3 Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,9 @@ async def _run_impl(
function_invocation_kwargs=function_invocation_kwargs,
client_kwargs=client_kwargs,
):
if event.type == "output" or event.type == "request_info":
if event.type in ("output", "request_info") or (
event.type == "data" and isinstance(event.data, (AgentResponse, AgentResponseUpdate))
):
Comment thread
moonbox3 marked this conversation as resolved.
Outdated
output_events.append(event)

result = self._convert_workflow_events_to_agent_response(response_id, output_events)
Expand Down Expand Up @@ -618,19 +620,18 @@ def _convert_workflow_event_to_agent_response_updates(
) -> list[AgentResponseUpdate]:
"""Convert a workflow event to a list of AgentResponseUpdate objects.

Events with type='output' and type='request_info' are processed.
Other workflow events are ignored as they are workflow-internal.

For 'output' events, AgentExecutor yields AgentResponseUpdate for streaming updates
via ctx.yield_output(). This method converts those to agent response updates.
Processes `output` and `request_info` events, plus `data` events carrying
`AgentResponse` or `AgentResponseUpdate` (emitted by orchestrations to surface
Comment thread
moonbox3 marked this conversation as resolved.
Outdated
intermediate participants when `intermediate_outputs=True`). Other event types
are workflow-internal and ignored.

Returns:
A list of AgentResponseUpdate objects. Empty list if the event is not relevant.
"""
if event.type == "output":
data: Any = event.data
if event.type == "output" or (event.type == "data" and isinstance(data, (AgentResponse, AgentResponseUpdate))):
# Convert workflow output to agent response updates.
# Handle different data types appropriately.
data = event.data
executor_id = event.executor_id

if isinstance(data, AgentResponseUpdate):
Expand Down
12 changes: 12 additions & 0 deletions python/packages/core/agent_framework/_workflows/_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .._types import AgentResponse, AgentResponseUpdate, Message, ResponseStream
from ._agent_utils import resolve_agent_id
from ._const import GLOBAL_KWARGS_KEY, WORKFLOW_RUN_KWARGS_KEY
from ._events import WorkflowEvent
from ._executor import Executor, handler
from ._message_utils import normalize_messages_input
from ._request_info_mixin import response_handler
Expand Down Expand Up @@ -141,6 +142,7 @@ def __init__(
id: str | None = None,
context_mode: Literal["full", "last_agent", "custom"] | None = None,
context_filter: Callable[[list[Message]], list[Message]] | None = None,
emit_intermediate_data: bool = False,
):
"""Initialize the executor with a unique identifier.

Expand All @@ -158,6 +160,10 @@ def __init__(
as context for the agent run.
context_filter: An optional function for filtering conversation context when context_mode is set
to "custom".
emit_intermediate_data: When True, additionally emits `data` events (via
`WorkflowEvent.emit`) carrying each AgentResponse / AgentResponseUpdate alongside
the existing `output` events. Orchestrations use this to surface intermediate
participants while reserving `output` events for the workflow's final answer.
"""
# Prefer provided id; else use agent.name if present; else generate deterministic prefix
exec_id = id or resolve_agent_id(agent)
Expand All @@ -183,6 +189,8 @@ def __init__(
if self._context_mode == "custom" and not self._context_filter:
raise ValueError("context_filter must be provided when context_mode is set to 'custom'.")

self._emit_intermediate_data = emit_intermediate_data

@property
def agent(self) -> SupportsAgentRun:
"""Get the underlying agent wrapped by this executor."""
Expand Down Expand Up @@ -429,6 +437,8 @@ async def _run_agent(self, ctx: WorkflowContext[Never, AgentResponse]) -> AgentR
client_kwargs=client_kwargs,
)
await ctx.yield_output(response)
if self._emit_intermediate_data:
Comment thread
moonbox3 marked this conversation as resolved.
Outdated
await ctx.add_event(WorkflowEvent.emit(self.id, response))

# Handle any user input requests
if response.user_input_requests:
Expand Down Expand Up @@ -472,6 +482,8 @@ async def _run_agent_streaming(self, ctx: WorkflowContext[Never, AgentResponseUp
async for update in stream:
updates.append(update)
await ctx.yield_output(update)
if self._emit_intermediate_data:
await ctx.add_event(WorkflowEvent.emit(self.id, update))
if update.user_input_requests:
streamed_user_input_requests.extend(update.user_input_requests)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,13 @@ async def _process_workflow_result(
else:
await asyncio.gather(*[ctx.send_message(output) for output in outputs])

# Forward data events from the sub-workflow so that intermediate
# observations (e.g. emit_intermediate_data from AgentExecutor) are
# visible in the parent workflow's event stream.
data_events = [event for event in result if isinstance(event, WorkflowEvent) and event.type == "data"]
for data_event in data_events:
await ctx.add_event(WorkflowEvent.emit(data_event.executor_id or "", data_event.data))

# Process request info events
for event in request_info_events:
request_id = event.request_id
Expand Down
58 changes: 58 additions & 0 deletions python/packages/core/tests/workflow/test_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,3 +699,61 @@ async def test_resolve_executor_kwargs_empty_per_executor_does_not_fallback_to_g
resolved = {"exec_a": {}, GLOBAL_KWARGS_KEY: {"global_key": "global_val"}}
result = executor._resolve_executor_kwargs(resolved) # pyright: ignore[reportPrivateUsage]
assert result == {}


async def test_emit_intermediate_data_emits_data_events_non_streaming() -> None:
    """With emit_intermediate_data=True, a non-streaming run yields an output event plus a matching data event."""
    counting_agent = _CountingAgent(id="agent_a", name="AgentA")
    wrapped = AgentExecutor(counting_agent, id="exec_a", emit_intermediate_data=True)
    workflow = WorkflowBuilder(start_executor=wrapped).build()

    # Bucket the events of a single run by type; event types other than
    # "output" and "data" are irrelevant to this test and are dropped.
    by_type: dict[str, list[WorkflowEvent[Any]]] = {"output": [], "data": []}
    for event in await workflow.run("hello"):
        bucket = by_type.get(event.type)
        if bucket is not None:
            bucket.append(event)
    outputs = by_type["output"]
    intermediates = by_type["data"]

    # The regular output event is still produced exactly once.
    assert len(outputs) == 1
    assert isinstance(outputs[0].data, AgentResponse)
    # A single data event mirrors it, attributed to the executor id.
    assert len(intermediates) == 1
    assert intermediates[0].executor_id == "exec_a"
    assert isinstance(intermediates[0].data, AgentResponse)
    # Both events carry the same first-message text.
    data_text = intermediates[0].data.messages[0].text
    output_text = outputs[0].data.messages[0].text
    assert data_text == output_text


async def test_emit_intermediate_data_emits_data_events_streaming() -> None:
    """With emit_intermediate_data=True, a streaming run pairs every output update with a data event."""
    counting_agent = _CountingAgent(id="agent_a", name="AgentA")
    wrapped = AgentExecutor(counting_agent, id="exec_a", emit_intermediate_data=True)
    workflow = WorkflowBuilder(start_executor=wrapped).build()

    # Bucket streamed events by type; anything that is neither "output"
    # nor "data" is dropped.
    by_type: dict[str, list[WorkflowEvent[Any]]] = {"output": [], "data": []}
    async for event in workflow.run("hello", stream=True):
        bucket = by_type.get(event.type)
        if bucket is not None:
            bucket.append(event)
    streamed_outputs = by_type["output"]
    streamed_data = by_type["data"]

    # At least one streamed output, and every one is an AgentResponseUpdate.
    assert streamed_outputs
    assert all(isinstance(item.data, AgentResponseUpdate) for item in streamed_outputs)
    # One data event per output update, each an AgentResponseUpdate from "exec_a".
    assert len(streamed_data) == len(streamed_outputs)
    assert all(isinstance(item.data, AgentResponseUpdate) for item in streamed_data)
    assert all(item.executor_id == "exec_a" for item in streamed_data)


async def test_emit_intermediate_data_default_false_no_data_events() -> None:
    """Without emit_intermediate_data, a run produces no data events."""
    counting_agent = _CountingAgent(id="agent_a", name="AgentA")
    # emit_intermediate_data is deliberately omitted so the default applies.
    wrapped = AgentExecutor(counting_agent, id="exec_a")
    workflow = WorkflowBuilder(start_executor=wrapped).build()

    data_events: list[WorkflowEvent[Any]] = [
        event for event in await workflow.run("hello") if event.type == "data"
    ]

    assert data_events == []
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass
from typing import Any, ClassVar, TypeAlias

from agent_framework._types import Message
from agent_framework._types import AgentResponse, Message
from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse
from agent_framework._workflows._events import WorkflowEvent
from agent_framework._workflows._executor import Executor, handler
Expand Down Expand Up @@ -351,8 +351,8 @@ async def _check_termination(self) -> bool:
result = await result
return result

async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool:
"""Check termination conditions and yield completion if met.
async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, AgentResponse]) -> bool:
"""Check termination conditions and yield the completion message if met.

Args:
ctx: Workflow context for yielding output
Expand All @@ -362,8 +362,9 @@ async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Mess
"""
terminate = await self._check_termination()
if terminate:
self._append_messages([self._create_completion_message(self.TERMINATION_CONDITION_MET_MESSAGE)])
await ctx.yield_output(self._full_conversation)
completion_message = self._create_completion_message(self.TERMINATION_CONDITION_MET_MESSAGE)
self._append_messages([completion_message])
await ctx.yield_output(AgentResponse(messages=[completion_message]))
return True

return False
Expand Down Expand Up @@ -490,8 +491,8 @@ def _check_round_limit(self) -> bool:

return False

async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool:
"""Check round limit and yield completion if reached.
async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, AgentResponse]) -> bool:
"""Check round limit and yield the max-rounds completion message if reached.

Args:
ctx: Workflow context for yielding output
Expand All @@ -501,8 +502,9 @@ async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, list[Me
"""
reach_max_rounds = self._check_round_limit()
if reach_max_rounds:
self._append_messages([self._create_completion_message(self.MAX_ROUNDS_MET_MESSAGE)])
await ctx.yield_output(self._full_conversation)
completion_message = self._create_completion_message(self.MAX_ROUNDS_MET_MESSAGE)
self._append_messages([completion_message])
await ctx.yield_output(AgentResponse(messages=[completion_message]))
return True

return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from collections.abc import Callable, Sequence
from typing import Any

from agent_framework import Message, SupportsAgentRun
from agent_framework import AgentResponse, Message, SupportsAgentRun
from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse
from agent_framework._workflows._agent_utils import resolve_agent_id
from agent_framework._workflows._checkpoint import CheckpointStorage
Expand Down Expand Up @@ -71,18 +71,20 @@ async def from_messages(


class _AggregateAgentConversations(Executor):
"""Aggregates agent responses and completes with combined ChatMessages.
"""Aggregates agent responses and completes with a single AgentResponse.

Emits a list[Message] shaped as:
[ single_user_prompt?, agent1_final_assistant, agent2_final_assistant, ... ]
Emits an `AgentResponse` whose `messages` are the final assistant message from each
participant (one message per agent), in deterministic participant order matching
Comment thread
moonbox3 marked this conversation as resolved.
the fan-in `sources` configuration. The user prompt is intentionally not included —
that is part of the input, not the answer.

- Extracts a single user prompt (first user message seen across results).
- For each result, selects the final assistant message (prefers agent_response.messages).
- Avoids duplicating the same user message per agent.
For each participant the final assistant message is sourced from
`r.agent_response.messages`, falling back to scanning `r.full_conversation` for
pathological executors that did not populate the response.
"""

@handler
async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, list[Message]]) -> None:
async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, AgentResponse]) -> None:
if not results:
logger.error("Concurrent aggregator received empty results list")
raise ValueError("Aggregation failed: no results provided")
Expand All @@ -91,12 +93,10 @@ def _is_role(msg: Any, role: str) -> bool:
r = getattr(msg, "role", None)
if r is None:
return False
# Normalize both r and role to lowercase strings for comparison
r_str = str(r).lower() if isinstance(r, str) or hasattr(r, "__str__") else r
role_str = str(role).lower()
return r_str == role_str

prompt_message: Message | None = None
assistant_replies: list[Message] = []

for r in results:
Expand All @@ -107,10 +107,6 @@ def _is_role(msg: Any, role: str) -> bool:
f"{len(resp_messages)} response msgs, {len(r.full_conversation)} conversation msgs"
)

# Capture a single user prompt (first encountered across any conversation)
if prompt_message is None:
prompt_message = next((m for m in r.full_conversation if _is_role(m, "user")), None)

# Pick the final assistant message from the response; fallback to conversation search
final_assistant = next((m for m in reversed(resp_messages) if _is_role(m, "assistant")), None)
if final_assistant is None:
Expand All @@ -127,14 +123,7 @@ def _is_role(msg: Any, role: str) -> bool:
logger.error(f"Aggregation failed: no assistant replies found across {len(results)} results")
raise RuntimeError("Aggregation failed: no assistant replies found")

output: list[Message] = []
if prompt_message is not None:
output.append(prompt_message)
else:
logger.warning("No user prompt found in any conversation; emitting assistants only")
output.extend(assistant_replies)

await ctx.yield_output(output)
await ctx.yield_output(AgentResponse(messages=assistant_replies))


class _CallbackAggregator(Executor):
Expand Down Expand Up @@ -190,7 +179,8 @@ class ConcurrentBuilder:

from agent_framework_orchestrations import ConcurrentBuilder

# Minimal: use default aggregator (returns list[Message])
# Minimal: use default aggregator (yields one AgentResponse with one assistant
# message per participant)
workflow = ConcurrentBuilder(participants=[agent1, agent2, agent3]).build()


Expand Down Expand Up @@ -351,7 +341,13 @@ def with_request_info(
return self

def _resolve_participants(self) -> list[Executor]:
"""Resolve participant instances into Executor objects."""
"""Resolve participant instances into Executor objects.

When `intermediate_outputs=True`, every wrapped agent is constructed with
`emit_intermediate_data=True` so its individual response surfaces as a `data`
event without polluting the single `output` event reserved for the aggregator's
final answer.
"""
if not self._participants:
raise ValueError("No participants provided. Pass participants to the constructor.")

Expand All @@ -366,9 +362,9 @@ def _resolve_participants(self) -> list[Executor]:
not self._request_info_filter or resolve_agent_id(p) in self._request_info_filter
):
# Handle request info enabled agents
executors.append(AgentApprovalExecutor(p))
executors.append(AgentApprovalExecutor(p, emit_intermediate_data=self._intermediate_outputs))
else:
executors.append(AgentExecutor(p))
executors.append(AgentExecutor(p, emit_intermediate_data=self._intermediate_outputs))
else:
raise TypeError(f"Participants must be SupportsAgentRun or Executor instances. Got {type(p).__name__}.")

Expand All @@ -383,7 +379,7 @@ def build(self) -> Workflow:
- If request info is enabled, the orchestration emits a request info event with outputs from all participants
before sending the outputs to the aggregator
- Aggregator yields output and the workflow becomes idle. The output is either:
- list[Message] (default aggregator: one user + one assistant per agent)
- AgentResponse (default aggregator: one assistant message per participant)
- custom payload from the provided aggregator

Returns:
Expand All @@ -408,7 +404,7 @@ def build(self) -> Workflow:
builder = WorkflowBuilder(
start_executor=dispatcher,
checkpoint_storage=self._checkpoint_storage,
output_executors=[aggregator] if not self._intermediate_outputs else None,
output_executors=[aggregator],
)
# Fan-out for parallel execution
builder.add_fan_out_edges(dispatcher, participants)
Expand Down
Loading
Loading