Skip to content

Commit

Permalink
Autogenstudio Updates [Upload/Dowload of Skills/Workflows, Streaming …
Browse files Browse the repository at this point in the history
…Agent Replies, Agent Message Summarization] (microsoft#1801)

* update default agent system message

* session friendly name functionality

* minor formatting

* fix issues with groupchat and version bump

* fix issues with groupchat and version bump. address microsoft#1580

* update groupchat system message

* add support for copying message in chatbox

* rewrite how agent history is maintained in workflow manager. Directly extend GroupChat and Conversable agent and override process_message method. Allow passing a message processor.
Fixes bug where the last message from a groupchat manager does not get called via register_reply.

* general qol updates

* add support for downloading + copying skills, models and agents from UI

* add regex check to agent name, address microsoft#1507

* add support for uploading workflow files

* refactor, add support for streaming intermediate agent response to ui

* improve streaming ux

* add support for uploading  skills, models, agents, workflows

* add datamodel for socket message

* version update

* fix chatbox height bug

* fix csv pagination issue

* improve hidden menu for uploading entities

* fix minor issue with animation timing on chat interface

* version bump, css fixes

* use description field in autogen conversable class for description

* add implementation for llm summarization of agent chat

* support for llm summary of agent history

* formatting fixes

* formatting updates

* add dockerfile to run autogenstudio in a docker contailer

* autogenstudio docker container

* updates to websockets

* update socket connection logic,

* support using socket for passing message requests where a socket is available

* improve command for building frontend

* formatting updates

* remove duplicated code

# overwrite skills.py in work_dir is duplicated

* update description location

as Where the code calls is like  config.description

* version bump

* refactor to ensure each session and call within a session has an independent working directory

* support use of socket for sending messages where available

* use rsync to copy built files to ui direction instead of cp -rT

* spelling correctino

* readme update

* fix numpy version

* version bump

* add support for dot env variables and updating default app dir to /home/<user>/.autogenstudio

* formatting update

* update gitignore

* formatting updates

---------

Co-authored-by: James Woffinden-Luey <[email protected]>
Co-authored-by: cillyfly <[email protected]>
  • Loading branch information
3 people authored Mar 16, 2024
1 parent da4ddb4 commit d4582ad
Show file tree
Hide file tree
Showing 28 changed files with 2,116 additions and 628 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,4 @@ test/agentchat/test_agent_scripts/*


notebook/result.png
samples/apps/autogen-studio/autogenstudio/models/test/
17 changes: 17 additions & 0 deletions samples/apps/autogen-studio/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
FROM python:3.10

WORKDIR /code

RUN pip install -U gunicorn autogenstudio

RUN useradd -m -u 1000 user
USER user
ENV HOME=/home/user \
PATH=/home/user/.local/bin:$PATH \
AUTOGENSTUDIO_APPDIR=/home/user/app

WORKDIR $HOME/app

COPY --chown=user . $HOME/app

CMD gunicorn -w $((2 * $(getconf _NPROCESSORS_ONLN) + 1)) --timeout 12600 -k uvicorn.workers.UvicornWorker autogenstudio.web.app:app --bind "0.0.0.0:8081"
3 changes: 3 additions & 0 deletions samples/apps/autogen-studio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ Code for AutoGen Studio is on GitHub at [microsoft/autogen](https://github.com/m
> [!WARNING]
> AutoGen Studio is currently under active development and we are iterating quickly. Kindly consider that we may introduce breaking changes in the releases during the upcoming weeks, and also the `README` might be outdated. We'll update the `README` as soon as we stabilize the API.
> [!NOTE] Updates
> March 12: Default directory for AutoGen Studio is now /home/<user>/.autogenstudio. You can also specify this directory using the `--appdir` argument when running the application. For example, `autogenstudio ui --appdir /path/to/folder`. This will store the database and other files in the specified directory e.g. `/path/to/folder/database.sqlite`. `.env` files in that directory will be used to set environment variables for the app.
### Capabilities / Roadmap

Some of the capabilities supported by the app frontend include the following:
Expand Down
228 changes: 195 additions & 33 deletions samples/apps/autogen-studio/autogenstudio/chatmanager.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,225 @@
import asyncio
from datetime import datetime
import json
from queue import Queue
import time
from typing import List
from .datamodel import AgentWorkFlowConfig, Message
from .utils import extract_successful_code_blocks, get_default_agent_config, get_modified_files
from .workflowmanager import AutoGenWorkFlowManager
from typing import Any, List, Dict, Optional, Tuple
import os
from fastapi import WebSocket, WebSocketDisconnect
import websockets
from .datamodel import AgentWorkFlowConfig, Message, SocketMessage
from .utils import extract_successful_code_blocks, get_modified_files, summarize_chat_history
from .workflowmanager import AutoGenWorkFlowManager


class AutoGenChatManager:
def __init__(self) -> None:
pass
"""
This class handles the automated generation and management of chat interactions
using an automated workflow configuration and message queue.
"""

def __init__(self, message_queue: Queue) -> None:
"""
Initializes the AutoGenChatManager with a message queue.
:param message_queue: A queue to use for sending messages asynchronously.
"""
self.message_queue = message_queue

def send(self, message: str) -> None:
"""
Sends a message by putting it into the message queue.
:param message: The message string to be sent.
"""
if self.message_queue is not None:
self.message_queue.put_nowait(message)

def chat(
self,
message: Message,
history: List[Dict[str, Any]],
flow_config: Optional[AgentWorkFlowConfig] = None,
connection_id: Optional[str] = None,
user_dir: Optional[str] = None,
**kwargs,
) -> Message:
"""
Processes an incoming message according to the agent's workflow configuration
and generates a response.
def chat(self, message: Message, history: List, flow_config: AgentWorkFlowConfig = None, **kwargs) -> None:
work_dir = kwargs.get("work_dir", None)
scratch_dir = os.path.join(work_dir, "scratch")
:param message: An instance of `Message` representing an incoming message.
:param history: A list of dictionaries, each representing a past interaction.
:param flow_config: An instance of `AgentWorkFlowConfig`. If None, defaults to a standard configuration.
:param connection_id: An optional connection identifier.
:param kwargs: Additional keyword arguments.
:return: An instance of `Message` representing a response.
"""

# create a working director for workflow based on user_dir/session_id/time_hash
work_dir = os.path.join(user_dir, message.session_id, datetime.now().strftime("%Y%m%d_%H-%M-%S"))
os.makedirs(work_dir, exist_ok=True)

# if no flow config is provided, use the default
if flow_config is None:
flow_config = get_default_agent_config(scratch_dir)
raise ValueError("flow_config must be specified")

flow = AutoGenWorkFlowManager(
config=flow_config,
history=history,
work_dir=work_dir,
send_message_function=self.send,
connection_id=connection_id,
)

flow = AutoGenWorkFlowManager(config=flow_config, history=history, work_dir=scratch_dir)
message_text = message.content.strip()

output = ""
start_time = time.time()

metadata = {}
flow.run(message=f"{message_text}", clear_history=False)
end_time = time.time()

metadata["messages"] = flow.agent_history
metadata = {
"messages": flow.agent_history,
"summary_method": flow_config.summary_method,
"time": end_time - start_time,
"files": get_modified_files(start_time, end_time, source_dir=work_dir),
}

output = ""
print("Modified files: ", len(metadata["files"]))

output = self._generate_output(message_text, flow, flow_config)

output_message = Message(
user_id=message.user_id,
root_msg_id=message.root_msg_id,
role="assistant",
content=output,
metadata=json.dumps(metadata),
session_id=message.session_id,
)

return output_message

def _generate_output(
self, message_text: str, flow: AutoGenWorkFlowManager, flow_config: AgentWorkFlowConfig
) -> str:
"""
Generates the output response based on the workflow configuration and agent history.
:param message_text: The text of the incoming message.
:param flow: An instance of `AutoGenWorkFlowManager`.
:param flow_config: An instance of `AgentWorkFlowConfig`.
:return: The output response as a string.
"""

output = ""
if flow_config.summary_method == "last":
successful_code_blocks = extract_successful_code_blocks(flow.agent_history)
last_message = flow.agent_history[-1]["message"]["content"] if flow.agent_history else ""
successful_code_blocks = "\n\n".join(successful_code_blocks)
output = (last_message + "\n" + successful_code_blocks) if successful_code_blocks else last_message
elif flow_config.summary_method == "llm":
output = ""
model = flow.config.receiver.config.llm_config.config_list[0]
status_message = SocketMessage(
type="agent_status",
data={"status": "summarizing", "message": "Generating summary of agent dialogue"},
connection_id=flow.connection_id,
)
self.send(status_message.dict())
output = summarize_chat_history(task=message_text, messages=flow.agent_history, model=model)

elif flow_config.summary_method == "none":
output = ""
return output

metadata["code"] = ""
metadata["summary_method"] = flow_config.summary_method
end_time = time.time()
metadata["time"] = end_time - start_time
modified_files = get_modified_files(start_time, end_time, scratch_dir, dest_dir=work_dir)
metadata["files"] = modified_files

print("Modified files: ", len(modified_files))
class WebSocketConnectionManager:
"""
Manages WebSocket connections including sending, broadcasting, and managing the lifecycle of connections.
"""

output_message = Message(
user_id=message.user_id,
root_msg_id=message.root_msg_id,
role="assistant",
content=output,
metadata=json.dumps(metadata),
session_id=message.session_id,
)
def __init__(
self, active_connections: List[Tuple[WebSocket, str]] = None, active_connections_lock: asyncio.Lock = None
) -> None:
"""
Initializes WebSocketConnectionManager with an optional list of active WebSocket connections.
return output_message
:param active_connections: A list of tuples, each containing a WebSocket object and its corresponding client_id.
"""
if active_connections is None:
active_connections = []
self.active_connections_lock = active_connections_lock
self.active_connections: List[Tuple[WebSocket, str]] = active_connections

async def connect(self, websocket: WebSocket, client_id: str) -> None:
"""
Accepts a new WebSocket connection and appends it to the active connections list.
:param websocket: The WebSocket instance representing a client connection.
:param client_id: A string representing the unique identifier of the client.
"""
await websocket.accept()
async with self.active_connections_lock:
self.active_connections.append((websocket, client_id))
print(f"New Connection: {client_id}, Total: {len(self.active_connections)}")

async def disconnect(self, websocket: WebSocket) -> None:
"""
Disconnects and removes a WebSocket connection from the active connections list.
:param websocket: The WebSocket instance to remove.
"""
async with self.active_connections_lock:
try:
self.active_connections = [conn for conn in self.active_connections if conn[0] != websocket]
print(f"Connection Closed. Total: {len(self.active_connections)}")
except ValueError:
print("Error: WebSocket connection not found")

async def disconnect_all(self) -> None:
"""
Disconnects all active WebSocket connections.
"""
for connection, _ in self.active_connections[:]:
await self.disconnect(connection)

async def send_message(self, message: Dict, websocket: WebSocket) -> None:
"""
Sends a JSON message to a single WebSocket connection.
:param message: A JSON serializable dictionary containing the message to send.
:param websocket: The WebSocket instance through which to send the message.
"""
try:
async with self.active_connections_lock:
await websocket.send_json(message)
except WebSocketDisconnect:
print("Error: Tried to send a message to a closed WebSocket")
await self.disconnect(websocket)
except websockets.exceptions.ConnectionClosedOK:
print("Error: WebSocket connection closed normally")
await self.disconnect(websocket)
except Exception as e:
print(f"Error in sending message: {str(e)}")
await self.disconnect(websocket)

async def broadcast(self, message: Dict) -> None:
"""
Broadcasts a JSON message to all active WebSocket connections.
:param message: A JSON serializable dictionary containing the message to broadcast.
"""
# Create a message dictionary with the desired format
message_dict = {"message": message}

for connection, _ in self.active_connections[:]:
try:
if connection.client_state == websockets.protocol.State.OPEN:
# Call send_message method with the message dictionary and current WebSocket connection
await self.send_message(message_dict, connection)
else:
print("Error: WebSocket connection is closed")
await self.disconnect(connection)
except (WebSocketDisconnect, websockets.exceptions.ConnectionClosedOK) as e:
print(f"Error: WebSocket disconnected or closed({str(e)})")
await self.disconnect(connection)
16 changes: 14 additions & 2 deletions samples/apps/autogen-studio/autogenstudio/datamodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class AgentConfig:
is_termination_msg: Optional[Union[bool, str, Callable]] = None
code_execution_config: Optional[Union[bool, str, Dict[str, Any]]] = None
default_auto_reply: Optional[str] = ""
description: Optional[str] = None

def dict(self):
result = asdict(self)
Expand All @@ -131,7 +132,6 @@ class AgentFlowSpec:
timestamp: Optional[str] = None
user_id: Optional[str] = None
skills: Optional[Union[None, List[Skill]]] = None
description: Optional[str] = None

def __post_init__(self):
if self.timestamp is None:
Expand Down Expand Up @@ -175,7 +175,6 @@ class GroupChatFlowSpec:
id: Optional[str] = None
timestamp: Optional[str] = None
user_id: Optional[str] = None
description: Optional[str] = None
skills: Optional[Union[None, List[Skill]]] = None

def __post_init__(self):
Expand Down Expand Up @@ -303,3 +302,16 @@ class DBWebRequestModel(object):
agent: Optional[AgentFlowSpec] = None
workflow: Optional[AgentWorkFlowConfig] = None
model: Optional[Model] = None
message: Optional[Message] = None
connection_id: Optional[str] = None


@dataclass
class SocketMessage(object):
connection_id: str
data: Dict[str, Any]
type: str

def dict(self):
result = asdict(self)
return result
Loading

0 comments on commit d4582ad

Please sign in to comment.