From 39ff183fadebaba98beedc69f79c25f4a7c0be2b Mon Sep 17 00:00:00 2001 From: Gerardo Moreno Date: Wed, 4 Dec 2024 10:39:26 -0800 Subject: [PATCH 1/2] Core API example showing multiple agents concurrently (#4461) * Core API example showing multiple agents concurrently (#4427) * Apply feedback * Add different topics for closure agent example * Address feedback * Mypy fix * Modify import path based on refactoring --------- Co-authored-by: Eric Zhu --- .../design-patterns/concurrent-agents.ipynb | 403 ++++++++++++++++++ .../core-user-guide/design-patterns/index.md | 1 + 2 files changed, 404 insertions(+) create mode 100644 python/packages/autogen-core/docs/src/user-guide/core-user-guide/design-patterns/concurrent-agents.ipynb diff --git a/python/packages/autogen-core/docs/src/user-guide/core-user-guide/design-patterns/concurrent-agents.ipynb b/python/packages/autogen-core/docs/src/user-guide/core-user-guide/design-patterns/concurrent-agents.ipynb new file mode 100644 index 000000000000..6ba7efc45d65 --- /dev/null +++ b/python/packages/autogen-core/docs/src/user-guide/core-user-guide/design-patterns/concurrent-agents.ipynb @@ -0,0 +1,403 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Concurrent Agents\n", + "\n", + "In this section, we explore the use of multiple agents working concurrently. We cover three main patterns:\n", + "\n", + "1. **Single Message & Multiple Processors** \n", + " Demonstrates how a single message can be processed by multiple agents subscribed to the same topic simultaneously.\n", + "\n", + "2. **Multiple Messages & Multiple Processors** \n", + " Illustrates how specific message types can be routed to dedicated agents based on topics.\n", + "\n", + "3. **Direct Messaging** \n", + " Focuses on sending messages between agents and from the runtime to agents." + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "import asyncio\n", + "from dataclasses import dataclass\n", + "\n", + "from autogen_core import (\n", + " AgentId,\n", + " ClosureAgent,\n", + " ClosureContext,\n", + " DefaultTopicId,\n", + " MessageContext,\n", + " RoutedAgent,\n", + " TopicId,\n", + " TypeSubscription,\n", + " default_subscription,\n", + " message_handler,\n", + " type_subscription,\n", + ")\n", + "from autogen_core.application import SingleThreadedAgentRuntime" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "@dataclass\n", + "class Task:\n", + " task_id: str\n", + "\n", + "\n", + "@dataclass\n", + "class TaskResponse:\n", + " task_id: str\n", + " result: str" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Single Message & Multiple Processors\n", + "The first pattern shows how a single message can be processed by multiple agents simultaneously:\n", + "\n", + "- Each `Processor` agent subscribes to the default topic using the {py:meth}`~autogen_core.components.default_subscription` decorator.\n", + "- When publishing a message to the default topic, all registered agents will process the message independently.\n", + "```{note}\n", + "Below, we are subscribing `Processor` using the {py:meth}`~autogen_core.components.default_subscription` decorator, there's an alternative way to subscribe an agent without using decorators altogether as shown in [Subscribe and Publish to Topics](../framework/message-and-communication.ipynb#subscribe-and-publish-to-topics), this way the same agent class can be subscribed to different topics.\n", + "```\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "@default_subscription\n", + "class Processor(RoutedAgent):\n", + " @message_handler\n", + " async def on_task(self, message: Task, ctx: MessageContext) -> None:\n", + " print(f\"{self._description} starting task {message.task_id}\")\n", + " await asyncio.sleep(2) # Simulate work\n", + " print(f\"{self._description} finished task {message.task_id}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Agent 1 starting task task-1\n", + "Agent 2 starting task task-1\n", + "Agent 1 finished task task-1\n", + "Agent 2 finished task task-1\n" + ] + } + ], + "source": [ + "runtime = SingleThreadedAgentRuntime()\n", + "\n", + "await Processor.register(runtime, \"agent_1\", lambda: Processor(\"Agent 1\"))\n", + "await Processor.register(runtime, \"agent_2\", lambda: Processor(\"Agent 2\"))\n", + "\n", + "runtime.start()\n", + "\n", + "await runtime.publish_message(Task(task_id=\"task-1\"), topic_id=DefaultTopicId())\n", + "\n", + "await runtime.stop_when_idle()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Multiple messages & Multiple Processors\n", + "Second, this pattern demonstrates routing different types of messages to specific processors:\n", + "- `UrgentProcessor` subscribes to the \"urgent\" topic\n", + "- `NormalProcessor` subscribes to the \"normal\" topic\n", + "\n", + "We make an agent subscribe to a specific topic type using the {py:meth}`~autogen_core.components.type_subscription` decorator." + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "metadata": {}, + "outputs": [], + "source": [ + "TASK_RESULTS_TOPIC_TYPE = \"task-results\"\n", + "task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source=\"default\")\n", + "\n", + "\n", + "@type_subscription(topic_type=\"urgent\")\n", + "class UrgentProcessor(RoutedAgent):\n", + " @message_handler\n", + " async def on_task(self, message: Task, ctx: MessageContext) -> None:\n", + " print(f\"Urgent processor starting task {message.task_id}\")\n", + " await asyncio.sleep(1) # Simulate work\n", + " print(f\"Urgent processor finished task {message.task_id}\")\n", + "\n", + " task_response = TaskResponse(task_id=message.task_id, result=\"Results by Urgent Processor\")\n", + " await self.publish_message(task_response, topic_id=task_results_topic_id)\n", + "\n", + "\n", + "@type_subscription(topic_type=\"normal\")\n", + "class NormalProcessor(RoutedAgent):\n", + " @message_handler\n", + " async def on_task(self, message: Task, ctx: MessageContext) -> None:\n", + " print(f\"Normal processor starting task {message.task_id}\")\n", + " await asyncio.sleep(3) # Simulate work\n", + " print(f\"Normal processor finished task {message.task_id}\")\n", + "\n", + " task_response = TaskResponse(task_id=message.task_id, result=\"Results by Normal Processor\")\n", + " await self.publish_message(task_response, topic_id=task_results_topic_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "After registering the agents, we can publish messages to the \"urgent\" and \"normal\" topics:" + ] + }, + { + "cell_type": "code", + "execution_count": 51, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Normal processor starting task normal-1\n", + "Urgent processor starting task urgent-1\n", + "Urgent processor finished task urgent-1\n", + "Normal processor finished task normal-1\n" + ] + } + ], + "source": [ + "runtime = SingleThreadedAgentRuntime()\n", + "\n", + "await UrgentProcessor.register(runtime, \"urgent_processor\", lambda: UrgentProcessor(\"Urgent Processor\"))\n", + "await NormalProcessor.register(runtime, \"normal_processor\", lambda: NormalProcessor(\"Normal Processor\"))\n", + "\n", + "runtime.start()\n", + "\n", + "await runtime.publish_message(Task(task_id=\"normal-1\"), topic_id=TopicId(type=\"normal\", source=\"default\"))\n", + "await runtime.publish_message(Task(task_id=\"urgent-1\"), topic_id=TopicId(type=\"urgent\", source=\"default\"))\n", + "\n", + "await runtime.stop_when_idle()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Collecting Results" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In the previous example, we relied on console printing to verify task completion. However, in real applications, we typically want to collect and process the results programmatically.\n", + "\n", + "To collect these messages, we'll use a {py:class}`~autogen_core.components.ClosureAgent`. We've defined a dedicated topic `TASK_RESULTS_TOPIC_TYPE` where both `UrgentProcessor` and `NormalProcessor` publish their results. The ClosureAgent will then process messages from this topic." + ] + }, + { + "cell_type": "code", + "execution_count": 52, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Normal processor starting task normal-1\n", + "Urgent processor starting task urgent-1\n", + "Urgent processor finished task urgent-1\n", + "Normal processor finished task normal-1\n" + ] + } + ], + "source": [ + "queue = asyncio.Queue[TaskResponse]()\n", + "\n", + "\n", + "async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:\n", + " await queue.put(message)\n", + "\n", + "\n", + "runtime.start()\n", + "\n", + "CLOSURE_AGENT_TYPE = \"collect_result_agent\"\n", + "await ClosureAgent.register_closure(\n", + " runtime,\n", + " CLOSURE_AGENT_TYPE,\n", + " collect_result,\n", + " subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],\n", + ")\n", + "\n", + "await runtime.publish_message(Task(task_id=\"normal-1\"), topic_id=TopicId(type=\"normal\", source=\"default\"))\n", + "await runtime.publish_message(Task(task_id=\"urgent-1\"), topic_id=TopicId(type=\"urgent\", source=\"default\"))\n", + "\n", + "await runtime.stop_when_idle()" + ] + }, + { + "cell_type": "code", + "execution_count": 53, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')\n", + "TaskResponse(task_id='normal-1', result='Results by Normal Processor')\n" + ] + } + ], + "source": [ + "while not queue.empty():\n", + " print(await queue.get())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Direct Messages\n", + "\n", + "In contrast to the previous patterns, this pattern focuses on direct messages. Here we demonstrate two ways to send them:\n", + "\n", + "- Direct messaging between agents \n", + "- Sending messages from the runtime to specific agents \n", + "\n", + "Things to consider in the example below:\n", + "\n", + "- Messages are addressed using the {py:class}`~autogen_core.components.AgentId`. \n", + "- The sender can expect to receive a response from the target agent. \n", + "- We register the `WorkerAgent` class only once; however, we send tasks to two different workers.\n", + " - How? As stated in [Agent lifecycle](../core-concepts/agent-identity-and-lifecycle.md#agent-lifecycle), when delivering a message using an {py:class}`~autogen_core.components.AgentId`, the runtime will either fetch the instance or create one if it doesn't exist. In this case, the runtime creates two instances of workers when sending those two messages." + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "metadata": {}, + "outputs": [], + "source": [ + "class WorkerAgent(RoutedAgent):\n", + " @message_handler\n", + " async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:\n", + " print(f\"{self.id} starting task {message.task_id}\")\n", + " await asyncio.sleep(2) # Simulate work\n", + " print(f\"{self.id} finished task {message.task_id}\")\n", + " return TaskResponse(task_id=message.task_id, result=f\"Results by {self.id}\")\n", + "\n", + "\n", + "class DelegatorAgent(RoutedAgent):\n", + " def __init__(self, description: str, worker_type: str):\n", + " super().__init__(description)\n", + " self.worker_instances = [AgentId(worker_type, f\"{worker_type}-1\"), AgentId(worker_type, f\"{worker_type}-2\")]\n", + "\n", + " @message_handler\n", + " async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:\n", + " print(f\"Delegator received task {message.task_id}.\")\n", + "\n", + " subtask1 = Task(task_id=\"task-part-1\")\n", + " subtask2 = Task(task_id=\"task-part-2\")\n", + "\n", + " worker1_result, worker2_result = await asyncio.gather(\n", + " self.send_message(subtask1, self.worker_instances[0]), self.send_message(subtask2, self.worker_instances[1])\n", + " )\n", + "\n", + " combined_result = f\"Part 1: {worker1_result.result}, \" f\"Part 2: {worker2_result.result}\"\n", + " task_response = TaskResponse(task_id=message.task_id, result=combined_result)\n", + " return task_response" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Delegator received task main-task.\n", + "worker/worker-1 starting task task-part-1\n", + "worker/worker-2 starting task task-part-2\n", + "worker/worker-1 finished task task-part-1\n", + "worker/worker-2 finished task task-part-2\n", + "Final result: Part 1: Results by worker/worker-1, Part 2: Results by worker/worker-2\n" + ] + } + ], + "source": [ + "runtime = SingleThreadedAgentRuntime()\n", + "\n", + "await WorkerAgent.register(runtime, \"worker\", lambda: WorkerAgent(\"Worker Agent\"))\n", + "await DelegatorAgent.register(runtime, \"delegator\", lambda: DelegatorAgent(\"Delegator Agent\", \"worker\"))\n", + "\n", + "runtime.start()\n", + "\n", + "delegator = AgentId(\"delegator\", \"default\")\n", + "response = await runtime.send_message(Task(task_id=\"main-task\"), recipient=delegator)\n", + "\n", + "print(f\"Final result: {response.result}\")\n", + "await runtime.stop_when_idle()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Additional Resources" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If you're interested in more about concurrent processing, check out the [Mixture of Agents](./mixture-of-agents.ipynb) pattern, which relies heavily on concurrent agents." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "autogen", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/python/packages/autogen-core/docs/src/user-guide/core-user-guide/design-patterns/index.md b/python/packages/autogen-core/docs/src/user-guide/core-user-guide/design-patterns/index.md index b54917c557cc..df4da43dc49c 100644 --- a/python/packages/autogen-core/docs/src/user-guide/core-user-guide/design-patterns/index.md +++ b/python/packages/autogen-core/docs/src/user-guide/core-user-guide/design-patterns/index.md @@ -20,6 +20,7 @@ group chat for task decomposition, and reflection for robustness. ```{toctree} :maxdepth: 1 +concurrent-agents group-chat handoffs mixture-of-agents From e1d58333ee1c9b6bd077fb4ae6960953297f010b Mon Sep 17 00:00:00 2001 From: afourney Date: Wed, 4 Dec 2024 11:28:44 -0800 Subject: [PATCH 2/2] Address issue_4543 and add tests. (#4546) * Address issue_4543 and add tests. * Fixed pyright errors. --- .../agents/_code_executor_agent.py | 18 +- .../tests/test_code_executor_agent.py | 166 ++++++++++++++++++ 2 files changed, 182 insertions(+), 2 deletions(-) create mode 100644 python/packages/autogen-agentchat/tests/test_code_executor_agent.py diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_code_executor_agent.py b/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_code_executor_agent.py index 4667c4a58658..7feccb14d430 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_code_executor_agent.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_code_executor_agent.py @@ -81,9 +81,23 @@ async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: if code_blocks: # Execute the code blocks. result = await self._code_executor.execute_code_blocks(code_blocks, cancellation_token=cancellation_token) - return Response(chat_message=TextMessage(content=result.output, source=self.name)) + + code_output = result.output + if code_output.strip() == "": + # No output + code_output = f"The script ran but produced no output to console. The POSIX exit code was: {result.exit_code}. If you were expecting output, consider revising the script to ensure content is printed to stdout." + elif result.exit_code != 0: + # Error + code_output = f"The script ran, then exited with an error (POSIX exit code: {result.exit_code})\nIts output was:\n{result.output}" + + return Response(chat_message=TextMessage(content=code_output, source=self.name)) else: - return Response(chat_message=TextMessage(content="No code blocks found in the thread.", source=self.name)) + return Response( + chat_message=TextMessage( + content="No code blocks found in the thread. Please provide at least one markdown-encoded code block to execute (i.e., quoting code in ```python or ```sh code blocks).", + source=self.name, + ) + ) async def on_reset(self, cancellation_token: CancellationToken) -> None: """It it's a no-op as the code executor agent has no mutable state.""" diff --git a/python/packages/autogen-agentchat/tests/test_code_executor_agent.py b/python/packages/autogen-agentchat/tests/test_code_executor_agent.py new file mode 100644 index 000000000000..ca09a203b65b --- /dev/null +++ b/python/packages/autogen-agentchat/tests/test_code_executor_agent.py @@ -0,0 +1,166 @@ +import pytest +from autogen_agentchat.agents import CodeExecutorAgent +from autogen_agentchat.base import Response +from autogen_agentchat.messages import TextMessage +from autogen_core import CancellationToken +from autogen_core.components.code_executor import LocalCommandLineCodeExecutor + + +@pytest.mark.asyncio +async def test_basic_code_execution() -> None: + """Test basic code execution""" + + agent = CodeExecutorAgent(name="code_executor", code_executor=LocalCommandLineCodeExecutor()) + + messages = [ + TextMessage( + content=""" +```python +import math + +number = 42 +square_root = math.sqrt(number) +print("%0.3f" % (square_root,)) +``` +""".strip(), + source="assistant", + ) + ] + response = await agent.on_messages(messages, CancellationToken()) + + assert isinstance(response, Response) + assert isinstance(response.chat_message, TextMessage) + assert response.chat_message.content.strip() == "6.481" + assert response.chat_message.source == "code_executor" + + +@pytest.mark.asyncio +async def test_code_execution_error() -> None: + """Test basic code execution""" + + agent = CodeExecutorAgent(name="code_executor", code_executor=LocalCommandLineCodeExecutor()) + + messages = [ + TextMessage( + content=""" +```python +import math + +number = -1.0 +square_root = math.sqrt(number) +print("%0.3f" % (square_root,)) +``` +""".strip(), + source="assistant", + ) + ] + response = await agent.on_messages(messages, CancellationToken()) + + assert isinstance(response, Response) + assert isinstance(response.chat_message, TextMessage) + assert "The script ran, then exited with an error (POSIX exit code: 1)" in response.chat_message.content + assert "ValueError: math domain error" in response.chat_message.content + + +@pytest.mark.asyncio +async def test_code_execution_no_output() -> None: + """Test basic code execution""" + + agent = CodeExecutorAgent(name="code_executor", code_executor=LocalCommandLineCodeExecutor()) + + messages = [ + TextMessage( + content=""" +```python +import math + +number = 42 +square_root = math.sqrt(number) +``` +""".strip(), + source="assistant", + ) + ] + response = await agent.on_messages(messages, CancellationToken()) + + assert isinstance(response, Response) + assert isinstance(response.chat_message, TextMessage) + assert ( + "The script ran but produced no output to console. The POSIX exit code was: 0. If you were expecting output, consider revising the script to ensure content is printed to stdout." + in response.chat_message.content + ) + + +@pytest.mark.asyncio +async def test_code_execution_no_block() -> None: + """Test basic code execution""" + + agent = CodeExecutorAgent(name="code_executor", code_executor=LocalCommandLineCodeExecutor()) + + messages = [ + TextMessage( + content=""" +import math + +number = 42 +square_root = math.sqrt(number) +""".strip(), + source="assistant", + ) + ] + response = await agent.on_messages(messages, CancellationToken()) + + assert isinstance(response, Response) + assert isinstance(response.chat_message, TextMessage) + assert ( + "No code blocks found in the thread. Please provide at least one markdown-encoded code block" + in response.chat_message.content + ) + + +@pytest.mark.asyncio +async def test_code_execution_multiple_blocks() -> None: + """Test basic code execution""" + + agent = CodeExecutorAgent(name="code_executor", code_executor=LocalCommandLineCodeExecutor()) + + messages = [ + TextMessage( + content=""" +```python +import math + +number = 42 +square_root = math.sqrt(number) +print("%0.3f" % (square_root,)) +``` + +And also: + +```python +import time +print(f"The current time is: {time.time()}") + +``` + +And this should result in an error: +```python +import math + +number = -1.0 +square_root = math.sqrt(number) +print("%0.3f" % (square_root,)) +``` + +""".strip(), + source="assistant", + ) + ] + response = await agent.on_messages(messages, CancellationToken()) + + assert isinstance(response, Response) + assert isinstance(response.chat_message, TextMessage) + assert "6.481" in response.chat_message.content + assert "The current time is:" in response.chat_message.content + assert "The script ran, then exited with an error (POSIX exit code: 1)" in response.chat_message.content + assert "ValueError: math domain error" in response.chat_message.content