From 1b63f3c78e00bbd53bac7bb0651126250c40534a Mon Sep 17 00:00:00 2001 From: Jack Gerrits Date: Mon, 30 Dec 2024 15:24:47 -0500 Subject: [PATCH 1/2] Move intervention to root --- .../termination-with-intervention.ipynb | 354 +++++++++--------- .../cookbook/tool-use-with-intervention.ipynb | 3 +- .../samples/slow_human_in_loop.py | 2 +- .../autogen-core/src/autogen_core/__init__.py | 8 + .../src/autogen_core/_intervention.py | 41 ++ .../src/autogen_core/base/intervention.py | 43 +-- .../autogen-core/tests/test_intervention.py | 3 +- 7 files changed, 240 insertions(+), 214 deletions(-) create mode 100644 python/packages/autogen-core/src/autogen_core/_intervention.py diff --git a/python/packages/autogen-core/docs/src/user-guide/core-user-guide/cookbook/termination-with-intervention.ipynb b/python/packages/autogen-core/docs/src/user-guide/core-user-guide/cookbook/termination-with-intervention.ipynb index 1d39f0d9910d..4bed96d6b178 100644 --- a/python/packages/autogen-core/docs/src/user-guide/core-user-guide/cookbook/termination-with-intervention.ipynb +++ b/python/packages/autogen-core/docs/src/user-guide/core-user-guide/cookbook/termination-with-intervention.ipynb @@ -1,179 +1,179 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Termination using Intervention Handler\n", - "\n", - "```{note}\n", - "This method is valid when using {py:class}`~autogen_core.SingleThreadedAgentRuntime`.\n", - "```\n", - "\n", - "There are many different ways to handle termination in `autogen_core`. Ultimately, the goal is to detect that the runtime no longer needs to be executed and you can proceed to finalization tasks. One way to do this is to use an {py:class}`autogen_core.base.intervention.InterventionHandler` to detect a termination message and then act on it." - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "metadata": {}, - "outputs": [], - "source": [ - "from dataclasses import dataclass\n", - "from typing import Any\n", - "\n", - "from autogen_core import (\n", - " AgentId,\n", - " DefaultTopicId,\n", - " MessageContext,\n", - " RoutedAgent,\n", - " SingleThreadedAgentRuntime,\n", - " default_subscription,\n", - " message_handler,\n", - ")\n", - "from autogen_core.base.intervention import DefaultInterventionHandler" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "First, we define a dataclass for regular message and message that will be used to signal termination." - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "metadata": {}, - "outputs": [], - "source": [ - "@dataclass\n", - "class Message:\n", - " content: Any\n", - "\n", - "\n", - "@dataclass\n", - "class Termination:\n", - " reason: str" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We code our agent to publish a termination message when it decides it is time to terminate." - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "metadata": {}, - "outputs": [], - "source": [ - "@default_subscription\n", - "class AnAgent(RoutedAgent):\n", - " def __init__(self) -> None:\n", - " super().__init__(\"MyAgent\")\n", - " self.received = 0\n", - "\n", - " @message_handler\n", - " async def on_new_message(self, message: Message, ctx: MessageContext) -> None:\n", - " self.received += 1\n", - " if self.received > 3:\n", - " await self.publish_message(Termination(reason=\"Reached maximum number of messages\"), DefaultTopicId())" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Next, we create an InterventionHandler that will detect the termination message and act on it. This one hooks into publishes and when it encounters `Termination` it alters its internal state to indicate that termination has been requested." - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "metadata": {}, - "outputs": [], - "source": [ - "class TerminationHandler(DefaultInterventionHandler):\n", - " def __init__(self) -> None:\n", - " self._termination_value: Termination | None = None\n", - "\n", - " async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:\n", - " if isinstance(message, Termination):\n", - " self._termination_value = message\n", - " return message\n", - "\n", - " @property\n", - " def termination_value(self) -> Termination | None:\n", - " return self._termination_value\n", - "\n", - " @property\n", - " def has_terminated(self) -> bool:\n", - " return self._termination_value is not None" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Finally, we add this handler to the runtime and use it to detect termination and stop the runtime when the termination message is received." - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Termination(reason='Reached maximum number of messages')\n" - ] - } - ], - "source": [ - "termination_handler = TerminationHandler()\n", - "runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])\n", - "\n", - "await AnAgent.register(runtime, \"my_agent\", AnAgent)\n", - "\n", - "runtime.start()\n", - "\n", - "# Publish more than 3 messages to trigger termination.\n", - "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", - "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", - "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", - "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", - "\n", - "# Wait for termination.\n", - "await runtime.stop_when(lambda: termination_handler.has_terminated)\n", - "\n", - "print(termination_handler.termination_value)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": ".venv", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.9" - } - }, - "nbformat": 4, - "nbformat_minor": 2 + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Termination using Intervention Handler\n", + "\n", + "```{note}\n", + "This method is valid when using {py:class}`~autogen_core.SingleThreadedAgentRuntime`.\n", + "```\n", + "\n", + "There are many different ways to handle termination in `autogen_core`. Ultimately, the goal is to detect that the runtime no longer needs to be executed and you can proceed to finalization tasks. One way to do this is to use an {py:class}`autogen_core.base.intervention.InterventionHandler` to detect a termination message and then act on it." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass\n", + "from typing import Any\n", + "\n", + "from autogen_core import (\n", + " AgentId,\n", + " DefaultInterventionHandler,\n", + " DefaultTopicId,\n", + " MessageContext,\n", + " RoutedAgent,\n", + " SingleThreadedAgentRuntime,\n", + " default_subscription,\n", + " message_handler,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "First, we define a dataclass for regular message and message that will be used to signal termination." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "@dataclass\n", + "class Message:\n", + " content: Any\n", + "\n", + "\n", + "@dataclass\n", + "class Termination:\n", + " reason: str" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We code our agent to publish a termination message when it decides it is time to terminate." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "@default_subscription\n", + "class AnAgent(RoutedAgent):\n", + " def __init__(self) -> None:\n", + " super().__init__(\"MyAgent\")\n", + " self.received = 0\n", + "\n", + " @message_handler\n", + " async def on_new_message(self, message: Message, ctx: MessageContext) -> None:\n", + " self.received += 1\n", + " if self.received > 3:\n", + " await self.publish_message(Termination(reason=\"Reached maximum number of messages\"), DefaultTopicId())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, we create an InterventionHandler that will detect the termination message and act on it. This one hooks into publishes and when it encounters `Termination` it alters its internal state to indicate that termination has been requested." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "class TerminationHandler(DefaultInterventionHandler):\n", + " def __init__(self) -> None:\n", + " self._termination_value: Termination | None = None\n", + "\n", + " async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:\n", + " if isinstance(message, Termination):\n", + " self._termination_value = message\n", + " return message\n", + "\n", + " @property\n", + " def termination_value(self) -> Termination | None:\n", + " return self._termination_value\n", + "\n", + " @property\n", + " def has_terminated(self) -> bool:\n", + " return self._termination_value is not None" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, we add this handler to the runtime and use it to detect termination and stop the runtime when the termination message is received." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Termination(reason='Reached maximum number of messages')\n" + ] + } + ], + "source": [ + "termination_handler = TerminationHandler()\n", + "runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])\n", + "\n", + "await AnAgent.register(runtime, \"my_agent\", AnAgent)\n", + "\n", + "runtime.start()\n", + "\n", + "# Publish more than 3 messages to trigger termination.\n", + "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", + "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", + "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", + "await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n", + "\n", + "# Wait for termination.\n", + "await runtime.stop_when(lambda: termination_handler.has_terminated)\n", + "\n", + "print(termination_handler.termination_value)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 } diff --git a/python/packages/autogen-core/docs/src/user-guide/core-user-guide/cookbook/tool-use-with-intervention.ipynb b/python/packages/autogen-core/docs/src/user-guide/core-user-guide/cookbook/tool-use-with-intervention.ipynb index 46b7595090ab..37def894b190 100644 --- a/python/packages/autogen-core/docs/src/user-guide/core-user-guide/cookbook/tool-use-with-intervention.ipynb +++ b/python/packages/autogen-core/docs/src/user-guide/core-user-guide/cookbook/tool-use-with-intervention.ipynb @@ -22,13 +22,14 @@ "from autogen_core import (\n", " AgentId,\n", " AgentType,\n", + " DefaultInterventionHandler,\n", + " DropMessage,\n", " FunctionCall,\n", " MessageContext,\n", " RoutedAgent,\n", " SingleThreadedAgentRuntime,\n", " message_handler,\n", ")\n", - "from autogen_core.base.intervention import DefaultInterventionHandler, DropMessage\n", "from autogen_core.models import (\n", " ChatCompletionClient,\n", " LLMMessage,\n", diff --git a/python/packages/autogen-core/samples/slow_human_in_loop.py b/python/packages/autogen-core/samples/slow_human_in_loop.py index 61ea36fda890..eb4e627bbf2f 100644 --- a/python/packages/autogen-core/samples/slow_human_in_loop.py +++ b/python/packages/autogen-core/samples/slow_human_in_loop.py @@ -33,6 +33,7 @@ from autogen_core import ( AgentId, CancellationToken, + DefaultInterventionHandler, DefaultTopicId, FunctionCall, MessageContext, @@ -41,7 +42,6 @@ message_handler, type_subscription, ) -from autogen_core.base.intervention import DefaultInterventionHandler from autogen_core.model_context import BufferedChatCompletionContext from autogen_core.models import ( AssistantMessage, diff --git a/python/packages/autogen-core/src/autogen_core/__init__.py b/python/packages/autogen-core/src/autogen_core/__init__.py index 9523476e47e6..c9d12872dd9d 100644 --- a/python/packages/autogen-core/src/autogen_core/__init__.py +++ b/python/packages/autogen-core/src/autogen_core/__init__.py @@ -31,6 +31,11 @@ from ._default_subscription import DefaultSubscription, default_subscription, type_subscription from ._default_topic import DefaultTopicId from ._image import Image +from ._intervention import ( + DefaultInterventionHandler, + DropMessage, + InterventionHandler, +) from ._message_context import MessageContext from ._message_handler_context import MessageHandlerContext from ._routed_agent import RoutedAgent, event, message_handler, rpc @@ -111,4 +116,7 @@ "ComponentConfigImpl", "ComponentModel", "ComponentType", + "DropMessage", + "InterventionHandler", + "DefaultInterventionHandler", ] diff --git a/python/packages/autogen-core/src/autogen_core/_intervention.py b/python/packages/autogen-core/src/autogen_core/_intervention.py new file mode 100644 index 000000000000..649026fea2d3 --- /dev/null +++ b/python/packages/autogen-core/src/autogen_core/_intervention.py @@ -0,0 +1,41 @@ +from typing import Any, Protocol, final + +from ._agent_id import AgentId + +__all__ = [ + "DropMessage", + "InterventionHandler", + "DefaultInterventionHandler", +] + + +@final +class DropMessage: ... + + +class InterventionHandler(Protocol): + """An intervention handler is a class that can be used to modify, log or drop messages that are being processed by the :class:`autogen_core.base.AgentRuntime`. + + Note: Returning None from any of the intervention handler methods will result in a warning being issued and treated as "no change". If you intend to drop a message, you should return :class:`DropMessage` explicitly. + """ + + async def on_send(self, message: Any, *, sender: AgentId | None, recipient: AgentId) -> Any | type[DropMessage]: ... + async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any | type[DropMessage]: ... + async def on_response( + self, message: Any, *, sender: AgentId, recipient: AgentId | None + ) -> Any | type[DropMessage]: ... + + +class DefaultInterventionHandler(InterventionHandler): + """Simple class that provides a default implementation for all intervention + handler methods, that simply returns the message unchanged. Allows for easy + subclassing to override only the desired methods.""" + + async def on_send(self, message: Any, *, sender: AgentId | None, recipient: AgentId) -> Any | type[DropMessage]: + return message + + async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any | type[DropMessage]: + return message + + async def on_response(self, message: Any, *, sender: AgentId, recipient: AgentId | None) -> Any | type[DropMessage]: + return message diff --git a/python/packages/autogen-core/src/autogen_core/base/intervention.py b/python/packages/autogen-core/src/autogen_core/base/intervention.py index 5fe337b8776d..aa4a388a3ded 100644 --- a/python/packages/autogen-core/src/autogen_core/base/intervention.py +++ b/python/packages/autogen-core/src/autogen_core/base/intervention.py @@ -1,45 +1,22 @@ -from typing import Any, Awaitable, Callable, Protocol, final +from typing_extensions import deprecated -from .._agent_id import AgentId +from .._intervention import DefaultInterventionHandler as DefaultInterventionHandlerAlias +from .._intervention import DropMessage as DropMessageAlias +from .._intervention import InterventionHandler as InterventionHandlerAliass __all__ = [ "DropMessage", - "InterventionFunction", "InterventionHandler", "DefaultInterventionHandler", ] +# Final so can't inherit and deprecate +DropMessage = DropMessageAlias -@final -class DropMessage: ... +@deprecated("Moved to autogen_core.InterventionHandler. Will remove this in 0.4.0.", stacklevel=2) +class InterventionHandler(InterventionHandlerAliass): ... -InterventionFunction = Callable[[Any], Any | Awaitable[type[DropMessage]]] - -class InterventionHandler(Protocol): - """An intervention handler is a class that can be used to modify, log or drop messages that are being processed by the :class:`autogen_core.base.AgentRuntime`. - - Note: Returning None from any of the intervention handler methods will result in a warning being issued and treated as "no change". If you intend to drop a message, you should return :class:`DropMessage` explicitly. - """ - - async def on_send(self, message: Any, *, sender: AgentId | None, recipient: AgentId) -> Any | type[DropMessage]: ... - async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any | type[DropMessage]: ... - async def on_response( - self, message: Any, *, sender: AgentId, recipient: AgentId | None - ) -> Any | type[DropMessage]: ... - - -class DefaultInterventionHandler(InterventionHandler): - """Simple class that provides a default implementation for all intervention - handler methods, that simply returns the message unchanged. Allows for easy - subclassing to override only the desired methods.""" - - async def on_send(self, message: Any, *, sender: AgentId | None, recipient: AgentId) -> Any | type[DropMessage]: - return message - - async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any | type[DropMessage]: - return message - - async def on_response(self, message: Any, *, sender: AgentId, recipient: AgentId | None) -> Any | type[DropMessage]: - return message +@deprecated("Moved to autogen_core.DefaultInterventionHandler. Will remove this in 0.4.0.", stacklevel=2) +class DefaultInterventionHandler(DefaultInterventionHandlerAlias): ... diff --git a/python/packages/autogen-core/tests/test_intervention.py b/python/packages/autogen-core/tests/test_intervention.py index a046201feff3..ef6ee4ebfb70 100644 --- a/python/packages/autogen-core/tests/test_intervention.py +++ b/python/packages/autogen-core/tests/test_intervention.py @@ -1,6 +1,5 @@ import pytest -from autogen_core import AgentId, SingleThreadedAgentRuntime -from autogen_core.base.intervention import DefaultInterventionHandler, DropMessage +from autogen_core import AgentId, DefaultInterventionHandler, DropMessage, SingleThreadedAgentRuntime from autogen_core.exceptions import MessageDroppedException from autogen_test_utils import LoopbackAgent, MessageType From bf9f217531cfa47f334bd252e4b280bde0fb3ea6 Mon Sep 17 00:00:00 2001 From: Jack Gerrits Date: Mon, 30 Dec 2024 15:32:50 -0500 Subject: [PATCH 2/2] usage --- .../src/autogen_core/_single_threaded_agent_runtime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/packages/autogen-core/src/autogen_core/_single_threaded_agent_runtime.py b/python/packages/autogen-core/src/autogen_core/_single_threaded_agent_runtime.py index a00e531fc962..37c84ca84b1f 100644 --- a/python/packages/autogen-core/src/autogen_core/_single_threaded_agent_runtime.py +++ b/python/packages/autogen-core/src/autogen_core/_single_threaded_agent_runtime.py @@ -28,6 +28,7 @@ from ._agent_runtime import AgentRuntime from ._agent_type import AgentType from ._cancellation_token import CancellationToken +from ._intervention import DropMessage, InterventionHandler from ._message_context import MessageContext from ._message_handler_context import MessageHandlerContext from ._runtime_impl_helpers import SubscriptionManager, get_impl @@ -35,7 +36,6 @@ from ._subscription import Subscription from ._telemetry import EnvelopeMetadata, MessageRuntimeTracingConfig, TraceHelper, get_telemetry_envelope_metadata from ._topic import TopicId -from .base.intervention import DropMessage, InterventionHandler from .exceptions import MessageDroppedException logger = logging.getLogger("autogen_core")