Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move intervention objects to root module #4859

Merged
merged 3 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,179 +1,179 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Termination using Intervention Handler\n",
"\n",
"```{note}\n",
"This method is valid when using {py:class}`~autogen_core.SingleThreadedAgentRuntime`.\n",
"```\n",
"\n",
"There are many different ways to handle termination in `autogen_core`. Ultimately, the goal is to detect that the runtime no longer needs to be executed and you can proceed to finalization tasks. One way to do this is to use an {py:class}`autogen_core.base.intervention.InterventionHandler` to detect a termination message and then act on it."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from dataclasses import dataclass\n",
"from typing import Any\n",
"\n",
"from autogen_core import (\n",
" AgentId,\n",
" DefaultTopicId,\n",
" MessageContext,\n",
" RoutedAgent,\n",
" SingleThreadedAgentRuntime,\n",
" default_subscription,\n",
" message_handler,\n",
")\n",
"from autogen_core.base.intervention import DefaultInterventionHandler"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, we define a dataclass for regular message and message that will be used to signal termination."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"@dataclass\n",
"class Message:\n",
" content: Any\n",
"\n",
"\n",
"@dataclass\n",
"class Termination:\n",
" reason: str"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We code our agent to publish a termination message when it decides it is time to terminate."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"@default_subscription\n",
"class AnAgent(RoutedAgent):\n",
" def __init__(self) -> None:\n",
" super().__init__(\"MyAgent\")\n",
" self.received = 0\n",
"\n",
" @message_handler\n",
" async def on_new_message(self, message: Message, ctx: MessageContext) -> None:\n",
" self.received += 1\n",
" if self.received > 3:\n",
" await self.publish_message(Termination(reason=\"Reached maximum number of messages\"), DefaultTopicId())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we create an InterventionHandler that will detect the termination message and act on it. This one hooks into publishes and when it encounters `Termination` it alters its internal state to indicate that termination has been requested."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"class TerminationHandler(DefaultInterventionHandler):\n",
" def __init__(self) -> None:\n",
" self._termination_value: Termination | None = None\n",
"\n",
" async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:\n",
" if isinstance(message, Termination):\n",
" self._termination_value = message\n",
" return message\n",
"\n",
" @property\n",
" def termination_value(self) -> Termination | None:\n",
" return self._termination_value\n",
"\n",
" @property\n",
" def has_terminated(self) -> bool:\n",
" return self._termination_value is not None"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we add this handler to the runtime and use it to detect termination and stop the runtime when the termination message is received."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Termination(reason='Reached maximum number of messages')\n"
]
}
],
"source": [
"termination_handler = TerminationHandler()\n",
"runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])\n",
"\n",
"await AnAgent.register(runtime, \"my_agent\", AnAgent)\n",
"\n",
"runtime.start()\n",
"\n",
"# Publish more than 3 messages to trigger termination.\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"\n",
"# Wait for termination.\n",
"await runtime.stop_when(lambda: termination_handler.has_terminated)\n",
"\n",
"print(termination_handler.termination_value)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Termination using Intervention Handler\n",
"\n",
"```{note}\n",
"This method is valid when using {py:class}`~autogen_core.SingleThreadedAgentRuntime`.\n",
"```\n",
"\n",
"There are many different ways to handle termination in `autogen_core`. Ultimately, the goal is to detect that the runtime no longer needs to be executed and you can proceed to finalization tasks. One way to do this is to use an {py:class}`autogen_core.base.intervention.InterventionHandler` to detect a termination message and then act on it."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from dataclasses import dataclass\n",
"from typing import Any\n",
"\n",
"from autogen_core import (\n",
" AgentId,\n",
" DefaultInterventionHandler,\n",
" DefaultTopicId,\n",
" MessageContext,\n",
" RoutedAgent,\n",
" SingleThreadedAgentRuntime,\n",
" default_subscription,\n",
" message_handler,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, we define a dataclass for regular message and message that will be used to signal termination."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"@dataclass\n",
"class Message:\n",
" content: Any\n",
"\n",
"\n",
"@dataclass\n",
"class Termination:\n",
" reason: str"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We code our agent to publish a termination message when it decides it is time to terminate."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"@default_subscription\n",
"class AnAgent(RoutedAgent):\n",
" def __init__(self) -> None:\n",
" super().__init__(\"MyAgent\")\n",
" self.received = 0\n",
"\n",
" @message_handler\n",
" async def on_new_message(self, message: Message, ctx: MessageContext) -> None:\n",
" self.received += 1\n",
" if self.received > 3:\n",
" await self.publish_message(Termination(reason=\"Reached maximum number of messages\"), DefaultTopicId())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we create an InterventionHandler that will detect the termination message and act on it. This one hooks into publishes and when it encounters `Termination` it alters its internal state to indicate that termination has been requested."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"class TerminationHandler(DefaultInterventionHandler):\n",
" def __init__(self) -> None:\n",
" self._termination_value: Termination | None = None\n",
"\n",
" async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:\n",
" if isinstance(message, Termination):\n",
" self._termination_value = message\n",
" return message\n",
"\n",
" @property\n",
" def termination_value(self) -> Termination | None:\n",
" return self._termination_value\n",
"\n",
" @property\n",
" def has_terminated(self) -> bool:\n",
" return self._termination_value is not None"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we add this handler to the runtime and use it to detect termination and stop the runtime when the termination message is received."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Termination(reason='Reached maximum number of messages')\n"
]
}
],
"source": [
"termination_handler = TerminationHandler()\n",
"runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])\n",
"\n",
"await AnAgent.register(runtime, \"my_agent\", AnAgent)\n",
"\n",
"runtime.start()\n",
"\n",
"# Publish more than 3 messages to trigger termination.\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"await runtime.publish_message(Message(\"hello\"), DefaultTopicId())\n",
"\n",
"# Wait for termination.\n",
"await runtime.stop_when(lambda: termination_handler.has_terminated)\n",
"\n",
"print(termination_handler.termination_value)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
"from autogen_core import (\n",
" AgentId,\n",
" AgentType,\n",
" DefaultInterventionHandler,\n",
" DropMessage,\n",
" FunctionCall,\n",
" MessageContext,\n",
" RoutedAgent,\n",
" SingleThreadedAgentRuntime,\n",
" message_handler,\n",
")\n",
"from autogen_core.base.intervention import DefaultInterventionHandler, DropMessage\n",
"from autogen_core.models import (\n",
" ChatCompletionClient,\n",
" LLMMessage,\n",
Expand Down
2 changes: 1 addition & 1 deletion python/packages/autogen-core/samples/slow_human_in_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from autogen_core import (
AgentId,
CancellationToken,
DefaultInterventionHandler,
DefaultTopicId,
FunctionCall,
MessageContext,
Expand All @@ -41,7 +42,6 @@
message_handler,
type_subscription,
)
from autogen_core.base.intervention import DefaultInterventionHandler
from autogen_core.model_context import BufferedChatCompletionContext
from autogen_core.models import (
AssistantMessage,
Expand Down
Loading
Loading