Skip to content

Commit

Permalink
[CAP] [Feature] Get list of actors from directory service. (microsoft…
Browse files Browse the repository at this point in the history
…#2073)

* Search directory for list of actors using regex '.*' gets all actors

* docs changes

* pre-commit fixes

* Use ActorInfo from protobuf

* pre-commit

* Added zmq tests to work on removing sleeps

* minor refactor of zmq tests

* 1) Change DirSvr to user Broker.  2) Add req-router to broker 3) In ActorConnector use handshake and req/resp to remove sleep

* 1) Change DirSvr to user Broker.  2) Add req-router to broker 3) In ActorConnector use handshake and req/resp to remove sleep

* move socket creation to thread with recv

* move socket creation to thread with recv

* Better logging for DirectorySvc

* better logging for directory svc

* Use logging config

* Start removing sleeps

* pre-commit

* Cleanup monitor socket
  • Loading branch information
rajan-chari authored and sharsha315 committed Mar 29, 2024
1 parent bdaa0a8 commit 9d03b98
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 81 deletions.
5 changes: 3 additions & 2 deletions samples/apps/cap/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Actors can register themselves with CAP, find other agents, construct arbitrary
2) Many CAP Actors interacting with each other
3) A pair of interacting AutoGen Agents wrapped in CAP Actors
4) CAP wrapped AutoGen Agents in a group chat

5) Two AutoGen Agents running in different processes and communicating through CAP
6) List all registered agents in CAP
### Coming soon. Stay tuned! ###
1) Two AutoGen Agents running in different processes and communicating through CAP
1) AutoGen integration to list all registered agents
47 changes: 37 additions & 10 deletions samples/apps/cap/py/autogencap/ActorConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,57 @@
# socket that can publish to that topic. It exposes this functionality
# using send_msg method
import zmq
from zmq.utils.monitor import recv_monitor_message
import time
import uuid
from .DebugLog import Debug, Error
from .Config import xsub_url, xpub_url
from .Config import xsub_url, xpub_url, router_url
from typing import Any, Dict


class ActorConnector:
def __init__(self, context, topic):
self._pub_socket = context.socket(zmq.PUB)
self._pub_socket.setsockopt(zmq.LINGER, 0)
self._pub_socket.connect(xsub_url)
self._context = context

self._resp_socket = context.socket(zmq.SUB)
self._resp_socket = self._context.socket(zmq.SUB)
self._resp_socket.setsockopt(zmq.LINGER, 0)
self._resp_socket.setsockopt(zmq.RCVTIMEO, 10000)
self._resp_socket.setsockopt(zmq.RCVTIMEO, 250)
self._resp_socket.connect(xpub_url)
self._resp_topic = str(uuid.uuid4())
Debug("AgentConnector", f"subscribe to: {self._resp_topic}")
self._resp_socket.setsockopt_string(zmq.SUBSCRIBE, f"{self._resp_topic}")
self._topic = topic
time.sleep(0.01) # Let the network do things.

self._connect_pub_socket()

def _send_recv_router_msg(self):
# Send a request to the router and wait for a response
req_socket = self._context.socket(zmq.REQ)
req_socket.connect(router_url)
try:
Debug("ActorConnector", "Broker Check Request Sent")
req_socket.send_string("Request")
_ = req_socket.recv_string()
Debug("ActorConnector", "Broker Check Response Received")
finally:
req_socket.close()

def _connect_pub_socket(self):
self._pub_socket = self._context.socket(zmq.PUB)
self._pub_socket.setsockopt(zmq.LINGER, 0)
monitor = self._pub_socket.get_monitor_socket()
self._pub_socket.connect(xsub_url)
# Monitor handshake on the pub socket
while monitor.poll():
evt: Dict[str, Any] = {}
mon_evt = recv_monitor_message(monitor)
evt.update(mon_evt)
if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
Debug("ActorConnector", "Handshake received (Or Monitor stopped)")
break
self._pub_socket.disable_monitor()
monitor.close()
self._send_recv_router_msg()

def send_txt_msg(self, msg):
self._pub_socket.send_multipart(
Expand All @@ -35,14 +65,11 @@ def send_bin_msg(self, msg_type: str, msg):
)

def binary_request(self, msg_type: str, msg, retry=5):
time.sleep(0.5) # Let the network do things.
self._pub_socket.send_multipart(
[self._topic.encode("utf8"), msg_type.encode("utf8"), self._resp_topic.encode("utf8"), msg]
)
time.sleep(0.5) # Let the network do things.
for i in range(retry + 1):
try:
self._resp_socket.setsockopt(zmq.RCVTIMEO, 10000)
resp_topic, resp_msg_type, resp_sender_topic, resp = self._resp_socket.recv_multipart()
return resp_topic, resp_msg_type, resp_sender_topic, resp
except zmq.Again:
Expand Down
35 changes: 31 additions & 4 deletions samples/apps/cap/py/autogencap/Broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import zmq
import threading
from autogencap.DebugLog import Debug, Info, Warn
from autogencap.Config import xsub_url, xpub_url
from autogencap.Config import xsub_url, xpub_url, router_url


class Broker:
Expand All @@ -11,35 +11,45 @@ def __init__(self, context: zmq.Context = zmq.Context()):
self._run: bool = False
self._xpub: zmq.Socket = None
self._xsub: zmq.Socket = None
self._router: zmq.Socket = None

def start(self) -> bool:
def _init_sockets(self):
try:
# XPUB setup
self._xpub = self._context.socket(zmq.XPUB)
self._xpub.setsockopt(zmq.LINGER, 0)
self._xpub.bind(xpub_url)

# XSUB setup
self._xsub = self._context.socket(zmq.XSUB)
self._xsub.setsockopt(zmq.LINGER, 0)
self._xsub.bind(xsub_url)

# ROUTER setup
self._router = self._context.socket(zmq.ROUTER)
self._router.setsockopt(zmq.LINGER, 0)
self._router.bind(router_url)
return True
except zmq.ZMQError as e:
Debug("BROKER", f"Unable to start. Check details: {e}")
# If binding fails, close the sockets and return False
if self._xpub:
self._xpub.close()
if self._xsub:
self._xsub.close()
if self._router:
self._router.close()
return False

def start(self) -> bool:
Debug("BROKER", "Trying to start broker.")
self._run = True
self._broker_thread: threading.Thread = threading.Thread(target=self.thread_fn)
self._broker_thread.start()
time.sleep(0.01)
return True

def stop(self):
if not self._run:
return
# Error("BROKER_ERR", "fix cleanup self._context.term()")
Debug("BROKER", "stopped")
self._run = False
Expand All @@ -48,15 +58,24 @@ def stop(self):
self._xpub.close()
if self._xsub:
self._xsub.close()
if self._router:
self._router.close()
# self._context.term()

def thread_fn(self):
try:
if not self._init_sockets():
Debug("BROKER", "Receive thread not started since sockets were not initialized")
self._run = False
return

# Poll sockets for events
self._poller: zmq.Poller = zmq.Poller()
self._poller.register(self._xpub, zmq.POLLIN)
self._poller.register(self._xsub, zmq.POLLIN)
self._poller.register(self._router, zmq.POLLIN)

Info("BROKER", "Started. Waiting for events")
# Receive msgs, forward and process
while self._run:
events = dict(self._poller.poll(500))
Expand All @@ -70,6 +89,14 @@ def thread_fn(self):
Debug("BROKER", f"publishing message: {message}")
self._xpub.send_multipart(message)

if self._router in events:
message = self._router.recv_multipart()
Debug("BROKER", f"router message: {message}")
# Mirror it back for now to confirm connectivity
# More interesting reserved point to point
# routing coming in the the future
self._router.send_multipart(message)

except Exception as e:
Debug("BROKER", f"thread encountered an error: {e}")
finally:
Expand Down
2 changes: 2 additions & 0 deletions samples/apps/cap/py/autogencap/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
IGNORED_LOG_CONTEXTS = []
xpub_url: str = "tcp://127.0.0.1:5555"
xsub_url: str = "tcp://127.0.0.1:5556"
router_url: str = "tcp://127.0.0.1:5557"
dealer_url: str = "tcp://127.0.0.1:5558"
12 changes: 7 additions & 5 deletions samples/apps/cap/py/autogencap/DebugLog.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import threading
import datetime
from autogencap.Config import LOG_LEVEL, IGNORED_LOG_CONTEXTS
import autogencap.Config as Config
from termcolor import colored

# Define log levels as constants
Expand All @@ -18,19 +18,21 @@

def Log(level, context, msg):
# Check if the current level meets the threshold
if level >= LOG_LEVEL: # Use the LOG_LEVEL from the Config module
if level >= Config.LOG_LEVEL: # Use the LOG_LEVEL from the Config module
# Check if the context is in the list of ignored contexts
if context in IGNORED_LOG_CONTEXTS:
if context in Config.IGNORED_LOG_CONTEXTS:
return
with console_lock:
timestamp = colored(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S"), "dark_grey")
# Translate level number to name and color
level_name = colored(LEVEL_NAMES[level], LEVEL_COLOR[level])
# Center justify the context and color it blue
# Left justify the context and color it blue
context = colored(context.ljust(14), "blue")
# Left justify the threadid and color it blue
thread_id = colored(str(threading.get_ident()).ljust(5), "blue")
# color the msg based on the level
msg = colored(msg, LEVEL_COLOR[level])
print(f"{threading.get_ident()} {timestamp} {level_name}: [{context}] {msg}")
print(f"{thread_id} {timestamp} {level_name}: [{context}] {msg}")


def Debug(context, message):
Expand Down
98 changes: 44 additions & 54 deletions samples/apps/cap/py/autogencap/DirectorySvc.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
from autogencap.Constants import Directory_Svc_Topic
from autogencap.Config import xpub_url, xsub_url
from autogencap.Config import xpub_url, xsub_url, router_url
from autogencap.DebugLog import Debug, Info, Error
from autogencap.ActorConnector import ActorConnector
from autogencap.Actor import Actor
from autogencap.proto.CAP_pb2 import ActorRegistration, ActorInfo, ActorLookup, ActorLookupResponse, Ping, Pong
from autogencap.Broker import Broker
from autogencap.proto.CAP_pb2 import (
ActorRegistration,
ActorInfo,
ActorLookup,
ActorLookupResponse,
Ping,
Pong,
ActorInfoCollection,
)
import zmq
import threading
import time
import re

# TODO (Future DirectorySv PR) use actor description, network_id, other properties to make directory
# service more generic and powerful
Expand Down Expand Up @@ -52,18 +62,27 @@ def _actor_lookup_msg_handler(self, topic: str, msg_type: str, msg: bytes, sende
actor_lookup = ActorLookup()
actor_lookup.ParseFromString(msg)
Debug("DirectorySvc", f"Actor lookup: {actor_lookup.actor_info.name}")
actor: ActorInfo = None
if actor_lookup.actor_info.name in self._registered_actors:
Info("DirectorySvc", f"Actor found: {actor_lookup.actor_info.name}")
actor = self._registered_actors[actor_lookup.actor_info.name]
else:
Error("DirectorySvc", f"Actor not found: {actor_lookup.actor_info.name}")
actor_lookup_resp = ActorLookupResponse()
if actor is not None:
actor_lookup_resp.actor.info_coll.extend([actor])
actor_lookup_resp.found = False
try:
pattern = re.compile(actor_lookup.actor_info.name)
except re.error:
Error("DirectorySvc", f"Invalid regex pattern: {actor_lookup.actor_info.name}")
else:
found_actor_list = [
self._registered_actors[registered_actor]
for registered_actor in self._registered_actors
if pattern.match(registered_actor)
]

if found_actor_list:
for actor in found_actor_list:
Info("DirectorySvc", f"Actor found: {actor.name}")
actor_lookup_resp.found = True
actor_lookup_resp.actor.info_coll.extend(found_actor_list)
else:
actor_lookup_resp.found = False
Error("DirectorySvc", f"Actor not found: {actor_lookup.actor_info.name}")

sender_connection = ActorConnector(self._context, sender_topic)
serialized_msg = actor_lookup_resp.SerializeToString()
sender_connection.send_bin_msg(ActorLookupResponse.__name__, serialized_msg)
Expand All @@ -76,6 +95,7 @@ def __init__(self, context: zmq.Context = zmq.Context()):
self._directory_actor: DirectoryActor = None

def _no_other_directory(self) -> bool:
Debug("DirectorySvc", "Pinging existing DirectorySvc")
ping = Ping()
serialized_msg = ping.SerializeToString()
_, _, _, resp = self._directory_connector.binary_request(Ping.__name__, serialized_msg, retry=0)
Expand Down Expand Up @@ -110,69 +130,39 @@ def register_actor_by_name(self, actor_name: str):
actor_info = ActorInfo(name=actor_name)
self.register_actor(actor_info)

def lookup_actor_by_name(self, actor_name: str) -> ActorInfo:
actor_info = ActorInfo(name=actor_name)
def _lookup_actors_by_name(self, name_regex: str):
actor_info = ActorInfo(name=name_regex)
actor_lookup = ActorLookup(actor_info=actor_info)
serialized_msg = actor_lookup.SerializeToString()
_, _, _, resp = self._directory_connector.binary_request(ActorLookup.__name__, serialized_msg)
actor_lookup_resp = ActorLookupResponse()
actor_lookup_resp.ParseFromString(resp)
return actor_lookup_resp

def lookup_actor_by_name(self, actor_name: str) -> ActorInfo:
actor_lookup_resp = self._lookup_actors_by_name(actor_name)
if actor_lookup_resp.found:
if len(actor_lookup_resp.actor.info_coll) > 0:
return actor_lookup_resp.actor.info_coll[0]
return None


# Standalone min proxy for a standalone directory service
class MinProxy:
def __init__(self, context: zmq.Context):
self._context: zmq.Context = context
self._xpub: zmq.Socket = None
self._xsub: zmq.Socket = None

def start(self):
# Start the proxy thread
proxy_thread = threading.Thread(target=self.proxy_thread_fn)
proxy_thread.start()
time.sleep(0.01)

def stop(self):
self._xsub.setsockopt(zmq.LINGER, 0)
self._xpub.setsockopt(zmq.LINGER, 0)
self._xpub.close()
self._xsub.close()
time.sleep(0.01)

def proxy_thread_fn(self):
self._xpub: zmq.Socket = self._context.socket(zmq.XPUB)
self._xsub: zmq.Socket = self._context.socket(zmq.XSUB)
try:
self._xpub.bind(xpub_url)
self._xsub.bind(xsub_url)
zmq.proxy(self._xpub, self._xsub)
except zmq.ContextTerminated:
self._xpub.close()
self._xsub.close()
except Exception as e:
Error("proxy_thread_fn", f"proxy_thread_fn encountered an error: {e}")
self._xpub.setsockopt(zmq.LINGER, 0)
self._xsub.setsockopt(zmq.LINGER, 0)
self._xpub.close()
self._xsub.close()
finally:
Info("proxy_thread_fn", "proxy_thread_fn terminated.")
def lookup_actor_info_by_name(self, actor_name: str) -> ActorInfoCollection:
actor_lookup_resp = self._lookup_actors_by_name(actor_name)
if actor_lookup_resp.found:
if len(actor_lookup_resp.actor.info_coll) > 0:
return actor_lookup_resp.actor
return None


# Run a standalone directory service
def main():
context: zmq.Context = zmq.Context()
# Start simple broker (will exit if real broker is running)
proxy: MinProxy = MinProxy(context)
proxy: Broker = Broker(context)
proxy.start()
# Start the directory service
directory_svc = DirectorySvc(context)
directory_svc.start()

# # How do you register an actor?
# directory_svc.register_actor_by_name("my_actor")
#
Expand Down
Loading

0 comments on commit 9d03b98

Please sign in to comment.