Skip to content

Commit

Permalink
Move intervention objects to root module (#4859)
Browse files Browse the repository at this point in the history
* Move intervention to root

* usage
  • Loading branch information
jackgerrits authored Dec 30, 2024
1 parent 0569689 commit c58eb9d
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 215 deletions.
Original file line number Diff line number Diff line change
@@ -1,179 +1,179 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Termination using Intervention Handler\n",
"\n",
"```{note}\n",
"This method is valid when using {py:class}`~autogen_core.SingleThreadedAgentRuntime`.\n",
"```\n",
"\n",
"There are many different ways to handle termination in `autogen_core`. Ultimately, the goal is to detect that the runtime no longer needs to be executed and you can proceed to finalization tasks. One way to do this is to use an {py:class}`autogen_core.base.intervention.InterventionHandler` to detect a termination message and then act on it."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from dataclasses import dataclass\n",
"from typing import Any\n",
"\n",
"from autogen_core import (\n",
" AgentId,\n",
" DefaultTopicId,\n",
" MessageContext,\n",
" RoutedAgent,\n",
" SingleThreadedAgentRuntime,\n",
" default_subscription,\n",
" message_handler,\n",
")\n",
"from autogen_core.base.intervention import DefaultInterventionHandler"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, we define a dataclass for regular message and message that will be used to signal termination."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"@dataclass\n",
"class Message:\n",
" content: Any\n",
"\n",
"\n",
"@dataclass\n",
"class Termination:\n",
" reason: str"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We code our agent to publish a termination message when it decides it is time to terminate."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"@default_subscription\n",
"class AnAgent(RoutedAgent):\n",
" def __init__(self) -> None:\n",
" super().__init__(\"MyAgent\")\n",
" self.received = 0\n",
"\n",
" @message_handler\n",
" async def on_new_message(self, message: Message, ctx: MessageContext) -> None:\n",
" self.received += 1\n",
" if self.received > 3:\n",
" await self.publish_message(Termination(reason=\"Reached maximum number of messages\"), DefaultTopicId())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we create an InterventionHandler that will detect the termination message and act on it. This one hooks into publishes and when it encounters `Termination` it alters its internal state to indicate that termination has been requested."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"class TerminationHandler(DefaultInterventionHandler):\n",
" def __init__(self) -> None:\n",
" self._termination_value: Termination | None = None\n",
"\n",
" async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:\n",
" if isinstance(message, Termination):\n",
" self._termination_value = message\n",
" return message\n",
"\n",
" @property\n",
" def termination_value(self) -> Termination | None:\n",
" return self._termination_value\n",
"\n",
" @property\n",
" def has_terminated(self) -> bool:\n",
" return self._termination_value is not None"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we add this handler to the runtime and use it to detect termination and stop the runtime when the termination message is received."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Termination(reason='Reached maximum number of messages')\n"
]
}
],
"source": [
"termination_handler = TerminationHandler()\n",
"runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])\n",
"\n",
"await AnAgent.register(runtime, \"my_agent\", AnAgent)\n",
"\n",
"runtime.start()\n",
"\n",
"# Publish more than 3 messages to trigger termination.\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"\n",
"# Wait for termination.\n",
"await runtime.stop_when(lambda: termination_handler.has_terminated)\n",
"\n",
"print(termination_handler.termination_value)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Termination using Intervention Handler\n",
"\n",
"```{note}\n",
"This method is valid when using {py:class}`~autogen_core.SingleThreadedAgentRuntime`.\n",
"```\n",
"\n",
"There are many different ways to handle termination in `autogen_core`. Ultimately, the goal is to detect that the runtime no longer needs to be executed and you can proceed to finalization tasks. One way to do this is to use an {py:class}`autogen_core.base.intervention.InterventionHandler` to detect a termination message and then act on it."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from dataclasses import dataclass\n",
"from typing import Any\n",
"\n",
"from autogen_core import (\n",
" AgentId,\n",
" DefaultInterventionHandler,\n",
" DefaultTopicId,\n",
" MessageContext,\n",
" RoutedAgent,\n",
" SingleThreadedAgentRuntime,\n",
" default_subscription,\n",
" message_handler,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, we define a dataclass for regular message and message that will be used to signal termination."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"@dataclass\n",
"class Message:\n",
" content: Any\n",
"\n",
"\n",
"@dataclass\n",
"class Termination:\n",
" reason: str"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We code our agent to publish a termination message when it decides it is time to terminate."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"@default_subscription\n",
"class AnAgent(RoutedAgent):\n",
" def __init__(self) -> None:\n",
" super().__init__(\"MyAgent\")\n",
" self.received = 0\n",
"\n",
" @message_handler\n",
" async def on_new_message(self, message: Message, ctx: MessageContext) -> None:\n",
" self.received += 1\n",
" if self.received > 3:\n",
" await self.publish_message(Termination(reason=\"Reached maximum number of messages\"), DefaultTopicId())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we create an InterventionHandler that will detect the termination message and act on it. This one hooks into publishes and when it encounters `Termination` it alters its internal state to indicate that termination has been requested."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"class TerminationHandler(DefaultInterventionHandler):\n",
" def __init__(self) -> None:\n",
" self._termination_value: Termination | None = None\n",
"\n",
" async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:\n",
" if isinstance(message, Termination):\n",
" self._termination_value = message\n",
" return message\n",
"\n",
" @property\n",
" def termination_value(self) -> Termination | None:\n",
" return self._termination_value\n",
"\n",
" @property\n",
" def has_terminated(self) -> bool:\n",
" return self._termination_value is not None"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we add this handler to the runtime and use it to detect termination and stop the runtime when the termination message is received."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Termination(reason='Reached maximum number of messages')\n"
]
}
],
"source": [
"termination_handler = TerminationHandler()\n",
"runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])\n",
"\n",
"await AnAgent.register(runtime, \"my_agent\", AnAgent)\n",
"\n",
"runtime.start()\n",
"\n",
"# Publish more than 3 messages to trigger termination.\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"\n",
"# Wait for termination.\n",
"await runtime.stop_when(lambda: termination_handler.has_terminated)\n",
"\n",
"print(termination_handler.termination_value)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
"from autogen_core import (\n",
" AgentId,\n",
" AgentType,\n",
" DefaultInterventionHandler,\n",
" DropMessage,\n",
" FunctionCall,\n",
" MessageContext,\n",
" RoutedAgent,\n",
" SingleThreadedAgentRuntime,\n",
" message_handler,\n",
")\n",
"from autogen_core.base.intervention import DefaultInterventionHandler, DropMessage\n",
"from autogen_core.models import (\n",
" ChatCompletionClient,\n",
" LLMMessage,\n",
Expand Down
2 changes: 1 addition & 1 deletion python/packages/autogen-core/samples/slow_human_in_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from autogen_core import (
AgentId,
CancellationToken,
DefaultInterventionHandler,
DefaultTopicId,
FunctionCall,
MessageContext,
Expand All @@ -41,7 +42,6 @@
message_handler,
type_subscription,
)
from autogen_core.base.intervention import DefaultInterventionHandler
from autogen_core.model_context import BufferedChatCompletionContext
from autogen_core.models import (
AssistantMessage,
Expand Down
Loading

0 comments on commit c58eb9d

Please sign in to comment.