Skip to content

Commit

Permalink
TeamOne agents default to handling one message at a time, exclusively… (
Browse files Browse the repository at this point in the history
#273)

* TeamOne agents default to handling one message at a time, exclusively. The default can be overridden by passing a parameter to the BaseAgent constructor.
  • Loading branch information
afourney authored Jul 26, 2024
1 parent 3bc3761 commit 9fd7e30
Showing 1 changed file with 32 additions and 7 deletions.
39 changes: 32 additions & 7 deletions python/teams/team-one/src/team_one/agents/base_agent.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from asyncio import Lock
from typing import List, Tuple

from agnext.components import TypeRoutedAgent, message_handler
Expand All @@ -18,31 +19,55 @@ class BaseAgent(TypeRoutedAgent):
def __init__(
self,
description: str,
handle_messages_concurrently: bool = False,
) -> None:
super().__init__(description)
self._lock: Lock | None = None if handle_messages_concurrently else Lock()
self._chat_history: List[LLMMessage] = []

@message_handler
async def handle_broadcast(self, message: BroadcastMessage, cancellation_token: CancellationToken) -> None:
"""Handle an incoming broadcast message."""
assert isinstance(message.content, UserMessage)
self._chat_history.append(message.content)

try:
if self._lock is not None:
await self._lock.acquire()

self._chat_history.append(message.content)
finally:
if self._lock is not None:
self._lock.release()

@message_handler
async def handle_reset(self, message: ResetMessage, cancellation_token: CancellationToken) -> None:
"""Handle a reset message."""
await self._reset(cancellation_token)
try:
if self._lock is not None:
await self._lock.acquire()

await self._reset(cancellation_token)
finally:
if self._lock is not None:
self._lock.release()

@message_handler
async def handle_request_reply(self, message: RequestReplyMessage, cancellation_token: CancellationToken) -> None:
"""Respond to a reply request."""
request_halt, response = await self._generate_reply(cancellation_token)
try:
if self._lock is not None:
await self._lock.acquire()

request_halt, response = await self._generate_reply(cancellation_token)

assistant_message = AssistantMessage(content=message_content_to_str(response), source=self.metadata["name"])
self._chat_history.append(assistant_message)
assistant_message = AssistantMessage(content=message_content_to_str(response), source=self.metadata["name"])
self._chat_history.append(assistant_message)

user_message = UserMessage(content=response, source=self.metadata["name"])
await self.publish_message(BroadcastMessage(content=user_message, request_halt=request_halt))
user_message = UserMessage(content=response, source=self.metadata["name"])
await self.publish_message(BroadcastMessage(content=user_message, request_halt=request_halt))
finally:
if self._lock is not None:
self._lock.release()

async def _generate_reply(self, cancellation_token: CancellationToken) -> Tuple[bool, UserContent]:
"""Returns (request_halt, response_message)"""
Expand Down

0 comments on commit 9fd7e30

Please sign in to comment.