Skip to content

Commit

Permalink
Initial implementation of agentchat (#623)
Browse files Browse the repository at this point in the history
* WIP implementation for agentchat

* WIP

* WIP

* wip

* WIP

* WIP

* WIP

* WIP

* fix types

* format

* fix build

* Fix build

* use a simpler implementation of thread -- list

* Select speaker to return speaker topic type

* add parent topic type to agent container

* Address comments

* Add check to make sure agent id is constant in a team run.

* Fix build
  • Loading branch information
ekzhu authored Sep 25, 2024
1 parent 74a55c3 commit 7ba9918
Show file tree
Hide file tree
Showing 20 changed files with 904 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from ._base_chat_agent import BaseChatAgent, ChatMessage
from .coding._code_executor_agent import CodeExecutorAgent
from .coding._coding_assistant_agent import CodingAssistantAgent

# Names exported by this module (what `from <module> import *` exposes).
__all__ = [
"BaseChatAgent",
"ChatMessage",
"CodeExecutorAgent",
"CodingAssistantAgent",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from abc import ABC, abstractmethod
from typing import Sequence

from autogen_core.base import CancellationToken
from autogen_core.components.models import AssistantMessage, UserMessage
from pydantic import BaseModel


class ChatMessage(BaseModel):
    """A chat message produced by a user or an agent."""

    content: UserMessage | AssistantMessage
    """The payload carried by this message."""

    request_pause: bool
    """Whether the current conversation session should be paused once this
    message has been processed."""


class BaseChatAgent(ABC):
    """Abstract base class for a chat agent that can participate in a team.

    Concrete agents must implement :meth:`on_messages`.

    Args:
        name (str): The name of the agent; exposed through :attr:`name`.
        description (str): The description of the agent; exposed through
            :attr:`description`.
    """

    def __init__(self, name: str, description: str) -> None:
        # Both values are read-only after construction (see the properties).
        self._name = name
        self._description = description

    @property
    def name(self) -> str:
        """The agent's name. A team uses it to identify the agent, so it
        should be unique within the team."""
        return self._name

    @property
    def description(self) -> str:
        """The agent's description. A team uses it to decide which agents
        to use, so it should describe the agent's capabilities and how to
        interact with it."""
        return self._description

    @abstractmethod
    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> ChatMessage:
        """Handle incoming messages and return a response message.

        Args:
            messages (Sequence[ChatMessage]): The incoming messages.
            cancellation_token (CancellationToken): Token supplied by the caller
                for cancelling the operation.

        Returns:
            ChatMessage: The agent's response.
        """
        ...
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import List, Sequence

from autogen_core.base import CancellationToken
from autogen_core.components.code_executor import CodeBlock, CodeExecutor, extract_markdown_code_blocks
from autogen_core.components.models import UserMessage

from .._base_chat_agent import BaseChatAgent, ChatMessage


class CodeExecutorAgent(BaseChatAgent):
    """An agent that executes code snippets and reports the results."""

    DESCRIPTION = "A computer terminal that performs no other action than running Python scripts (provided to it quoted in ```python code blocks), or sh shell scripts (provided to it quoted in ```sh code blocks)."

    def __init__(self, name: str, code_executor: CodeExecutor):
        """Create the agent.

        Args:
            name (str): The name of the agent.
            code_executor (CodeExecutor): The executor used to run the code
                blocks found in incoming messages.
        """
        super().__init__(name=name, description=self.DESCRIPTION)
        self._code_executor = code_executor

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> ChatMessage:
        """Run every markdown code block found in ``messages`` and reply with the output.

        Only messages whose content is a ``UserMessage`` with string content are
        scanned. If no code block is found, a fixed notice is returned instead.
        The reply never requests a pause.
        """
        # Collect the code blocks of all eligible messages, in message order.
        found: List[CodeBlock] = [
            block
            for msg in messages
            if isinstance(msg.content, UserMessage) and isinstance(msg.content.content, str)
            for block in extract_markdown_code_blocks(msg.content.content)
        ]

        # Nothing to execute: answer with a notice rather than calling the executor.
        if not found:
            return ChatMessage(
                content=UserMessage(content="No code blocks found in the thread.", source=self.name),
                request_pause=False,
            )

        execution = await self._code_executor.execute_code_blocks(found, cancellation_token=cancellation_token)
        return ChatMessage(content=UserMessage(content=execution.output, source=self.name), request_pause=False)
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import List, Sequence

from autogen_core.base import CancellationToken
from autogen_core.components.models import AssistantMessage, ChatCompletionClient, SystemMessage, UserMessage

from .._base_chat_agent import BaseChatAgent, ChatMessage


class CodingAssistantAgent(BaseChatAgent):
    """An agent that offers coding assistance backed by an LLM model client.

    The agent keeps its own message thread across calls to :meth:`on_messages`.
    """

    DESCRIPTION = "A helpful and general-purpose AI assistant that has strong language skills, Python skills, and Linux command line skills."

    SYSTEM_MESSAGE = """You are a helpful AI assistant.
Solve tasks using your coding and language skills.
In the following cases, suggest python code (in a python coding block) or shell script (in a sh coding block) for the user to execute.
1. When you need to collect info, use the code to output the info you need, for example, browse or search the web, download/read a file, print the content of a webpage or a file, get the current date/time, check the operating system. After sufficient info is printed and the task is ready to be solved based on your language skill, you can solve the task by yourself.
2. When you need to perform some task with code, use the code to perform the task and output the result. Finish the task smartly.
Solve the task step by step if you need to. If a plan is not provided, explain your plan first. Be clear which step uses code, and which step uses your language skill.
When using code, you must indicate the script type in the code block. The user cannot provide any other feedback or perform any other action beyond executing the code you suggest. The user can't modify your code. So do not suggest incomplete code which requires users to modify. Don't use a code block if it's not intended to be executed by the user.
If you want the user to save the code in a file before executing it, put # filename: <filename> inside the code block as the first line. Don't include multiple code blocks in one response. Do not ask users to copy and paste the result. Instead, use 'print' function for the output when relevant. Check the execution result returned by the user.
If the result indicates there is an error, fix the error and output the code again. Suggest the full code instead of partial code or code changes. If the error can't be fixed or if the task is not solved even after the code is executed successfully, analyze the problem, revisit your assumption, collect additional info you need, and think of a different approach to try.
When you find an answer, verify the answer carefully. Include verifiable evidence in your response if possible.
Reply "TERMINATE" in the end when everything is done."""

    def __init__(self, name: str, model_client: ChatCompletionClient):
        """Create the agent.

        Args:
            name (str): The name of the agent.
            model_client (ChatCompletionClient): The client used to generate replies.
        """
        super().__init__(name=name, description=self.DESCRIPTION)
        self._model_client = model_client
        self._system_messages = [SystemMessage(content=self.SYSTEM_MESSAGE)]
        # Conversation history; grows with every call to on_messages.
        self._message_thread: List[UserMessage | AssistantMessage] = []

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> ChatMessage:
        """Append ``messages`` to the thread, query the model, and return its reply.

        The reply is stored in the thread as an ``AssistantMessage`` and returned
        wrapped in a ``UserMessage``. ``request_pause`` is set when the reply
        contains the word "terminate" (case-insensitive).
        """
        # Record the incoming messages in the history.
        self._message_thread.extend(incoming.content for incoming in messages)

        # Run inference over the system prompt followed by the whole history.
        prompt = self._system_messages + self._message_thread
        completion = await self._model_client.create(prompt, cancellation_token=cancellation_token)
        reply = completion.content
        assert isinstance(reply, str)

        # Remember our own answer so later calls see it as context.
        self._message_thread.append(AssistantMessage(content=reply, source=self.name))

        # A reply mentioning "terminate" asks the team to pause the session.
        wants_pause = "terminate" in reply.lower()

        return ChatMessage(content=UserMessage(content=reply, source=self.name), request_pause=wants_pause)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .group_chat._round_robin_group_chat import RoundRobinGroupChat

# Names exported by this module (what `from <module> import *` exposes).
__all__ = ["RoundRobinGroupChat"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import List

from autogen_core.base import MessageContext
from autogen_core.components import DefaultTopicId, RoutedAgent, event

from ..agents import BaseChatAgent, ChatMessage
from ._messages import ContentPublishEvent, ContentRequestEvent


class BaseChatAgentContainer(RoutedAgent):
    """A core agent that wraps an
    :class:`autogen_agentchat.agents.BaseChatAgent` and delegates message
    handling to it, so that the wrapped agent can be used in a team.

    Published content is buffered until a content request arrives; the buffer
    is then handed to the wrapped agent and emptied.

    Args:
        parent_topic_type (str): The topic type of the parent orchestrator.
        agent (BaseChatAgent): The agent to delegate message handling to.
    """

    def __init__(self, parent_topic_type: str, agent: BaseChatAgent) -> None:
        super().__init__(description=agent.description)
        self._parent_topic_type = parent_topic_type
        self._agent = agent
        # Messages received since the wrapped agent last produced a response.
        self._message_buffer: List[ChatMessage] = []

    @event
    async def handle_content_publish(self, message: ContentPublishEvent, ctx: MessageContext) -> None:
        """Buffer the published content until the next content request."""
        buffered = ChatMessage(content=message.content, request_pause=message.request_pause)
        self._message_buffer.append(buffered)

    @event
    async def handle_content_request(self, message: ContentRequestEvent, ctx: MessageContext) -> None:
        """Pass the buffered messages to the wrapped agent, empty the buffer,
        and publish the agent's response to the parent topic."""
        reply = await self._agent.on_messages(self._message_buffer, ctx.cancellation_token)
        # The buffer is emptied only after the agent has returned.
        self._message_buffer.clear()
        outgoing = ContentPublishEvent(content=reply.content, request_pause=reply.request_pause)
        await self.publish_message(outgoing, topic_id=DefaultTopicId(type=self._parent_topic_type))
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from dataclasses import dataclass
from typing import Protocol


@dataclass
class TeamRunResult:
    """The value returned by a team run.

    Attributes:
        result (str): The result of the run.
    """

    result: str


class BaseTeam(Protocol):
    """Structural interface for a team: anything with a matching ``run`` coroutine."""

    async def run(self, task: str) -> TeamRunResult:
        """Run the team on ``task`` and return the result."""
        ...
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from autogen_core.components.models import AssistantMessage, UserMessage
from pydantic import BaseModel


class ContentPublishEvent(BaseModel):
    """An event message for sharing some data. Agents receiving this message
    should update their internal state (e.g., append to message history) with
    the content of the message.
    """

    content: UserMessage | AssistantMessage
    """The payload carried by this event."""

    request_pause: bool
    """Whether the current conversation session should be paused once this
    message has been processed."""


class ContentRequestEvent(BaseModel):
    """An event message asking the receiver to publish a content message.

    Upon receiving this message, the agent should publish a
    ContentPublishEvent message. The event itself carries no fields.
    """

    ...
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import sys
from typing import List

from autogen_core.base import MessageContext, TopicId
from autogen_core.components import RoutedAgent, event
from autogen_core.components.models import AssistantMessage, UserMessage

from .._messages import ContentPublishEvent, ContentRequestEvent


class BaseGroupChatManager(RoutedAgent):
    """Base class for a group chat manager that manages a group chat with multiple participants.

    It is the responsibility of the caller to ensure:

    - All participants must subscribe to the group chat topic and each of their own topics.
    - The group chat manager must subscribe to the parent topic and the group chat topic.

    Subclasses must override :meth:`select_speaker`; the implementation here
    raises ``NotImplementedError``.

    Args:
        parent_topic_type (str): The topic type of the parent orchestrator.
        group_topic_type (str): The topic type of the group chat.
        participant_topic_types (List[str]): The topic types of the participants.
        participant_descriptions (List[str]): The descriptions of the participants

    Raises:
        ValueError: If the number of participant topic types and descriptions
            differ, if the participant topic types are not unique, if the group
            or parent topic type is also a participant topic type, or if the
            group and parent topic types are the same.
    """

    def __init__(
        self,
        parent_topic_type: str,
        group_topic_type: str,
        participant_topic_types: List[str],
        participant_descriptions: List[str],
    ):
        super().__init__(description="Group chat manager")
        self._parent_topic_type = parent_topic_type
        self._group_topic_type = group_topic_type
        # Validate the topic layout up front: routing below tells the parent,
        # group, and participant topics apart by type alone.
        if len(participant_topic_types) != len(participant_descriptions):
            raise ValueError("The number of participant topic types and descriptions must be the same.")
        if len(set(participant_topic_types)) != len(participant_topic_types):
            raise ValueError("The participant topic types must be unique.")
        if group_topic_type in participant_topic_types:
            raise ValueError("The group topic type must not be in the participant topic types.")
        if parent_topic_type in participant_topic_types:
            raise ValueError("The parent topic type must not be in the participant topic types.")
        if group_topic_type == parent_topic_type:
            raise ValueError("The group topic type must not be the same as the parent topic type.")
        self._participant_topic_types = participant_topic_types
        self._participant_descriptions = participant_descriptions
        # Full conversation history, handed to select_speaker.
        self._message_thread: List[UserMessage | AssistantMessage] = []

    @event
    async def handle_content_publish(self, message: ContentPublishEvent, ctx: MessageContext) -> None:
        """Handle a content publish event.

        If the event is from the parent topic, add the message to the thread and
        forward it to the group chat topic.
        If the event is from the group chat topic, add the message to the thread
        and select a speaker to continue the conversation.
        If the event from the group chat session requests a pause, publish that
        message to the parent topic instead of selecting a speaker."""
        assert ctx.topic_id is not None

        # TODO: use something else other than print.
        assert isinstance(message.content, (UserMessage, AssistantMessage))
        sys.stdout.write(f"{'-'*80}\n{message.content.source}:\n{message.content.content}\n")

        # Process event from parent.
        if ctx.topic_id.type == self._parent_topic_type:
            self._message_thread.append(message.content)
            # Republish on the group chat topic, keeping the incoming source.
            group_chat_topic_id = TopicId(type=self._group_topic_type, source=ctx.topic_id.source)
            await self.publish_message(message, topic_id=group_chat_topic_id)
            return

        # Process event from the group chat this agent manages.
        assert ctx.topic_id.type == self._group_topic_type
        self._message_thread.append(message.content)

        if message.request_pause:
            # Hand the last message back to the parent and stop scheduling speakers.
            parent_topic_id = TopicId(type=self._parent_topic_type, source=ctx.topic_id.source)
            await self.publish_message(
                ContentPublishEvent(content=message.content, request_pause=True), topic_id=parent_topic_id
            )
            return

        # Select a speaker to continue the conversation.
        speaker_topic_type = await self.select_speaker(self._message_thread)

        participant_topic_id = TopicId(type=speaker_topic_type, source=ctx.topic_id.source)
        await self.publish_message(ContentRequestEvent(), topic_id=participant_topic_id)

    @event
    async def handle_content_request(self, message: ContentRequestEvent, ctx: MessageContext) -> None:
        """Handle a content request by selecting a speaker to start the conversation.

        Raises:
            RuntimeError: If the request arrives on the group chat topic.
        """
        assert ctx.topic_id is not None
        if ctx.topic_id.type == self._group_topic_type:
            raise RuntimeError("Content request event from the group chat topic is not allowed.")

        speaker_topic_type = await self.select_speaker(self._message_thread)

        participant_topic_id = TopicId(type=speaker_topic_type, source=ctx.topic_id.source)
        await self.publish_message(ContentRequestEvent(), topic_id=participant_topic_id)

    async def select_speaker(self, thread: List[UserMessage | AssistantMessage]) -> str:
        """Select a speaker from the participants and return the
        topic type of the selected speaker.

        Args:
            thread (List[UserMessage | AssistantMessage]): The conversation so far.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Method not implemented")
Loading

0 comments on commit 7ba9918

Please sign in to comment.