diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 9e766d354c..5074139fdb 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -2,11 +2,13 @@ import json import logging +import types +import typing import uuid -from collections.abc import AsyncIterable, Sequence +from collections.abc import AsyncIterable, Callable, Sequence from dataclasses import dataclass from datetime import datetime -from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast +from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast, get_args, get_origin from agent_framework import ( AgentRunResponse, @@ -29,6 +31,7 @@ ) if TYPE_CHECKING: + from ._executor import Executor from ._workflow import Workflow logger = logging.getLogger(__name__) @@ -91,8 +94,7 @@ def __init__( except KeyError as exc: # Defensive: workflow lacks a configured entry point raise ValueError("Workflow's start executor is not defined.") from exc - if list[ChatMessage] not in start_executor.input_types: - raise ValueError("Workflow's start executor cannot handle list[ChatMessage]") + self._start_payload_encoder = self._resolve_start_payload_encoder(start_executor) super().__init__(id=id, name=name, description=description, **kwargs) self._workflow: "Workflow" = workflow @@ -106,6 +108,143 @@ def workflow(self) -> "Workflow": def pending_requests(self) -> dict[str, RequestInfoEvent]: return self._pending_requests + def _resolve_start_payload_encoder(self, start_executor: "Executor") -> Callable[[list[ChatMessage]], Any]: + """Determine how to map agent chat messages to the workflow's start executor input.""" + probe_conversation = [ChatMessage(role=Role.USER, text="__agent_probe__")] + if start_executor.can_handle(probe_conversation): + return lambda messages: list(messages) + + for adapter in self._candidate_adapters_from_input_types(start_executor.input_types): + try: + probe_payload = adapter(probe_conversation) + except ValueError: + continue + if start_executor.can_handle(probe_payload): + return adapter + + raise ValueError("Workflow's start executor cannot be adapted to agent chat inputs.") + + def _candidate_adapters_from_input_types( + self, + input_types: Sequence[type[Any]], + ) -> list[Callable[[list[ChatMessage]], Any]]: + adapters: list[Callable[[list[ChatMessage]], Any]] = [] + for annotation in input_types: + for candidate in self._flatten_type_annotation(annotation): + adapter = self._adapter_for_concrete_type(candidate) + if adapter is not None and adapter not in adapters: + adapters.append(adapter) + return adapters + + def _flatten_type_annotation(self, annotation: Any) -> list[Any]: + origin = get_origin(annotation) + if origin is None: + return [annotation] + + if origin in (types.UnionType, typing.Union): + flattened: list[Any] = [] + for arg in get_args(annotation): + flattened.extend(self._flatten_type_annotation(arg)) + return flattened + + if origin is typing.Annotated: + args = get_args(annotation) + return self._flatten_type_annotation(args[0]) if args else [] + + return [annotation] + + def _adapter_for_concrete_type(self, message_type: Any) -> Callable[[list[ChatMessage]], Any] | None: + if self._is_chat_message_list_type(message_type): + return lambda messages: list(messages) + + if self._is_chat_message_type(message_type): + return self._messages_to_single_chat_message + + if message_type is str: + return self._messages_to_text + + if isinstance(message_type, type): + specialized = self._adapter_for_specialized_class(message_type) + if specialized is not None: + return specialized + + return None + + def _adapter_for_specialized_class(self, message_cls: type[Any]) -> Callable[[list[ChatMessage]], Any] | None: + try: + from ._magentic import MagenticStartMessage + except Exception: # pragma: no cover - optional dependency + MagenticStartMessage = None # type: ignore + + if MagenticStartMessage is not None and message_cls is MagenticStartMessage: + return self._build_magentic_start_adapter(MagenticStartMessage) + + return None + + def _build_magentic_start_adapter( + self, + message_cls: type[Any], + ) -> Callable[[list[ChatMessage]], Any]: + def adapter(messages: list[ChatMessage]) -> Any: + task_message = self._select_user_or_last_message(messages) + try: + return message_cls(task=task_message) + except TypeError as exc: + raise ValueError("Cannot construct MagenticStartMessage from provided chat messages.") from exc + + return adapter + + def _is_chat_message_list_type(self, annotation: Any) -> bool: + if annotation is list: + return True + + origin = get_origin(annotation) + if origin in (list, Sequence): + args = get_args(annotation) + if not args: + return origin is list + return all(self._is_chat_message_type(arg) for arg in args) + + return False + + def _is_chat_message_type(self, annotation: Any) -> bool: + if annotation is ChatMessage: + return True + return isinstance(annotation, type) and issubclass(annotation, ChatMessage) + + def _messages_to_single_chat_message(self, messages: list[ChatMessage]) -> ChatMessage: + return self._select_user_or_last_message(messages) + + def _messages_to_text(self, messages: list[ChatMessage]) -> str: + message = self._select_user_or_last_message(messages) + text = message.text.strip() + if text: + return text + + fallback_parts: list[str] = [] + for content in message.contents: + candidate = getattr(content, "text", None) + if isinstance(candidate, str) and candidate: + fallback_parts.append(candidate) + else: + rendered = str(content) + if rendered: + fallback_parts.append(rendered) + + if fallback_parts: + return " ".join(fallback_parts) + + raise ValueError("Cannot derive plain-text prompt from chat message contents.") + + def _select_user_or_last_message(self, messages: list[ChatMessage]) -> ChatMessage: + if not messages: + raise ValueError("At least one ChatMessage is required to start the workflow.") + + for message in reversed(messages): + if getattr(message, "role", None) == Role.USER: + return message + return messages[-1] + async def run( self, messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, @@ -213,8 +352,8 @@ async def _run_stream_impl( event_stream = self.workflow.send_responses_streaming(function_responses) else: # Execute workflow with streaming (initial run or no function responses) - # Pass the new input messages directly to the workflow - event_stream = self.workflow.run_stream(input_messages) + start_payload = self._start_payload_encoder(input_messages) + event_stream = self.workflow.run_stream(start_payload) # Process events from the stream async for event in event_stream: diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 3ee1c10690..3ddcfc4258 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -10,7 +10,7 @@ from collections.abc import AsyncIterable, Awaitable, Callable from dataclasses import dataclass, field from enum import Enum -from typing import Any, Literal, Protocol, TypeVar, Union, cast +from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union, cast from uuid import uuid4 from agent_framework import ( @@ -38,6 +38,9 @@ else: from typing_extensions import Self # pragma: no cover +if TYPE_CHECKING: + from ._agent import WorkflowAgent + logger = logging.getLogger(__name__) # Consistent author name for messages produced by the Magentic manager/orchestrator @@ -2043,6 +2046,10 @@ def workflow(self) -> Workflow: """Access the underlying workflow.""" return self._workflow + def as_agent(self, name: str | None = None) -> "WorkflowAgent": + """Expose the underlying workflow as a WorkflowAgent.""" + return self._workflow.as_agent(name=name) + async def run_streaming_with_string(self, task_text: str) -> AsyncIterable[WorkflowEvent]: """Run the workflow with a task string. diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index 842ec142ef..713e275edc 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft. All rights reserved. import uuid +from collections.abc import AsyncIterable from typing import Any import pytest @@ -9,13 +10,22 @@ AgentRunResponse, AgentRunResponseUpdate, AgentRunUpdateEvent, + AgentThread, + BaseAgent, ChatMessage, + ConcurrentBuilder, Executor, FunctionCallContent, FunctionResultContent, + MagenticBuilder, + MagenticContext, + MagenticManagerBase, + MagenticProgressLedger, + MagenticProgressLedgerItem, RequestInfoExecutor, RequestInfoMessage, Role, + SequentialBuilder, TextContent, UsageContent, UsageDetails, @@ -24,6 +34,7 @@ WorkflowContext, handler, ) +from agent_framework._workflows._magentic import MagenticStartMessage class SimpleExecutor(Executor): @@ -73,6 +84,88 @@ async def handle_request_response(self, _: Any, ctx: WorkflowContext[ChatMessage await ctx.add_event(AgentRunUpdateEvent(executor_id=self.id, data=update)) +class _StubAgent(BaseAgent): + """Minimal agent that returns a fixed reply for testing orchestrators.""" + + def __init__(self, name: str, reply: str) -> None: + super().__init__(name=name) + self._reply = reply + + async def run( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: AgentThread | None = None, + **kwargs: Any, + ) -> AgentRunResponse: + response_message = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=self._reply)]) + response = AgentRunResponse(messages=[response_message], response_id=str(uuid.uuid4())) + thread = thread or self.get_new_thread() + normalized_messages: list[ChatMessage] = [] + if isinstance(messages, ChatMessage): + normalized_messages = [messages] + elif isinstance(messages, list): + normalized_messages = [ + m if isinstance(m, ChatMessage) else ChatMessage(role=Role.USER, text=str(m)) for m in messages + ] + elif isinstance(messages, str): + normalized_messages = [ChatMessage(role=Role.USER, text=messages)] + await self._notify_thread_of_new_messages(thread, normalized_messages, response.messages) + return response + + def run_stream( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: AgentThread | None = None, + **kwargs: Any, + ) -> AsyncIterable[AgentRunResponseUpdate]: + async def _stream() -> AsyncIterable[AgentRunResponseUpdate]: + yield AgentRunResponseUpdate( + contents=[TextContent(text=self._reply)], + role=Role.ASSISTANT, + response_id=str(uuid.uuid4()), + message_id=str(uuid.uuid4()), + ) + + return _stream() + + +class _MiniMagenticManager(MagenticManagerBase): + """Deterministic manager that drives a single agent to completion quickly.""" + + def __init__(self) -> None: + super().__init__(max_round_count=3, max_stall_count=1) + self._progress_calls = 0 + + async def plan(self, magentic_context: MagenticContext) -> ChatMessage: + return ChatMessage(role=Role.ASSISTANT, text="Plan: proceed") + + async def replan(self, magentic_context: MagenticContext) -> ChatMessage: + return ChatMessage(role=Role.ASSISTANT, text="Plan: revise") + + async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger: + self._progress_calls += 1 + agent_name = next(iter(magentic_context.participant_descriptions), "agent") + is_satisfied = self._progress_calls > 1 and any( + msg.role == Role.ASSISTANT for msg in magentic_context.chat_history + ) + + return MagenticProgressLedger( + is_request_satisfied=MagenticProgressLedgerItem(reason="status", answer=is_satisfied), + is_in_loop=MagenticProgressLedgerItem(reason="status", answer=False), + is_progress_being_made=MagenticProgressLedgerItem(reason="status", answer=True), + next_speaker=MagenticProgressLedgerItem(reason="status", answer=agent_name if not is_satisfied else ""), + instruction_or_question=MagenticProgressLedgerItem( + reason="status", + answer="Provide update" if not is_satisfied else "", + ), + ) + + async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage: + return ChatMessage(role=Role.ASSISTANT, text="Final answer prepared.") + + class TestWorkflowAgent: """Test cases for WorkflowAgent end-to-end functionality.""" @@ -234,9 +327,71 @@ async def handle_bool(self, message: bool, context: WorkflowContext[Any]) -> Non workflow = WorkflowBuilder().set_start_executor(executor).build() # Try to create an agent with unsupported input types - with pytest.raises(ValueError, match="Workflow's start executor cannot handle list\\[ChatMessage\\]"): + with pytest.raises(ValueError, match="Workflow's start executor cannot be adapted to agent chat inputs"): workflow.as_agent() + async def test_sequential_builder_as_agent_roundtrip(self) -> None: + """Ensure SequentialBuilder workflows can be invoked through WorkflowAgent.""" + workflow = SequentialBuilder().participants([SimpleExecutor(id="seq", response_text="SeqStep")]).build() + agent = workflow.as_agent(name="Sequential") + + result = await agent.run("Sequential input") + assert isinstance(result, AgentRunResponse) + + texts = [ + content.text + for message in result.messages or [] + for content in message.contents + if isinstance(content, TextContent) + ] + assert any("SeqStep" in text for text in texts) + + async def test_concurrent_builder_as_agent_roundtrip(self) -> None: + """Ensure ConcurrentBuilder workflows can be invoked through WorkflowAgent.""" + workflow = ( + ConcurrentBuilder() + .participants([ + _StubAgent(name="agent-1", reply="Reply One"), + _StubAgent(name="agent-2", reply="Reply Two"), + ]) + .build() + ) + + agent = workflow.as_agent(name="Concurrent") + result = await agent.run("Concurrent input") + assert isinstance(result, AgentRunResponse) + + texts = [ + content.text + for message in result.messages or [] + for content in message.contents + if isinstance(content, TextContent) + ] + assert any("Reply One" in text for text in texts) + assert any("Reply Two" in text for text in texts) + + async def test_magentic_builder_as_agent_payload(self) -> None: + """Ensure MagenticBuilder workflows expose valid agents and payload adapters.""" + workflow_wrapper = ( + MagenticBuilder() + .participants(coordinator=_StubAgent(name="coordinator", reply="Agent completed.")) + .with_standard_manager(_MiniMagenticManager()) + .build() + ) + + agent = workflow_wrapper.as_agent(name="Magentic") + + source_messages = [ChatMessage(role=Role.USER, text="Execute task")] + payload = agent._start_payload_encoder(source_messages) + assert isinstance(payload, MagenticStartMessage) + assert payload.task.text == "Execute task" + + result = await agent.run("Execute task") + assert isinstance(result, AgentRunResponse) + + assert isinstance(result.messages, list) + assert result.response_id is not None + class TestWorkflowAgentMergeUpdates: """Test cases specifically for the WorkflowAgent.merge_updates static method.""" diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 17780a7aac..ca51b75fc7 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -39,6 +39,9 @@ Once comfortable with these, explore the rest of the samples below. | Azure Chat Agents (Function Bridge) | [agents/azure_chat_agents_function_bridge.py](./agents/azure_chat_agents_function_bridge.py) | Chain two agents with a function executor that injects external context | | Azure Chat Agents (Tools + HITL) | [agents/azure_chat_agents_tool_calls_with_feedback.py](./agents/azure_chat_agents_tool_calls_with_feedback.py) | Tool-enabled writer/editor pipeline with human feedback gating via RequestInfoExecutor | | Custom Agent Executors | [agents/custom_agent_executors.py](./agents/custom_agent_executors.py) | Create executors to handle agent run methods | +| Sequential Workflow as Agent | [agents/sequential_workflow_as_agent.py](./agents/sequential_workflow_as_agent.py) | Build a sequential workflow orchestrating agents, then expose it as a reusable agent | +| Concurrent Workflow as Agent | [agents/concurrent_workflow_as_agent.py](./agents/concurrent_workflow_as_agent.py) | Build a concurrent fan-out/fan-in workflow, then expose it as a reusable agent | +| Magentic Workflow as Agent | [agents/magentic_workflow_as_agent.py](./agents/magentic_workflow_as_agent.py) | Configure Magentic orchestration with callbacks, then expose the workflow as an agent | | Workflow as Agent (Reflection Pattern) | [agents/workflow_as_agent_reflection_pattern.py](./agents/workflow_as_agent_reflection_pattern.py) | Wrap a workflow so it can behave like an agent (reflection pattern) | | Workflow as Agent + HITL | [agents/workflow_as_agent_human_in_the_loop.py](./agents/workflow_as_agent_human_in_the_loop.py) | Extend workflow-as-agent with human-in-the-loop capability | diff --git a/python/samples/getting_started/workflows/agents/concurrent_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/concurrent_workflow_as_agent.py new file mode 100644 index 0000000000..29dfc1874f --- /dev/null +++ b/python/samples/getting_started/workflows/agents/concurrent_workflow_as_agent.py @@ -0,0 +1,126 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio + +from agent_framework import ConcurrentBuilder +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +""" +Sample: Build a concurrent workflow orchestration and wrap it as an agent. + +This script wires up a fan-out/fan-in workflow using `ConcurrentBuilder`, and then +invokes the entire orchestration through the `workflow.as_agent(...)` interface so +downstream coordinators can reuse the orchestration as a single agent. + +Demonstrates: +- Fan-out to multiple agents, fan-in aggregation of final ChatMessages. +- Reusing the orchestrated workflow as an agent entry point with `workflow.as_agent(...)`. +- Workflow completion when idle with no pending work + +Prerequisites: +- Azure OpenAI access configured for AzureOpenAIChatClient (use az login + env vars) +- Familiarity with Workflow events (AgentRunEvent, WorkflowOutputEvent) +""" + + +async def main() -> None: + # 1) Create three domain agents using AzureOpenAIChatClient + chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + + researcher = chat_client.create_agent( + instructions=( + "You're an expert market and product researcher. Given a prompt, provide concise, factual insights," + " opportunities, and risks." + ), + name="researcher", + ) + + marketer = chat_client.create_agent( + instructions=( + "You're a creative marketing strategist. Craft compelling value propositions and target messaging" + " aligned to the prompt." + ), + name="marketer", + ) + + legal = chat_client.create_agent( + instructions=( + "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns" + " based on the prompt." + ), + name="legal", + ) + + # 2) Build a concurrent workflow + workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build() + + # 3) Expose the concurrent workflow as an agent for easy reuse + agent = workflow.as_agent(name="ConcurrentWorkflowAgent") + prompt = "We are launching a new budget-friendly electric bike for urban commuters." + agent_response = await agent.run(prompt) + + if agent_response.messages: + print("\n===== Aggregated Messages =====") + for i, msg in enumerate(agent_response.messages, start=1): + role = getattr(msg.role, "value", msg.role) + name = msg.author_name if msg.author_name else role + print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}") + + """ + Sample Output: + + ===== Aggregated Messages ===== + ------------------------------------------------------------ + + 01 [user]: + We are launching a new budget-friendly electric bike for urban commuters. + ------------------------------------------------------------ + + 02 [researcher]: + **Insights:** + + - **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport; + likely to include students, young professionals, and price-sensitive urban residents. + - **Market Trends:** E-bike sales are growing globally, with increasing urbanization, + higher fuel costs, and sustainability concerns driving adoption. + - **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon, + Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia. + - **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection, + lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles), + and low-maintenance components. + + **Opportunities:** + + - **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of + operation, and cost savings vs. public transit/car ownership. + ... + ------------------------------------------------------------ + + 03 [marketer]: + **Value Proposition:** + "Empowering your city commute: Our new electric bike combines affordability, reliability, and + sustainable design—helping you conquer urban journeys without breaking the bank." + + **Target Messaging:** + + *For Young Professionals:* + ... + ------------------------------------------------------------ + + 04 [legal]: + **Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:** + + **1. Regulatory Compliance** + - Verify that the electric bike meets all applicable federal, state, and local regulations + regarding e-bike classification, speed limits, power output, and safety features. + - Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained. + + **2. Product Safety** + - Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions. + ... + """ # noqa: E501 + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py new file mode 100644 index 0000000000..d56470e696 --- /dev/null +++ b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py @@ -0,0 +1,136 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import logging + +from agent_framework import ( + ChatAgent, + HostedCodeInterpreterTool, + MagenticAgentDeltaEvent, + MagenticAgentMessageEvent, + MagenticBuilder, + MagenticCallbackEvent, + MagenticCallbackMode, + MagenticFinalResultEvent, + MagenticOrchestratorMessageEvent, +) +from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +""" +Sample: Build a Magentic orchestration and wrap it as an agent. + +The script configures a Magentic workflow with streaming callbacks, then invokes the +orchestration through `workflow.as_agent(...)` so the entire Magentic loop can be reused +like any other agent while still emitting callback telemetry. + +Prerequisites: +- OpenAI credentials configured for `OpenAIChatClient` and `OpenAIResponsesClient`. +""" + + +async def main() -> None: + researcher_agent = ChatAgent( + name="ResearcherAgent", + description="Specialist in research and information gathering", + instructions=( + "You are a Researcher. You find information without additional computation or quantitative analysis." + ), + # This agent requires the gpt-4o-search-preview model to perform web searches. + # Feel free to explore with other agents that support web search, for example, + # the `OpenAIResponseAgent` or `AzureAgentProtocol` with bing grounding. + chat_client=OpenAIChatClient(model_id="gpt-4o-search-preview"), + ) + + coder_agent = ChatAgent( + name="CoderAgent", + description="A helpful assistant that writes and executes code to process and analyze data.", + instructions="You solve questions using code. Please provide detailed analysis and computation process.", + chat_client=OpenAIResponsesClient(), + tools=HostedCodeInterpreterTool(), + ) + + # Unified callback + async def on_event(event: MagenticCallbackEvent) -> None: + """ + The `on_event` callback processes events emitted by the workflow. + Events include: orchestrator messages, agent delta updates, agent messages, and final result events. + """ + nonlocal last_stream_agent_id, stream_line_open + if isinstance(event, MagenticOrchestratorMessageEvent): + print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}") + elif isinstance(event, MagenticAgentDeltaEvent): + if last_stream_agent_id != event.agent_id or not stream_line_open: + if stream_line_open: + print() + print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True) + last_stream_agent_id = event.agent_id + stream_line_open = True + print(event.text, end="", flush=True) + elif isinstance(event, MagenticAgentMessageEvent): + if stream_line_open: + print(" (final)") + stream_line_open = False + print() + msg = event.message + if msg is not None: + response_text = (msg.text or "").replace("\n", " ") + print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}") + elif isinstance(event, MagenticFinalResultEvent): + print("\n" + "=" * 50) + print("FINAL RESULT:") + print("=" * 50) + if event.message is not None: + print(event.message.text) + print("=" * 50) + + print("\nBuilding Magentic Workflow...") + + # State used by on_agent_stream callback + last_stream_agent_id: str | None = None + stream_line_open: bool = False + + workflow = ( + MagenticBuilder() + .participants(researcher=researcher_agent, coder=coder_agent) + .on_event(on_event, mode=MagenticCallbackMode.STREAMING) + .with_standard_manager( + chat_client=OpenAIChatClient(), + max_round_count=10, + max_stall_count=3, + max_reset_count=2, + ) + .build() + ) + + task = ( + "I am preparing a report on the energy efficiency of different machine learning model architectures. " + "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 " + "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). " + "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 " + "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model " + "per task type (image classification, text classification, and text generation)." + ) + + print(f"\nTask: {task}") + print("\nStarting workflow execution...") + + try: + workflow_agent = workflow.as_agent(name="MagenticWorkflowAgent") + agent_result = await workflow_agent.run(task) + + if agent_result.messages: + print("\n===== as_agent() Transcript =====") + for i, msg in enumerate(agent_result.messages, start=1): + role_value = getattr(msg.role, "value", msg.role) + speaker = msg.author_name or role_value + print(f"{'-' * 50}\n{i:02d} [{speaker}]\n{msg.text}") + + except Exception as e: + print(f"Workflow execution failed: {e}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflows/agents/sequential_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/sequential_workflow_as_agent.py new file mode 100644 index 0000000000..a50337135e --- /dev/null +++ b/python/samples/getting_started/workflows/agents/sequential_workflow_as_agent.py @@ -0,0 +1,87 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio + +from agent_framework import Role, SequentialBuilder +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +""" +Sample: Build a sequential workflow orchestration and wrap it as an agent. + +The script assembles a sequential conversation flow with `SequentialBuilder`, then +invokes the entire orchestration through the `workflow.as_agent(...)` interface so +other coordinators can reuse the chain as a single participant. + +Note on internal adapters: +- Sequential orchestration includes small adapter nodes for input normalization + ("input-conversation"), agent-response conversion ("to-conversation:"), + and completion ("complete"). These may appear as ExecutorInvoke/Completed events in + the stream—similar to how concurrent orchestration includes a dispatcher/aggregator. + You can safely ignore them when focusing on agent progress. + +Prerequisites: +- Azure OpenAI access configured for AzureOpenAIChatClient (use az login + env vars) +""" + + +async def main() -> None: + # 1) Create agents + chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + + writer = chat_client.create_agent( + instructions=("You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."), + name="writer", + ) + + reviewer = chat_client.create_agent( + instructions=("You are a thoughtful reviewer. Give brief feedback on the previous assistant message."), + name="reviewer", + ) + + # 2) Build sequential workflow: writer -> reviewer + workflow = SequentialBuilder().participants([writer, reviewer]).build() + + # 3) Treat the workflow itself as an agent for follow-up invocations + agent = workflow.as_agent(name="SequentialWorkflowAgent") + prompt = "Write a tagline for a budget-friendly eBike." + agent_response = await agent.run(prompt) + + if agent_response.messages: + print("\n===== Conversation =====") + for i, msg in enumerate(agent_response.messages, start=1): + role_value = getattr(msg.role, "value", msg.role) + normalized_role = str(role_value).lower() if role_value is not None else "assistant" + name = msg.author_name or ("assistant" if normalized_role == Role.ASSISTANT.value else "user") + print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}") + + """ + Sample Output: + + ===== Final Conversation ===== + ------------------------------------------------------------ + 01 [user] + Write a tagline for a budget-friendly eBike. + ------------------------------------------------------------ + 02 [writer] + Ride farther, spend less—your affordable eBike adventure starts here. + ------------------------------------------------------------ + 03 [reviewer] + This tagline clearly communicates affordability and the benefit of extended travel, making it + appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could + be slightly shorter for more punch. Overall, a strong and effective suggestion! + + ===== as_agent() Conversation ===== + ------------------------------------------------------------ + 01 [writer] + Go electric, save big—your affordable ride awaits! + ------------------------------------------------------------ + 02 [reviewer] + Catchy and straightforward! The tagline clearly emphasizes both the electric aspect and the affordability of the + eBike. It's inviting and actionable. For even more impact, consider making it slightly shorter: + "Go electric, save big." Overall, this is an effective and appealing suggestion for a budget-friendly eBike. + """ + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/uv.lock b/python/uv.lock index b2633681e7..505a17b059 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -128,7 +128,7 @@ dev = [ { name = "ruff", specifier = ">=0.11.8" }, { name = "tomli" }, { name = "tomli-w" }, - { name = "uv", specifier = ">=0.8.2,<0.9.0" }, + { name = "uv", specifier = ">=0.8.2,<0.10.0" }, ] docs = [ { name = "debugpy", specifier = ">=1.8.16" },