diff --git a/samples/apps/cap/README.md b/samples/apps/cap/README.md index 7fe7a6c7469b..c8d1945eae2e 100644 --- a/samples/apps/cap/README.md +++ b/samples/apps/cap/README.md @@ -1,11 +1,13 @@ # Composable Actor Platform (CAP) for AutoGen -## I just want to run the demo! +## I just want to run the remote AutoGen agents! *Python Instructions (Windows, Linux, MacOS):* 0) cd py 1) pip install -r autogencap/requirements.txt 2) python ./demo/App.py +3) Choose (5) and follow instructions to run standalone Agents +4) Choose other options for other demos *Demo Notes:* 1) Options involving AutoGen require OAI_CONFIG_LIST. @@ -15,14 +17,15 @@ *Demo Reference:* ``` - Select the Composable Actor Platform (CAP) demo app to run: - (enter anything else to quit) - 1. Hello World Actor - 2. Complex Actor Graph - 3. AutoGen Pair - 4. AutoGen GroupChat - 5. AutoGen Agents in different processes - Enter your choice (1-5): +Select the Composable Actor Platform (CAP) demo app to run: +(enter anything else to quit) +1. Hello World +2. Complex Agent (e.g. Name or Quit) +3. AutoGen Pair +4. AutoGen GroupChat +5. AutoGen Agents in different processes +6. List Actors in CAP (Registry) +Enter your choice (1-6): ``` ## What is Composable Actor Platform (CAP)? diff --git a/samples/apps/cap/py/README.md b/samples/apps/cap/py/README.md new file mode 100644 index 000000000000..e11fa3d048fc --- /dev/null +++ b/samples/apps/cap/py/README.md @@ -0,0 +1,39 @@ +# Composable Actor Platform (CAP) for AutoGen + +## I just want to run the remote AutoGen agents! +*Python Instructions (Windows, Linux, MacOS):* + +pip install autogencap + +1) AutoGen require OAI_CONFIG_LIST. + AutoGen python requirements: 3.8 <= python <= 3.11 + +``` + +## What is Composable Actor Platform (CAP)? +AutoGen is about Agents and Agent Orchestration. CAP extends AutoGen to allows Agents to communicate via a message bus. CAP, therefore, deals with the space between these components. CAP is a message based, actor platform that allows actors to be composed into arbitrary graphs. + +Actors can register themselves with CAP, find other agents, construct arbitrary graphs, send and receive messages independently and many, many, many other things. +```python + # CAP Platform + network = LocalActorNetwork() + # Register an agent + network.register(GreeterAgent()) + # Tell agents to connect to other agents + network.connect() + # Get a channel to the agent + greeter_link = network.lookup_agent("Greeter") + # Send a message to the agent + greeter_link.send_txt_msg("Hello World!") + # Cleanup + greeter_link.close() + network.disconnect() +``` +### Check out other demos in the `py/demo` directory. We show the following: ### +1) Hello World shown above +2) Many CAP Actors interacting with each other +3) A pair of interacting AutoGen Agents wrapped in CAP Actors +4) CAP wrapped AutoGen Agents in a group chat +5) Two AutoGen Agents running in different processes and communicating through CAP +6) List all registered agents in CAP +7) AutoGen integration to list all registered agents diff --git a/samples/apps/cap/py/autogencap/ActorConnector.py b/samples/apps/cap/py/autogencap/ActorConnector.py index c7b16157dc6a..e2ddbfa4fb69 100644 --- a/samples/apps/cap/py/autogencap/ActorConnector.py +++ b/samples/apps/cap/py/autogencap/ActorConnector.py @@ -29,8 +29,11 @@ def _connect_pub_socket(self): evt: Dict[str, Any] = {} mon_evt = recv_monitor_message(monitor) evt.update(mon_evt) - if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED: - Debug("ActorSender", "Handshake received (Or Monitor stopped)") + if evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED: + Debug("ActorSender", "Handshake received") + break + elif evt["event"] == zmq.EVENT_MONITOR_STOPPED: + Debug("ActorSender", "Monitor stopped") break self._pub_socket.disable_monitor() monitor.close() @@ -117,32 +120,33 @@ def send_txt_msg(self, msg): def send_bin_msg(self, msg_type: str, msg): self._sender.send_bin_msg(msg_type, msg) - def binary_request(self, msg_type: str, msg, retry=5): + def binary_request(self, msg_type: str, msg, num_attempts=5): original_timeout: int = 0 - if retry == -1: + if num_attempts == -1: original_timeout = self._resp_socket.getsockopt(zmq.RCVTIMEO) self._resp_socket.setsockopt(zmq.RCVTIMEO, 1000) try: self._sender.send_bin_request_msg(msg_type, msg, self._resp_topic) - while retry == -1 or retry > 0: + while num_attempts == -1 or num_attempts > 0: try: topic, resp_msg_type, _, resp = self._resp_socket.recv_multipart() return topic, resp_msg_type, resp except zmq.Again: Debug( - "ActorConnector", f"{self._topic}: No response received. retry_count={retry}, max_retry={retry}" + "ActorConnector", + f"{self._topic}: No response received. retry_count={num_attempts}, max_retry={num_attempts}", ) time.sleep(0.01) - if retry != -1: - retry -= 1 + if num_attempts != -1: + num_attempts -= 1 finally: - if retry == -1: + if num_attempts == -1: self._resp_socket.setsockopt(zmq.RCVTIMEO, original_timeout) Error("ActorConnector", f"{self._topic}: No response received. Giving up.") return None, None, None def close(self): - self._sender.close() + self._pub_socket.close() self._resp_socket.close() diff --git a/samples/apps/cap/py/autogencap/DebugLog.py b/samples/apps/cap/py/autogencap/DebugLog.py index e03712355853..d3be81fe24e6 100644 --- a/samples/apps/cap/py/autogencap/DebugLog.py +++ b/samples/apps/cap/py/autogencap/DebugLog.py @@ -15,42 +15,58 @@ LEVEL_NAMES = ["DBG", "INF", "WRN", "ERR"] LEVEL_COLOR = ["dark_grey", "green", "yellow", "red"] -console_lock = threading.Lock() - - -def Log(level, context, msg): - # Check if the current level meets the threshold - if level >= Config.LOG_LEVEL: # Use the LOG_LEVEL from the Config module - # Check if the context is in the list of ignored contexts - if context in Config.IGNORED_LOG_CONTEXTS: - return - with console_lock: - timestamp = colored(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S"), "dark_grey") - # Translate level number to name and color - level_name = colored(LEVEL_NAMES[level], LEVEL_COLOR[level]) - # Left justify the context and color it blue - context = colored(context.ljust(14), "blue") - # Left justify the threadid and color it blue - thread_id = colored(str(threading.get_ident()).ljust(5), "blue") - # color the msg based on the level - msg = colored(msg, LEVEL_COLOR[level]) - print(f"{thread_id} {timestamp} {level_name}: [{context}] {msg}") + +class BaseLogger: + def __init__(self): + self._lock = threading.Lock() + + def Log(self, level, context, msg): + # Check if the current level meets the threshold + if level >= Config.LOG_LEVEL: # Use the LOG_LEVEL from the Config module + # Check if the context is in the list of ignored contexts + if context in Config.IGNORED_LOG_CONTEXTS: + return + with self._lock: + self.WriteLog(level, context, msg) + + def WriteLog(self, level, context, msg): + raise NotImplementedError("Subclasses must implement this method") + + +class ConsoleLogger(BaseLogger): + def __init__(self): + super().__init__() + + def WriteLog(self, level, context, msg): + timestamp = colored(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S"), "pink") + # Translate level number to name and color + level_name = colored(LEVEL_NAMES[level], LEVEL_COLOR[level]) + # Left justify the context and color it blue + context = colored(context.ljust(14), "blue") + # Left justify the threadid and color it blue + thread_id = colored(str(threading.get_ident()).ljust(5), "blue") + # color the msg based on the level + msg = colored(msg, LEVEL_COLOR[level]) + print(f"{thread_id} {timestamp} {level_name}: [{context}] {msg}") + + +LOGGER = ConsoleLogger() def Debug(context, message): - Log(DEBUG, context, message) + LOGGER.Log(DEBUG, context, message) def Info(context, message): - Log(INFO, context, message) + LOGGER.Log(INFO, context, message) def Warn(context, message): - Log(WARN, context, message) + LOGGER.Log(WARN, context, message) def Error(context, message): - Log(ERROR, context, message) + LOGGER.Log(ERROR, context, message) def shorten(msg, num_parts=5, max_len=100): diff --git a/samples/apps/cap/py/autogencap/ag_adapter/AutoGenConnector.py b/samples/apps/cap/py/autogencap/ag_adapter/AutoGenConnector.py index 3fbb0db64fdd..ce81e7e945d3 100644 --- a/samples/apps/cap/py/autogencap/ag_adapter/AutoGenConnector.py +++ b/samples/apps/cap/py/autogencap/ag_adapter/AutoGenConnector.py @@ -1,3 +1,4 @@ +import json from typing import Dict, Optional, Union from autogen import Agent @@ -37,7 +38,7 @@ def send_gen_reply_req(self): # Setting retry to -1 to keep trying until a response is received # This normal AutoGen behavior but does not handle the case when an AutoGen agent # is not running. In that case, the connector will keep trying indefinitely. - _, _, resp = self._can_channel.binary_request(type(msg).__name__, serialized_msg, retry=-1) + _, _, resp = self._can_channel.binary_request(type(msg).__name__, serialized_msg, num_attempts=-1) gen_reply_resp = GenReplyResp() gen_reply_resp.ParseFromString(resp) return gen_reply_resp.data @@ -55,7 +56,8 @@ def send_receive_req( msg = ReceiveReq() if isinstance(message, dict): for key, value in message.items(): - msg.data_map.data[key] = value + json_serialized_value = json.dumps(value) + msg.data_map.data[key] = json_serialized_value elif isinstance(message, str): msg.data = message msg.sender = sender.name diff --git a/samples/apps/cap/py/autogencap/ag_adapter/CAP2AG.py b/samples/apps/cap/py/autogencap/ag_adapter/CAP2AG.py index 50a0a4751ea4..25cd7093ba79 100644 --- a/samples/apps/cap/py/autogencap/ag_adapter/CAP2AG.py +++ b/samples/apps/cap/py/autogencap/ag_adapter/CAP2AG.py @@ -1,3 +1,4 @@ +import json from enum import Enum from typing import Optional @@ -72,7 +73,11 @@ def _call_agent_receive(self, receive_params: ReceiveReq): save_name = self._ag2can_other_agent.name self._ag2can_other_agent.set_name(receive_params.sender) if receive_params.HasField("data_map"): - data = dict(receive_params.data_map.data) + json_data = dict(receive_params.data_map.data) + data = {} + for key, json_value in json_data.items(): + value = json.loads(json_value) + data[key] = value else: data = receive_params.data self._the_ag_agent.receive(data, self._ag2can_other_agent, request_reply, silent) diff --git a/samples/apps/cap/py/autogencap/ag_adapter/agent.py b/samples/apps/cap/py/autogencap/ag_adapter/agent.py new file mode 100644 index 000000000000..219bb7297c18 --- /dev/null +++ b/samples/apps/cap/py/autogencap/ag_adapter/agent.py @@ -0,0 +1,22 @@ +import time + +from autogen import ConversableAgent + +from ..DebugLog import Info, Warn +from .CAP2AG import CAP2AG + + +class Agent: + def __init__(self, agent: ConversableAgent, counter_party_name="user_proxy", init_chat=False): + self._agent = agent + self._the_other_name = counter_party_name + self._agent_adptr = CAP2AG( + ag_agent=self._agent, the_other_name=self._the_other_name, init_chat=init_chat, self_recursive=True + ) + + def register(self, network): + Info("Agent", f"Running Standalone {self._agent.name}") + network.register(self._agent_adptr) + + def running(self): + return self._agent_adptr.run diff --git a/samples/apps/cap/py/demo/App.py b/samples/apps/cap/py/demo/App.py index 19411a9b315c..8af8c97b0e5a 100644 --- a/samples/apps/cap/py/demo/App.py +++ b/samples/apps/cap/py/demo/App.py @@ -45,7 +45,7 @@ def main(): print("3. AutoGen Pair") print("4. AutoGen GroupChat") print("5. AutoGen Agents in different processes") - print("6. List Actors in CAP") + print("6. List Actors in CAP (Registry)") choice = input("Enter your choice (1-6): ") if choice == "1": diff --git a/samples/apps/cap/py/demo/RemoteAGDemo.py b/samples/apps/cap/py/demo/RemoteAGDemo.py index 0c2a946c0a42..5e7f2f0f1efe 100644 --- a/samples/apps/cap/py/demo/RemoteAGDemo.py +++ b/samples/apps/cap/py/demo/RemoteAGDemo.py @@ -5,13 +5,12 @@ def remote_ag_demo(): print("Remote Agent Demo") instructions = """ - In this demo, Broker, Assistant, and UserProxy are running in separate processes. - demo/standalone/UserProxy.py will initiate a conversation by sending UserProxy a message. + In this demo, Assistant, and UserProxy are running in separate processes. + demo/standalone/user_proxy.py will initiate a conversation by sending UserProxy Agent a message. Please do the following: - 1) Start Broker (python demo/standalone/Broker.py) - 2) Start Assistant (python demo/standalone/Assistant.py) - 3) Start UserProxy (python demo/standalone/UserProxy.py) + 1) Start Assistant (python demo/standalone/assistant.py) + 2) Start UserProxy (python demo/standalone/user_proxy.py) """ print(instructions) input("Press Enter to return to demo menu...") diff --git a/samples/apps/cap/py/demo/standalone/DirectorySvc.py b/samples/apps/cap/py/demo/standalone/directory_svc.py similarity index 100% rename from samples/apps/cap/py/demo/standalone/DirectorySvc.py rename to samples/apps/cap/py/demo/standalone/directory_svc.py diff --git a/samples/apps/cap/py/demo/standalone/user_proxy.py b/samples/apps/cap/py/demo/standalone/user_proxy.py new file mode 100644 index 000000000000..3ce4dac79276 --- /dev/null +++ b/samples/apps/cap/py/demo/standalone/user_proxy.py @@ -0,0 +1,57 @@ +import time + +import _paths +from autogencap.ag_adapter.agent import Agent +from autogencap.Config import IGNORED_LOG_CONTEXTS +from autogencap.LocalActorNetwork import LocalActorNetwork + +from autogen import UserProxyAgent + +# Filter out some Log message contexts +IGNORED_LOG_CONTEXTS.extend(["BROKER"]) + + +def main(): + # Standard AutoGen + user_proxy = UserProxyAgent( + "user_proxy", + code_execution_config={"work_dir": "coding"}, + is_termination_msg=lambda x: "TERMINATE" in x.get("content"), + ) + + # Wrap AutoGen Agent in CAP + cap_user_proxy = Agent(user_proxy, counter_party_name="assistant", init_chat=True) + # Create the message bus + network = LocalActorNetwork() + # Add the user_proxy to the message bus + cap_user_proxy.register(network) + # Start message processing + network.connect() + + # Wait for the user_proxy to finish + interact_with_user(network, cap_user_proxy) + # Cleanup + network.disconnect() + + +# Starts the Broker and the Assistant. The UserProxy is started separately. +def interact_with_user(network, cap_assistant): + user_proxy_conn = network.lookup_actor("user_proxy") + example = "Plot a chart of MSFT daily closing prices for last 1 Month." + print(f"Example: {example}") + try: + user_input = input("Please enter your command: ") + if user_input == "": + user_input = example + print(f"Sending: {user_input}") + user_proxy_conn.send_txt_msg(user_input) + + # Hang around for a while + while cap_assistant.running(): + time.sleep(0.5) + except KeyboardInterrupt: + print("Interrupted by user, shutting down.") + + +if __name__ == "__main__": + main() diff --git a/samples/apps/cap/py/pyproject.toml b/samples/apps/cap/py/pyproject.toml new file mode 100644 index 000000000000..51024bb8b279 --- /dev/null +++ b/samples/apps/cap/py/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "autogencap_rajan.jedi" +version = "0.0.7" +authors = [ + { name="Rajan Chari", email="rajan.jedi@gmail.com" }, +] +dependencies = [ + "pyzmq >= 25.1.2", + "protobuf >= 4.25.3", + "termcolor >= 2.4.0", + "pyautogen >= 0.2.23", +] +description = "CAP w/ autogen bindings" +readme = "README.md" +requires-python = ">=3.8" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] + +[project.urls] +"Homepage" = "https://github.com/microsoft/autogen" +"Bug Tracker" = "https://github.com/microsoft/autogen/issues" + +[tool.hatch.build.targets.sdist] +packages = ["autogencap"] +only-packages = true + +[tool.hatch.build.targets.wheel] +packages = ["autogencap"] +only-packages = true