Skip to content

Commit

Permalink
[CAP] Refactor: Better Names for classes and methods (#2734)
Browse files Browse the repository at this point in the history
* Bug fix

* Refactor: Better class names, method names

* pypi version

* pre-commit fixes
  • Loading branch information
rajan-chari authored May 21, 2024
1 parent e208f7c commit 9f33724
Show file tree
Hide file tree
Showing 21 changed files with 128 additions and 129 deletions.
12 changes: 6 additions & 6 deletions samples/apps/cap/py/autogencap/Actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ def __init__(self, agent_name: str, description: str):
self.run = False
self._start_event = threading.Event()

def connect_network(self, network):
def on_connect(self, network):
Debug(self.actor_name, f"is connecting to {network}")
Debug(self.actor_name, "connected")

def _process_txt_msg(self, msg: str, msg_type: str, topic: str, sender: str) -> bool:
def on_txt_msg(self, msg: str, msg_type: str, receiver: str, sender: str) -> bool:
Info(self.actor_name, f"InBox: {msg}")
return True

def _process_bin_msg(self, msg: bytes, msg_type: str, topic: str, sender: str) -> bool:
Info(self.actor_name, f"Msg: topic=[{topic}], msg_type=[{msg_type}]")
def on_bin_msg(self, msg: bytes, msg_type: str, receiver: str, sender: str) -> bool:
Info(self.actor_name, f"Msg: receiver=[{receiver}], msg_type=[{msg_type}]")
return True

def _recv_thread(self):
Expand All @@ -51,12 +51,12 @@ def _recv_thread(self):
continue
if msg_type == "text":
msg = msg.decode("utf-8") # Convert bytes to string
if not self._process_txt_msg(msg, msg_type, topic, sender_topic):
if not self.on_txt_msg(msg, msg_type, topic, sender_topic):
msg = "quit"
if msg.lower() == "quit":
break
else:
if not self._process_bin_msg(msg, msg_type, topic, sender_topic):
if not self.on_bin_msg(msg, msg_type, topic, sender_topic):
break
except Exception as e:
Debug(self.actor_name, f"recv thread encountered an error: {e}")
Expand Down
4 changes: 2 additions & 2 deletions samples/apps/cap/py/autogencap/ActorConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def send_txt_msg(self, msg):
def send_bin_msg(self, msg_type: str, msg):
self._sender.send_bin_msg(msg_type, msg)

def binary_request(self, msg_type: str, msg, num_attempts=5):
def send_recv_msg(self, msg_type: str, msg, num_attempts=5):
original_timeout: int = 0
if num_attempts == -1:
original_timeout = self._resp_socket.getsockopt(zmq.RCVTIMEO)
Expand Down Expand Up @@ -148,5 +148,5 @@ def binary_request(self, msg_type: str, msg, num_attempts=5):
return None, None, None

def close(self):
self._pub_socket.close()
self._sender.close()
self._resp_socket.close()
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# TODO: remove time import


class LocalActorNetwork:
class ComponentEnsemble:
def __init__(self, name: str = "Local Actor Network", start_broker: bool = True):
self.local_actors = {}
self.name: str = name
Expand Down Expand Up @@ -49,7 +49,7 @@ def register(self, actor: Actor):
def connect(self):
self._init_runtime()
for actor in self.local_actors.values():
actor.connect_network(self)
actor.on_connect(self)

def disconnect(self):
for actor in self.local_actors.values():
Expand All @@ -59,22 +59,22 @@ def disconnect(self):
if self._broker:
self._broker.stop()

def actor_connector_by_topic(self, topic: str) -> ActorConnector:
def find_by_topic(self, topic: str) -> ActorConnector:
return ActorConnector(self._context, topic)

def lookup_actor(self, name: str) -> ActorConnector:
def find_by_name(self, name: str) -> ActorConnector:
actor_info: ActorInfo = self._directory_svc.lookup_actor_by_name(name)
if actor_info is None:
Warn("Local_Actor_Network", f"{name}, not found in the network.")
return None
Debug("Local_Actor_Network", f"[{name}] found in the network.")
return self.actor_connector_by_topic(name)
return self.find_by_topic(name)

def lookup_termination(self) -> ActorConnector:
def find_termination(self) -> ActorConnector:
termination_topic: str = Termination_Topic
return self.actor_connector_by_topic(termination_topic)
return self.find_by_topic(termination_topic)

def lookup_actor_info(self, name_regex) -> List[ActorInfo]:
def find_by_name_regex(self, name_regex) -> List[ActorInfo]:
actor_info: ActorInfoCollection = self._directory_svc.lookup_actor_info_by_name(name_regex)
if actor_info is None:
Warn("Local_Actor_Network", f"{name_regex}, not found in the network.")
Expand Down
2 changes: 1 addition & 1 deletion samples/apps/cap/py/autogencap/DebugLog.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self):
super().__init__()

def WriteLog(self, level, context, msg):
timestamp = colored(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S"), "pink")
timestamp = colored(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S"), "dark_grey")
# Translate level number to name and color
level_name = colored(LEVEL_NAMES[level], LEVEL_COLOR[level])
# Left justify the context and color it blue
Expand Down
20 changes: 10 additions & 10 deletions samples/apps/cap/py/autogencap/DirectorySvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,25 @@ def __init__(self, topic: str, name: str):
self._registered_actors = {}
self._network_prefix = ""

def _process_bin_msg(self, msg: bytes, msg_type: str, topic: str, sender: str) -> bool:
def on_bin_msg(self, msg: bytes, msg_type: str, topic: str, sender: str) -> bool:
if msg_type == ActorRegistration.__name__:
self._actor_registration_msg_handler(topic, msg_type, msg, sender)
self._on_actor_registration_msg(topic, msg_type, msg, sender)
elif msg_type == ActorLookup.__name__:
self._actor_lookup_msg_handler(topic, msg_type, msg, sender)
self._on_actor_lookup_msg(topic, msg_type, msg, sender)
elif msg_type == Ping.__name__:
self._ping_msg_handler(topic, msg_type, msg, sender)
self._on_ping_msg(topic, msg_type, msg, sender)
else:
Error("DirectorySvc", f"Unknown message type: {msg_type}")
return True

def _ping_msg_handler(self, topic: str, msg_type: str, msg: bytes, sender_topic: str):
def _on_ping_msg(self, topic: str, msg_type: str, msg: bytes, sender_topic: str):
Info("DirectorySvc", f"Ping received: {sender_topic}")
pong = Pong()
serialized_msg = pong.SerializeToString()
sender_connection = ActorSender(self._context, sender_topic)
sender_connection.send_bin_msg(Pong.__name__, serialized_msg)

def _actor_registration_msg_handler(self, topic: str, msg_type: str, msg: bytes, sender_topic: str):
def _on_actor_registration_msg(self, topic: str, msg_type: str, msg: bytes, sender_topic: str):
actor_reg = ActorRegistration()
actor_reg.ParseFromString(msg)
Info("DirectorySvc", f"Actor registration: {actor_reg.actor_info.name}")
Expand All @@ -71,7 +71,7 @@ def _actor_registration_msg_handler(self, topic: str, msg_type: str, msg: bytes,
serialized_msg = err.SerializeToString()
sender_connection.send_bin_msg(ErrorMsg.__name__, serialized_msg)

def _actor_lookup_msg_handler(self, topic: str, msg_type: str, msg: bytes, sender_topic: str):
def _on_actor_lookup_msg(self, topic: str, msg_type: str, msg: bytes, sender_topic: str):
actor_lookup = ActorLookup()
actor_lookup.ParseFromString(msg)
Debug("DirectorySvc", f"Actor lookup: {actor_lookup.actor_info.name}")
Expand Down Expand Up @@ -111,7 +111,7 @@ def _no_other_directory(self) -> bool:
Debug("DirectorySvc", "Pinging existing DirectorySvc")
ping = Ping()
serialized_msg = ping.SerializeToString()
_, _, resp = self._directory_connector.binary_request(Ping.__name__, serialized_msg, retry=1)
_, _, resp = self._directory_connector.send_recv_msg(Ping.__name__, serialized_msg, num_attempts=1)
if resp is None:
return True
return False
Expand All @@ -138,7 +138,7 @@ def register_actor(self, actor_info: ActorInfo):
actor_reg = ActorRegistration()
actor_reg.actor_info.CopyFrom(actor_info)
serialized_msg = actor_reg.SerializeToString()
_, _, resp = self._directory_connector.binary_request(ActorRegistration.__name__, serialized_msg)
_, _, resp = self._directory_connector.send_recv_msg(ActorRegistration.__name__, serialized_msg)
report_error_msg(resp, "DirectorySvc")

def register_actor_by_name(self, actor_name: str):
Expand All @@ -149,7 +149,7 @@ def _lookup_actors_by_name(self, name_regex: str):
actor_info = ActorInfo(name=name_regex)
actor_lookup = ActorLookup(actor_info=actor_info)
serialized_msg = actor_lookup.SerializeToString()
_, _, resp = self._directory_connector.binary_request(ActorLookup.__name__, serialized_msg)
_, _, resp = self._directory_connector.send_recv_msg(ActorLookup.__name__, serialized_msg)
actor_lookup_resp = ActorLookupResponse()
actor_lookup_resp.ParseFromString(resp)
return actor_lookup_resp
Expand Down
10 changes: 5 additions & 5 deletions samples/apps/cap/py/autogencap/ag_adapter/AG2CAP.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from autogen import Agent, ConversableAgent

from ..LocalActorNetwork import LocalActorNetwork
from ..ComponentEnsemble import ComponentEnsemble
from .AutoGenConnector import AutoGenConnector


Expand All @@ -14,13 +14,13 @@ class AG2CAP(ConversableAgent):

def __init__(
self,
network: LocalActorNetwork,
ensemble: ComponentEnsemble,
agent_name: str,
agent_description: Optional[str] = None,
):
super().__init__(name=agent_name, description=agent_description, llm_config=False)
self._agent_connector: AutoGenConnector = None
self._network: LocalActorNetwork = network
self._ensemble: ComponentEnsemble = ensemble
self._recv_called = False

def reset_receive_called(self):
Expand All @@ -38,8 +38,8 @@ def set_name(self, name: str):

def _check_connection(self):
if self._agent_connector is None:
self._agent_connector = AutoGenConnector(self._network.lookup_actor(self.name))
self._terminate_connector = AutoGenConnector(self._network.lookup_termination())
self._agent_connector = AutoGenConnector(self._ensemble.find_by_name(self.name))
self._terminate_connector = AutoGenConnector(self._ensemble.find_termination())

def receive(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def send_gen_reply_req(self):
# Setting retry to -1 to keep trying until a response is received
# This normal AutoGen behavior but does not handle the case when an AutoGen agent
# is not running. In that case, the connector will keep trying indefinitely.
_, _, resp = self._can_channel.binary_request(type(msg).__name__, serialized_msg, num_attempts=-1)
_, _, resp = self._can_channel.send_recv_msg(type(msg).__name__, serialized_msg, num_attempts=-1)
gen_reply_resp = GenReplyResp()
gen_reply_resp.ParseFromString(resp)
return gen_reply_resp.data
Expand Down
34 changes: 17 additions & 17 deletions samples/apps/cap/py/autogencap/ag_adapter/CAP2AG.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

from autogen import ConversableAgent

from ..ComponentEnsemble import ComponentEnsemble
from ..DebugLog import Debug, Error, Info, Warn, shorten
from ..LocalActorNetwork import LocalActorNetwork
from ..proto.Autogen_pb2 import GenReplyReq, GenReplyResp, PrepChat, ReceiveReq, Terminate
from .AG2CAP import AG2CAP
from .AGActor import AGActor
Expand All @@ -27,26 +27,26 @@ def __init__(self, ag_agent: ConversableAgent, the_other_name: str, init_chat: b
self.STATE = self.States.INIT
self._can2ag_name: str = self.actor_name + ".can2ag"
self._self_recursive: bool = self_recursive
self._network: LocalActorNetwork = None
self._ensemble: ComponentEnsemble = None
self._connectors = {}

def connect_network(self, network: LocalActorNetwork):
def on_connect(self, ensemble: ComponentEnsemble):
"""
Connect to the AutoGen system.
"""
self._network = network
self._ag2can_other_agent = AG2CAP(self._network, self._other_agent_name)
Debug(self._can2ag_name, "connected to {network}")
self._ensemble = ensemble
self._ag2can_other_agent = AG2CAP(self._ensemble, self._other_agent_name)
Debug(self._can2ag_name, "connected to {ensemble}")

def disconnect_network(self, network: LocalActorNetwork):
def disconnect_network(self, ensemble: ComponentEnsemble):
"""
Disconnect from the AutoGen system.
"""
super().disconnect_network(network)
super().disconnect_network(ensemble)
# self._the_other.close()
Debug(self.actor_name, "disconnected")

def _process_txt_msg(self, msg: str, msg_type: str, topic: str, sender: str):
def on_txt_msg(self, msg: str, msg_type: str, topic: str, sender: str):
"""
Process a text message received from the AutoGen system.
"""
Expand Down Expand Up @@ -83,7 +83,7 @@ def _call_agent_receive(self, receive_params: ReceiveReq):
self._the_ag_agent.receive(data, self._ag2can_other_agent, request_reply, silent)
self._ag2can_other_agent.set_name(save_name)

def receive_msgproc(self, msg: bytes):
def on_receive_msg(self, msg: bytes):
"""
Process a ReceiveReq message received from the AutoGen system.
"""
Expand Down Expand Up @@ -117,11 +117,11 @@ def get_actor_connector(self, topic: str):
if topic in self._connectors:
return self._connectors[topic]
else:
connector = self._network.actor_connector_by_topic(topic)
connector = self._ensemble.find_by_topic(topic)
self._connectors[topic] = connector
return connector

def generate_reply_msgproc(self, msg: GenReplyReq, sender_topic: str):
def on_generate_reply_msg(self, msg: GenReplyReq, sender_topic: str):
"""
Process a GenReplyReq message received from the AutoGen system and generate a reply.
"""
Expand All @@ -137,23 +137,23 @@ def generate_reply_msgproc(self, msg: GenReplyReq, sender_topic: str):
connector.send_bin_msg(type(reply_msg).__name__, serialized_msg)
return True

def prepchat_msgproc(self, msg, sender_topic):
def on_prepchat_msg(self, msg, sender_topic):
prep_chat = PrepChat()
prep_chat.ParseFromString(msg)
self._the_ag_agent._prepare_chat(self._ag2can_other_agent, prep_chat.clear_history, prep_chat.prepare_recipient)
return True

def _process_bin_msg(self, msg: bytes, msg_type: str, topic: str, sender: str):
def on_bin_msg(self, msg: bytes, msg_type: str, topic: str, sender: str):
"""
Process a binary message received from the AutoGen system.
"""
Info(self._can2ag_name, f"proc_bin_msg: topic=[{topic}], msg_type=[{msg_type}]")
if msg_type == ReceiveReq.__name__:
return self.receive_msgproc(msg)
return self.on_receive_msg(msg)
elif msg_type == GenReplyReq.__name__:
return self.generate_reply_msgproc(msg, sender)
return self.on_generate_reply_msg(msg, sender)
elif msg_type == PrepChat.__name__:
return self.prepchat_msgproc(msg, sender)
return self.on_prepchat_msg(msg, sender)
elif msg_type == Terminate.__name__:
Warn(self._can2ag_name, f"TERMINATE received: topic=[{topic}], msg_type=[{msg_type}]")
return False
Expand Down
6 changes: 3 additions & 3 deletions samples/apps/cap/py/autogencap/ag_adapter/CAPGroupChat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from autogen import Agent, AssistantAgent, GroupChat
from autogencap.ag_adapter.AG2CAP import AG2CAP
from autogencap.ag_adapter.CAP2AG import CAP2AG
from autogencap.LocalActorNetwork import LocalActorNetwork
from autogencap.ComponentEnsemble import ComponentEnsemble


class CAPGroupChat(GroupChat):
Expand All @@ -13,10 +13,10 @@ def __init__(
messages: List[str],
max_round: int,
chat_initiator: str,
network: LocalActorNetwork,
ensemble: ComponentEnsemble,
):
self.chat_initiator: str = chat_initiator
self._cap_network: LocalActorNetwork = network
self._cap_network: ComponentEnsemble = ensemble
self._cap_proxies: List[CAP2AG] = []
self._ag_proxies: List[AG2CAP] = []
self._ag_agents: List[Agent] = agents
Expand Down
12 changes: 6 additions & 6 deletions samples/apps/cap/py/autogencap/ag_adapter/CAPGroupChatManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from autogencap.ActorConnector import ActorConnector
from autogencap.ag_adapter.CAP2AG import CAP2AG
from autogencap.ag_adapter.CAPGroupChat import CAPGroupChat
from autogencap.LocalActorNetwork import LocalActorNetwork
from autogencap.ComponentEnsemble import ComponentEnsemble


class CAPGroupChatManager:
def __init__(self, groupchat: CAPGroupChat, llm_config: dict, network: LocalActorNetwork):
self._network: LocalActorNetwork = network
def __init__(self, groupchat: CAPGroupChat, llm_config: dict, network: ComponentEnsemble):
self._ensemble: ComponentEnsemble = network
self._cap_group_chat: CAPGroupChat = groupchat
self._ag_group_chat_manager: GroupChatManager = GroupChatManager(
groupchat=self._cap_group_chat, llm_config=llm_config
Expand All @@ -20,11 +20,11 @@ def __init__(self, groupchat: CAPGroupChat, llm_config: dict, network: LocalActo
init_chat=False,
self_recursive=True,
)
self._network.register(self._cap_proxy)
self._ensemble.register(self._cap_proxy)

def initiate_chat(self, txt_msg: str) -> None:
self._network.connect()
user_proxy_conn: ActorConnector = self._network.lookup_actor(self._cap_group_chat.chat_initiator)
self._ensemble.connect()
user_proxy_conn: ActorConnector = self._ensemble.find_by_name(self._cap_group_chat.chat_initiator)
user_proxy_conn.send_txt_msg(txt_msg)
self._wait_for_user_exit()

Expand Down
2 changes: 1 addition & 1 deletion samples/apps/cap/py/autogencap/ag_adapter/CAPPair.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def initiate_chat(self, message: str):
self._network.connect()

# Send a message to the user_proxy
agent_connection = self._network.lookup_actor(self._first_ag_agent.name)
agent_connection = self._network.find_by_name(self._first_ag_agent.name)
agent_connection.send_txt_msg(message)

def running(self):
Expand Down
Loading

0 comments on commit 9f33724

Please sign in to comment.