Skip to content

Commit 1e49aee

Browse files
authored
Add group chat pattern, create separate folder for patterns (#117)
* add tool use example; refactor example directory * update * add more examples * fix * fix * doc * move * add group chat example, create patterns folder
1 parent 2ab3ce4 commit 1e49aee

7 files changed

+212
-11
lines changed

python/examples/README.md

+11-5
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,7 @@ agents, runtime, and message passing APIs.
99

1010
- [`one_agent_direct.py`](core/one_agent_direct.py): A simple example of how to create a single agent powered by ChatCompletion model client. Communicate with the agent using async direct messaging API.
1111
- [`inner_outer_direct.py`](core/inner_outer_direct.py): A simple example of how to create an agent that calls an inner agent using async direct messaging API.
12-
- [`two_agents_pub_sub.py`](core/two_agents_pub_sub.py): An example of how to create two agents that communicate using publish-subscribe API.
13-
- [`mixture_of_agents_direct.py`](core/mixture_of_agents_direct.py): An example of how to create a [mixture of agents](https://github.com/togethercomputer/moa) that communicate using async direct messaging API.
14-
- [`mixture_of_agents_pub_sub.py`](core/mixture_of_agents_pub_sub.py): An example of how to create a [mixture of agents](https://github.com/togethercomputer/moa) that communicate using publish-subscribe API.
15-
- [`coder_reviewer_direct.py`](core/coder_reviewer_direct.py): An example of how to create a coder-reviewer reflection pattern using async direct messaging API.
16-
- [`coder_reviewer_pub_sub.py`](core/coder_reviewer_pub_sub.py): An example of how to create a coder-reviewer reflection pattern using publish-subscribe API.
12+
- [`two_agents_pub_sub_termination.py`](core/two_agents_pub_sub_termination.py): An example of how to create two agents that communicate using publish-subscribe API, and termination using an intervention handler.
1713

1814
## Tool use examples
1915

@@ -24,6 +20,16 @@ We provide examples to illustrate how to use tools in AGNext:
2420
- [`coding_two_agent_pub_sub.py`](tool-use/coding_two_agent_pub_sub.py): a code execution example with two agents, one for calling tool and one for executing the tool, to demonstrate tool use and reflection on tool use. This example uses the publish-subscribe API.
2521
- [`custom_function_tool_one_agent_direct.py`](tool-use/custom_function_tool_one_agent_direct.py): a custom function tool example with one agent that calls and executes tools to demonstrate tool use and reflection on tool use. This example uses the async direct messaging API.
2622

23+
## Pattern examples
24+
25+
We provide examples to illustrate how multi-agent patterns can be implemented in AGNext:
26+
27+
- [`mixture_of_agents_direct.py`](pattern/mixture_of_agents_direct.py): An example of how to create a [mixture of agents](https://github.com/togethercomputer/moa) that communicate using async direct messaging API.
28+
- [`mixture_of_agents_pub_sub.py`](pattern/mixture_of_agents_pub_sub.py): An example of how to create a [mixture of agents](https://github.com/togethercomputer/moa) that communicate using publish-subscribe API.
29+
- [`coder_reviewer_direct.py`](pattern/coder_reviewer_direct.py): An example of how to create a coder-reviewer reflection pattern using async direct messaging API.
30+
- [`coder_reviewer_pub_sub.py`](pattern/coder_reviewer_pub_sub.py): An example of how to create a coder-reviewer reflection pattern using publish-subscribe API.
31+
- [`group_chat_pub_sub.py`](pattern/group_chat_pub_sub.py): An example of how to create a round-robin group chat among three agents using publish-subscribe API.
32+
2733
## Demos
2834

2935
We provide interactive demos that showcase applications that can be built using AGNext:

python/examples/core/two_agents_pub_sub.py renamed to python/examples/core/two_agents_pub_sub_termination.py

+35-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
from dataclasses import dataclass
3-
from typing import List
3+
from typing import Any, List
44

55
from agnext.application import SingleThreadedAgentRuntime
66
from agnext.components import TypeRoutedAgent, message_handler
@@ -12,7 +12,8 @@
1212
SystemMessage,
1313
UserMessage,
1414
)
15-
from agnext.core import CancellationToken
15+
from agnext.core import AgentId, CancellationToken
16+
from agnext.core.intervention import DefaultInterventionHandler
1617

1718

1819
@dataclass
@@ -21,10 +22,15 @@ class Message:
2122
content: str
2223

2324

25+
@dataclass
26+
class Termination:
27+
pass
28+
29+
2430
class ChatCompletionAgent(TypeRoutedAgent):
2531
"""An agent that uses a chat completion model to respond to messages.
2632
It keeps a memory of the conversation and uses it to generate responses.
27-
It terminates the conversation when the termination word is mentioned."""
33+
It publishes a termination message when the termination word is mentioned."""
2834

2935
def __init__(
3036
self,
@@ -43,6 +49,7 @@ def __init__(
4349
async def handle_message(self, message: Message, cancellation_token: CancellationToken) -> None:
4450
self._memory.append(message)
4551
if self._termination_word in message.content:
52+
self.publish_message(Termination())
4653
return
4754
llm_messages: List[LLMMessage] = []
4855
for m in self._memory[-10:]:
@@ -55,8 +62,30 @@ async def handle_message(self, message: Message, cancellation_token: Cancellatio
5562
self.publish_message(Message(content=response.content, source=self.metadata["name"]))
5663

5764

65+
class TerminationHandler(DefaultInterventionHandler):
66+
"""A handler that listens for termination messages."""
67+
68+
def __init__(self) -> None:
69+
self._terminated = False
70+
71+
async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:
72+
if isinstance(message, Termination):
73+
self._terminated = True
74+
return message
75+
76+
@property
77+
def terminated(self) -> bool:
78+
return self._terminated
79+
80+
5881
async def main() -> None:
59-
runtime = SingleThreadedAgentRuntime()
82+
# Create the termination handler.
83+
termination_handler = TerminationHandler()
84+
85+
# Create the runtime with the termination handler.
86+
runtime = SingleThreadedAgentRuntime(intervention_handler=termination_handler)
87+
88+
# Register the agents.
6089
jack = runtime.register_and_get(
6190
"Jack",
6291
lambda: ChatCompletionAgent(
@@ -84,8 +113,8 @@ async def main() -> None:
84113
message = Message(content="Can you tell me something fun about SF?", source="User")
85114
runtime.send_message(message, jack)
86115

87-
# Process messages until the agent responds.
88-
while True:
116+
# Process messages until termination.
117+
while not termination_handler.terminated:
89118
await runtime.process_next()
90119

91120

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from typing import Any, List
4+
5+
from agnext.application import SingleThreadedAgentRuntime
6+
from agnext.components import TypeRoutedAgent, message_handler
7+
from agnext.components.models import (
8+
AssistantMessage,
9+
ChatCompletionClient,
10+
LLMMessage,
11+
OpenAI,
12+
SystemMessage,
13+
UserMessage,
14+
)
15+
from agnext.core import AgentId, CancellationToken
16+
from agnext.core.intervention import DefaultInterventionHandler
17+
18+
19+
@dataclass
20+
class Message:
21+
source: str
22+
content: str
23+
24+
25+
@dataclass
26+
class RequestToSpeak:
27+
pass
28+
29+
30+
@dataclass
31+
class Termination:
32+
pass
33+
34+
35+
class RoundRobinGroupChatManager(TypeRoutedAgent):
36+
def __init__(
37+
self,
38+
description: str,
39+
participants: List[AgentId],
40+
num_rounds: int,
41+
) -> None:
42+
super().__init__(description)
43+
self._participants = participants
44+
self._num_rounds = num_rounds
45+
self._round_count = 0
46+
47+
@message_handler
48+
async def handle_message(self, message: Message, cancellation_token: CancellationToken) -> None:
49+
# Select the next speaker in a round-robin fashion
50+
speaker = self._participants[self._round_count % len(self._participants)]
51+
self._round_count += 1
52+
if self._round_count == self._num_rounds * len(self._participants):
53+
# End the conversation after the specified number of rounds.
54+
self.publish_message(Termination())
55+
return
56+
# Send a request to speak message to the selected speaker.
57+
self.send_message(RequestToSpeak(), speaker)
58+
59+
60+
class GroupChatParticipant(TypeRoutedAgent):
61+
def __init__(
62+
self,
63+
description: str,
64+
system_messages: List[SystemMessage],
65+
model_client: ChatCompletionClient,
66+
) -> None:
67+
super().__init__(description)
68+
self._system_messages = system_messages
69+
self._model_client = model_client
70+
self._memory: List[Message] = []
71+
72+
@message_handler
73+
async def handle_message(self, message: Message, cancellation_token: CancellationToken) -> None:
74+
self._memory.append(message)
75+
76+
@message_handler
77+
async def handle_request_to_speak(self, message: RequestToSpeak, cancellation_token: CancellationToken) -> None:
78+
# Generate a response to the last message in the memory
79+
if not self._memory:
80+
return
81+
llm_messages: List[LLMMessage] = []
82+
for m in self._memory[-10:]:
83+
if m.source == self.metadata["name"]:
84+
llm_messages.append(AssistantMessage(content=m.content, source=self.metadata["name"]))
85+
else:
86+
llm_messages.append(UserMessage(content=m.content, source=m.source))
87+
response = await self._model_client.create(self._system_messages + llm_messages)
88+
assert isinstance(response.content, str)
89+
speach = Message(content=response.content, source=self.metadata["name"])
90+
self._memory.append(speach)
91+
self.publish_message(speach)
92+
93+
94+
class TerminationHandler(DefaultInterventionHandler):
95+
"""A handler that listens for termination messages."""
96+
97+
def __init__(self) -> None:
98+
self._terminated = False
99+
100+
async def on_publish(self, message: Any, *, sender: AgentId | None) -> Any:
101+
if isinstance(message, Termination):
102+
self._terminated = True
103+
return message
104+
105+
@property
106+
def terminated(self) -> bool:
107+
return self._terminated
108+
109+
110+
async def main() -> None:
111+
# Create the termination handler.
112+
termination_handler = TerminationHandler()
113+
114+
# Create the runtime.
115+
runtime = SingleThreadedAgentRuntime(intervention_handler=termination_handler)
116+
117+
# Register the participants.
118+
agent1 = runtime.register_and_get(
119+
"DataScientist",
120+
lambda: GroupChatParticipant(
121+
description="A data scientist",
122+
system_messages=[SystemMessage("You are a data scientist.")],
123+
model_client=OpenAI(model="gpt-3.5-turbo"),
124+
),
125+
)
126+
agent2 = runtime.register_and_get(
127+
"Engineer",
128+
lambda: GroupChatParticipant(
129+
description="An engineer",
130+
system_messages=[SystemMessage("You are an engineer.")],
131+
model_client=OpenAI(model="gpt-3.5-turbo"),
132+
),
133+
)
134+
agent3 = runtime.register_and_get(
135+
"Artist",
136+
lambda: GroupChatParticipant(
137+
description="An artist",
138+
system_messages=[SystemMessage("You are an artist.")],
139+
model_client=OpenAI(model="gpt-3.5-turbo"),
140+
),
141+
)
142+
143+
# Register the group chat manager.
144+
runtime.register(
145+
"GroupChatManager",
146+
lambda: RoundRobinGroupChatManager(
147+
description="A group chat manager",
148+
participants=[agent1, agent2, agent3],
149+
num_rounds=3,
150+
),
151+
)
152+
153+
# Start the conversation.
154+
runtime.publish_message(Message(content="Hello, everyone!", source="Moderator"), namespace="default")
155+
156+
# Run the runtime until termination.
157+
while not termination_handler.terminated:
158+
await runtime.process_next()
159+
160+
161+
if __name__ == "__main__":
162+
import logging
163+
164+
logging.basicConfig(level=logging.WARNING)
165+
logging.getLogger("agnext").setLevel(logging.DEBUG)
166+
asyncio.run(main())

0 commit comments

Comments
 (0)