diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index d1ff567f81..d4f6c1411d 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -13,12 +13,15 @@ AgentRunResponseUpdate, AgentThread, BaseAgent, + BaseContent, ChatMessage, + Contents, FunctionApprovalRequestContent, FunctionApprovalResponseContent, FunctionCallContent, FunctionResultContent, Role, + TextContent, UsageDetails, ) @@ -28,6 +31,7 @@ AgentRunUpdateEvent, RequestInfoEvent, WorkflowEvent, + WorkflowOutputEvent, ) from ._message_utils import normalize_messages_input from ._typing_utils import is_type_compatible @@ -280,9 +284,8 @@ def _convert_workflow_event_to_agent_update( ) -> AgentRunResponseUpdate | None: """Convert a workflow event to an AgentRunResponseUpdate. - Only AgentRunUpdateEvent and RequestInfoEvent are processed. - Other workflow events are ignored as they are workflow-internal and should - have corresponding AgentRunUpdateEvent emissions if relevant to agent consumers. + AgentRunUpdateEvent, RequestInfoEvent, and WorkflowOutputEvent are processed. + Other workflow events are ignored as they are workflow-internal. """ match event: case AgentRunUpdateEvent(data=update): @@ -291,6 +294,42 @@ def _convert_workflow_event_to_agent_update( return update return None + case WorkflowOutputEvent(data=data, source_executor_id=source_executor_id): + # Convert workflow output to an agent response update. + # Handle different data types appropriately. + if isinstance(data, AgentRunResponseUpdate): + # Already an update, pass through + return data + if isinstance(data, ChatMessage): + # Convert ChatMessage to update + return AgentRunResponseUpdate( + contents=list(data.contents), + role=data.role, + author_name=data.author_name or source_executor_id, + response_id=response_id, + message_id=str(uuid.uuid4()), + created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + raw_representation=data, + ) + # Determine contents based on data type + if isinstance(data, BaseContent): + # Already a content type (TextContent, ImageContent, etc.) + contents: list[Contents] = [cast(Contents, data)] + elif isinstance(data, str): + contents = [TextContent(text=data)] + else: + # Fallback: convert to string representation + contents = [TextContent(text=str(data))] + return AgentRunResponseUpdate( + contents=contents, + role=Role.ASSISTANT, + author_name=source_executor_id, + response_id=response_id, + message_id=str(uuid.uuid4()), + created_at=datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + raw_representation=data, + ) + case RequestInfoEvent(request_id=request_id): # Store the pending request for later correlation self.pending_requests[request_id] = event diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index c005a0f9f9..51b3544b22 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -12,17 +12,20 @@ AgentThread, ChatMessage, ChatMessageStore, + DataContent, Executor, FunctionApprovalRequestContent, FunctionApprovalResponseContent, FunctionCallContent, Role, TextContent, + UriContent, UsageContent, UsageDetails, WorkflowAgent, WorkflowBuilder, WorkflowContext, + executor, handler, response_handler, ) @@ -284,6 +287,141 @@ async def handle_bool(self, message: bool, context: WorkflowContext[Any]) -> Non with pytest.raises(ValueError, match="Workflow's start executor cannot handle list\\[ChatMessage\\]"): workflow.as_agent() + async def test_workflow_as_agent_yield_output_surfaces_as_agent_response(self) -> None: + """Test that ctx.yield_output() in a workflow executor surfaces as agent output when using .as_agent(). + + This validates the fix for issue #2813: WorkflowOutputEvent should be converted to + AgentRunResponseUpdate when the workflow is wrapped via .as_agent(). + """ + + @executor + async def yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None: + # Extract text from input for demonstration + input_text = messages[0].text if messages else "no input" + await ctx.yield_output(f"processed: {input_text}") + + workflow = WorkflowBuilder().set_start_executor(yielding_executor).build() + + # Run directly - should return WorkflowOutputEvent in result + direct_result = await workflow.run([ChatMessage(role=Role.USER, contents=[TextContent(text="hello")])]) + direct_outputs = direct_result.get_outputs() + assert len(direct_outputs) == 1 + assert direct_outputs[0] == "processed: hello" + + # Run as agent - yield_output should surface as agent response message + agent = workflow.as_agent("test-agent") + agent_result = await agent.run("hello") + + assert isinstance(agent_result, AgentRunResponse) + assert len(agent_result.messages) == 1 + assert agent_result.messages[0].text == "processed: hello" + + async def test_workflow_as_agent_yield_output_surfaces_in_run_stream(self) -> None: + """Test that ctx.yield_output() surfaces as AgentRunResponseUpdate when streaming.""" + + @executor + async def yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None: + await ctx.yield_output("first output") + await ctx.yield_output("second output") + + workflow = WorkflowBuilder().set_start_executor(yielding_executor).build() + agent = workflow.as_agent("test-agent") + + updates: list[AgentRunResponseUpdate] = [] + async for update in agent.run_stream("hello"): + updates.append(update) + + # Should have received updates for both yield_output calls + texts = [u.text for u in updates if u.text] + assert "first output" in texts + assert "second output" in texts + + async def test_workflow_as_agent_yield_output_with_content_types(self) -> None: + """Test that yield_output preserves different content types (TextContent, DataContent, etc.).""" + + @executor + async def content_yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None: + # Yield different content types + await ctx.yield_output(TextContent(text="text content")) + await ctx.yield_output(DataContent(data=b"binary data", media_type="application/octet-stream")) + await ctx.yield_output(UriContent(uri="https://example.com/image.png", media_type="image/png")) + + workflow = WorkflowBuilder().set_start_executor(content_yielding_executor).build() + agent = workflow.as_agent("content-test-agent") + + result = await agent.run("test") + + assert isinstance(result, AgentRunResponse) + assert len(result.messages) == 3 + + # Verify each content type is preserved + assert isinstance(result.messages[0].contents[0], TextContent) + assert result.messages[0].contents[0].text == "text content" + + assert isinstance(result.messages[1].contents[0], DataContent) + assert result.messages[1].contents[0].media_type == "application/octet-stream" + + assert isinstance(result.messages[2].contents[0], UriContent) + assert result.messages[2].contents[0].uri == "https://example.com/image.png" + + async def test_workflow_as_agent_yield_output_with_chat_message(self) -> None: + """Test that yield_output with ChatMessage preserves the message structure.""" + + @executor + async def chat_message_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None: + msg = ChatMessage( + role=Role.ASSISTANT, + contents=[TextContent(text="response text")], + author_name="custom-author", + ) + await ctx.yield_output(msg) + + workflow = WorkflowBuilder().set_start_executor(chat_message_executor).build() + agent = workflow.as_agent("chat-msg-agent") + + result = await agent.run("test") + + assert len(result.messages) == 1 + assert result.messages[0].role == Role.ASSISTANT + assert result.messages[0].text == "response text" + assert result.messages[0].author_name == "custom-author" + + async def test_workflow_as_agent_yield_output_sets_raw_representation(self) -> None: + """Test that yield_output sets raw_representation with the original data.""" + + # A custom object to verify raw_representation preserves the original data + class CustomData: + def __init__(self, value: int): + self.value = value + + def __str__(self) -> str: + return f"CustomData({self.value})" + + @executor + async def raw_yielding_executor(messages: list[ChatMessage], ctx: WorkflowContext) -> None: + # Yield different types of data + await ctx.yield_output("simple string") + await ctx.yield_output(TextContent(text="text content")) + custom = CustomData(42) + await ctx.yield_output(custom) + + workflow = WorkflowBuilder().set_start_executor(raw_yielding_executor).build() + agent = workflow.as_agent("raw-test-agent") + + updates: list[AgentRunResponseUpdate] = [] + async for update in agent.run_stream("test"): + updates.append(update) + + # Should have 3 updates + assert len(updates) == 3 + + # Verify raw_representation is set for each update + assert updates[0].raw_representation == "simple string" + assert isinstance(updates[1].raw_representation, TextContent) + assert updates[1].raw_representation.text == "text content" + assert isinstance(updates[2].raw_representation, CustomData) + assert updates[2].raw_representation.value == 42 + async def test_thread_conversation_history_included_in_workflow_run(self) -> None: """Test that conversation history from thread is included when running WorkflowAgent.