Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
BaseContent,
ChatMessage,
Contents,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
FunctionCallContent,
FunctionResultContent,
Role,
TextContent,
UsageDetails,
)

Expand All @@ -28,6 +31,7 @@
AgentRunUpdateEvent,
RequestInfoEvent,
WorkflowEvent,
WorkflowOutputEvent,
)
from ._message_utils import normalize_messages_input
from ._typing_utils import is_type_compatible
Expand Down Expand Up @@ -280,9 +284,8 @@ def _convert_workflow_event_to_agent_update(
) -> AgentRunResponseUpdate | None:
"""Convert a workflow event to an AgentRunResponseUpdate.

Only AgentRunUpdateEvent and RequestInfoEvent are processed.
Other workflow events are ignored as they are workflow-internal and should
have corresponding AgentRunUpdateEvent emissions if relevant to agent consumers.
AgentRunUpdateEvent, RequestInfoEvent, and WorkflowOutputEvent are processed.
Other workflow events are ignored as they are workflow-internal.
"""
match event:
case AgentRunUpdateEvent(data=update):
Expand All @@ -291,6 +294,42 @@ def _convert_workflow_event_to_agent_update(
return update
return None

case WorkflowOutputEvent(data=data, source_executor_id=source_executor_id):
# Convert workflow output to an agent response update.
# Handle different data types appropriately.
if isinstance(data, AgentRunResponseUpdate):
# Already an update, pass through
return data
if isinstance(data, ChatMessage):
# Convert ChatMessage to update
return AgentRunResponseUpdate(
contents=list(data.contents),
role=data.role,
author_name=data.author_name or source_executor_id,
response_id=response_id,
message_id=str(uuid.uuid4()),
created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
raw_representation=data,
)
# Determine contents based on data type
if isinstance(data, BaseContent):
# Already a content type (TextContent, ImageContent, etc.)
contents: list[Contents] = [cast(Contents, data)]
elif isinstance(data, str):
contents = [TextContent(text=data)]
else:
# Fallback: convert to string representation
contents = [TextContent(text=str(data))]
return AgentRunResponseUpdate(
contents=contents,
role=Role.ASSISTANT,
author_name=source_executor_id,
response_id=response_id,
message_id=str(uuid.uuid4()),
created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
raw_representation=data,
)

case RequestInfoEvent(request_id=request_id):
# Store the pending request for later correlation
self.pending_requests[request_id] = event
Expand Down
138 changes: 138 additions & 0 deletions python/packages/core/tests/workflow/test_workflow_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@
AgentThread,
ChatMessage,
ChatMessageStore,
DataContent,
Executor,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
FunctionCallContent,
Role,
TextContent,
UriContent,
UsageContent,
UsageDetails,
WorkflowAgent,
WorkflowBuilder,
WorkflowContext,
executor,
handler,
response_handler,
)
Expand Down Expand Up @@ -284,6 +287,141 @@ async def handle_bool(self, message: bool, context: WorkflowContext[Any]) -> Non
with pytest.raises(ValueError, match="Workflow's start executor cannot handle list\\[ChatMessage\\]"):
workflow.as_agent()

async def test_workflow_as_agent_yield_output_surfaces_as_agent_response(self) -> None:
"""Test that ctx.yield_output() in a workflow executor surfaces as agent output when using .as_agent().

This validates the fix for issue #2813: WorkflowOutputEvent should be converted to
AgentRunResponseUpdate when the workflow is wrapped via .as_agent().
"""

@executor
async def yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None:
# Extract text from input for demonstration
input_text = messages[0].text if messages else "no input"
await ctx.yield_output(f"processed: {input_text}")

workflow = WorkflowBuilder().set_start_executor(yielding_executor).build()

# Run directly - should return WorkflowOutputEvent in result
direct_result = await workflow.run([ChatMessage(role=Role.USER, contents=[TextContent(text="hello")])])
direct_outputs = direct_result.get_outputs()
assert len(direct_outputs) == 1
assert direct_outputs[0] == "processed: hello"

# Run as agent - yield_output should surface as agent response message
agent = workflow.as_agent("test-agent")
agent_result = await agent.run("hello")

assert isinstance(agent_result, AgentRunResponse)
assert len(agent_result.messages) == 1
assert agent_result.messages[0].text == "processed: hello"

async def test_workflow_as_agent_yield_output_surfaces_in_run_stream(self) -> None:
"""Test that ctx.yield_output() surfaces as AgentRunResponseUpdate when streaming."""

@executor
async def yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None:
await ctx.yield_output("first output")
await ctx.yield_output("second output")

workflow = WorkflowBuilder().set_start_executor(yielding_executor).build()
agent = workflow.as_agent("test-agent")

updates: list[AgentRunResponseUpdate] = []
async for update in agent.run_stream("hello"):
updates.append(update)

# Should have received updates for both yield_output calls
texts = [u.text for u in updates if u.text]
assert "first output" in texts
assert "second output" in texts

async def test_workflow_as_agent_yield_output_with_content_types(self) -> None:
"""Test that yield_output preserves different content types (TextContent, DataContent, etc.)."""

@executor
async def content_yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None:
# Yield different content types
await ctx.yield_output(TextContent(text="text content"))
await ctx.yield_output(DataContent(data=b"binary data", media_type="application/octet-stream"))
await ctx.yield_output(UriContent(uri="https://example.com/image.png", media_type="image/png"))

workflow = WorkflowBuilder().set_start_executor(content_yielding_executor).build()
agent = workflow.as_agent("content-test-agent")

result = await agent.run("test")

assert isinstance(result, AgentRunResponse)
assert len(result.messages) == 3

# Verify each content type is preserved
assert isinstance(result.messages[0].contents[0], TextContent)
assert result.messages[0].contents[0].text == "text content"

assert isinstance(result.messages[1].contents[0], DataContent)
assert result.messages[1].contents[0].media_type == "application/octet-stream"

assert isinstance(result.messages[2].contents[0], UriContent)
assert result.messages[2].contents[0].uri == "https://example.com/image.png"

async def test_workflow_as_agent_yield_output_with_chat_message(self) -> None:
"""Test that yield_output with ChatMessage preserves the message structure."""

@executor
async def chat_message_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None:
msg = ChatMessage(
role=Role.ASSISTANT,
contents=[TextContent(text="response text")],
author_name="custom-author",
)
await ctx.yield_output(msg)

workflow = WorkflowBuilder().set_start_executor(chat_message_executor).build()
agent = workflow.as_agent("chat-msg-agent")

result = await agent.run("test")

assert len(result.messages) == 1
assert result.messages[0].role == Role.ASSISTANT
assert result.messages[0].text == "response text"
assert result.messages[0].author_name == "custom-author"

async def test_workflow_as_agent_yield_output_sets_raw_representation(self) -> None:
"""Test that yield_output sets raw_representation with the original data."""

# A custom object to verify raw_representation preserves the original data
class CustomData:
def __init__(self, value: int):
self.value = value

def __str__(self) -> str:
return f"CustomData({self.value})"

@executor
async def raw_yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None:
# Yield different types of data
await ctx.yield_output("simple string")
await ctx.yield_output(TextContent(text="text content"))
custom = CustomData(42)
await ctx.yield_output(custom)

workflow = WorkflowBuilder().set_start_executor(raw_yielding_executor).build()
agent = workflow.as_agent("raw-test-agent")

updates: list[AgentRunResponseUpdate] = []
async for update in agent.run_stream("test"):
updates.append(update)

# Should have 3 updates
assert len(updates) == 3

# Verify raw_representation is set for each update
assert updates[0].raw_representation == "simple string"
assert isinstance(updates[1].raw_representation, TextContent)
assert updates[1].raw_representation.text == "text content"
assert isinstance(updates[2].raw_representation, CustomData)
assert updates[2].raw_representation.value == 42

async def test_thread_conversation_history_included_in_workflow_run(self) -> None:
"""Test that conversation history from thread is included when running WorkflowAgent.

Expand Down
Loading