Skip to content

Commit 27aaab8

Browse files
authored
Reduce sleep() in CAP library code (microsoft#2189)
* 1) Removed most framework sleeps 2) refactored connection code
* pre-commit fixes
* pre-commit
* ignore protobuf files in pre-commit checks
* Fix duplicate actor registration
* refactor change
* Nicer printing of Actors
* 1) Report recv_multipart errors 4) Always send 4 parts
* AutoGen generate_reply expects to wait indefinitely for an answer. CAP can wait a certain amount and give up. In order to reconcile the two, AutoGenConnector is set to wait indefinitely.
* pre-commit formatting fixes
* pre-commit format changes
* don't check autogenerated proto py files
1 parent fa7da31 commit 27aaab8

16 files changed

+242
-86
lines changed

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ exclude = [
5252
# This file needs to be either upgraded or removed and therefore should be
5353
# ignore from type checking for now
5454
"math_utils\\.py$",
55-
"samples\\apps\\cap\\py\\autogencap\\proto\\.*\\.py",
55+
"**/cap/py/autogencap/proto/*",
5656
]
5757
ignore-init-module-imports = true
5858
unfixable = ["F401"]

samples/apps/cap/py/autogencap/Actor.py

+17-10
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import threading
33
import traceback
44
import time
5-
from .DebugLog import Debug, Info
5+
from .DebugLog import Debug, Info, Error
66
from .Config import xpub_url
77

88

@@ -11,6 +11,7 @@ def __init__(self, agent_name: str, description: str):
1111
self.actor_name: str = agent_name
1212
self.agent_description: str = description
1313
self.run = False
14+
self._start_event = threading.Event()
1415

1516
def connect_network(self, network):
1617
Debug(self.actor_name, f"is connecting to {network}")
@@ -25,14 +26,15 @@ def _process_bin_msg(self, msg: bytes, msg_type: str, topic: str, sender: str) -
2526
return True
2627

2728
def _recv_thread(self):
28-
Debug(self.actor_name, "recv thread started")
29-
self._socket: zmq.Socket = self._context.socket(zmq.SUB)
30-
self._socket.setsockopt(zmq.RCVTIMEO, 500)
31-
self._socket.connect(xpub_url)
32-
str_topic = f"{self.actor_name}"
33-
Debug(self.actor_name, f"subscribe to: {str_topic}")
34-
self._socket.setsockopt_string(zmq.SUBSCRIBE, f"{str_topic}")
3529
try:
30+
Debug(self.actor_name, "recv thread started")
31+
self._socket: zmq.Socket = self._context.socket(zmq.SUB)
32+
self._socket.setsockopt(zmq.RCVTIMEO, 500)
33+
self._socket.connect(xpub_url)
34+
str_topic = f"{self.actor_name}"
35+
Debug(self.actor_name, f"subscribe to: {str_topic}")
36+
self._socket.setsockopt_string(zmq.SUBSCRIBE, f"{str_topic}")
37+
self._start_event.set()
3638
while self.run:
3739
try:
3840
topic, msg_type, sender_topic, msg = self._socket.recv_multipart()
@@ -41,7 +43,9 @@ def _recv_thread(self):
4143
sender_topic = sender_topic.decode("utf-8") # Convert bytes to string
4244
except zmq.Again:
4345
continue # No message received, continue to next iteration
44-
except Exception:
46+
except Exception as e:
47+
Error(self.actor_name, f"recv thread encountered an error: {e}")
48+
traceback.print_exc()
4549
continue
4650
if msg_type == "text":
4751
msg = msg.decode("utf-8") # Convert bytes to string
@@ -57,14 +61,17 @@ def _recv_thread(self):
5761
traceback.print_exc()
5862
finally:
5963
self.run = False
64+
# In case there was an exception at startup signal
65+
# the main thread.
66+
self._start_event.set()
6067
Debug(self.actor_name, "recv thread ended")
6168

6269
def start(self, context: zmq.Context):
6370
self._context = context
6471
self.run: bool = True
6572
self._thread = threading.Thread(target=self._recv_thread)
6673
self._thread.start()
67-
time.sleep(0.01)
74+
self._start_event.wait()
6875

6976
def disconnect_network(self, network):
7077
Debug(self.actor_name, f"is disconnecting from {network}")

samples/apps/cap/py/autogencap/ActorConnector.py

+101-39
Original file line numberDiff line numberDiff line change
@@ -5,80 +5,142 @@
55
from zmq.utils.monitor import recv_monitor_message
66
import time
77
import uuid
8-
from .DebugLog import Debug, Error
8+
from .DebugLog import Debug, Error, Info
99
from .Config import xsub_url, xpub_url, router_url
1010
from typing import Any, Dict
1111

1212

13-
class ActorConnector:
13+
class ActorSender:
1414
def __init__(self, context, topic):
1515
self._context = context
16-
17-
self._resp_socket = self._context.socket(zmq.SUB)
18-
self._resp_socket.setsockopt(zmq.LINGER, 0)
19-
self._resp_socket.setsockopt(zmq.RCVTIMEO, 250)
20-
self._resp_socket.connect(xpub_url)
21-
self._resp_topic = str(uuid.uuid4())
22-
Debug("AgentConnector", f"subscribe to: {self._resp_topic}")
23-
self._resp_socket.setsockopt_string(zmq.SUBSCRIBE, f"{self._resp_topic}")
2416
self._topic = topic
25-
2617
self._connect_pub_socket()
2718

28-
def _send_recv_router_msg(self):
29-
# Send a request to the router and wait for a response
30-
req_socket = self._context.socket(zmq.REQ)
31-
req_socket.connect(router_url)
32-
try:
33-
Debug("ActorConnector", "Broker Check Request Sent")
34-
req_socket.send_string("Request")
35-
_ = req_socket.recv_string()
36-
Debug("ActorConnector", "Broker Check Response Received")
37-
finally:
38-
req_socket.close()
39-
4019
def _connect_pub_socket(self):
20+
Debug("ActorSender", f"Connecting pub socket {self._topic}")
4121
self._pub_socket = self._context.socket(zmq.PUB)
42-
self._pub_socket.setsockopt(zmq.LINGER, 0)
4322
monitor = self._pub_socket.get_monitor_socket()
23+
self._pub_socket.setsockopt(zmq.LINGER, 0)
4424
self._pub_socket.connect(xsub_url)
4525
# Monitor handshake on the pub socket
4626
while monitor.poll():
4727
evt: Dict[str, Any] = {}
4828
mon_evt = recv_monitor_message(monitor)
4929
evt.update(mon_evt)
5030
if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
51-
Debug("ActorConnector", "Handshake received (Or Monitor stopped)")
31+
Debug("ActorSender", "Handshake received (Or Monitor stopped)")
5232
break
5333
self._pub_socket.disable_monitor()
5434
monitor.close()
5535
self._send_recv_router_msg()
5636

37+
def _send_recv_router_msg(self):
38+
# Send a request to the router and wait for a response
39+
req_socket = self._context.socket(zmq.REQ)
40+
req_socket.connect(router_url)
41+
try:
42+
Debug("ActorSender", "Broker Check Request Sent")
43+
req_socket.send_string("Request")
44+
_ = req_socket.recv_string()
45+
Debug("ActorSender", "Broker Check Response Received")
46+
finally:
47+
req_socket.close()
48+
5749
def send_txt_msg(self, msg):
50+
Debug("ActorSender", f"[{self._topic}] send_txt_msg: {msg}")
5851
self._pub_socket.send_multipart(
59-
[self._topic.encode("utf8"), "text".encode("utf8"), self._resp_topic.encode("utf8"), msg.encode("utf8")]
52+
[self._topic.encode("utf8"), "text".encode("utf8"), "no_resp".encode("utf8"), msg.encode("utf8")]
6053
)
6154

6255
def send_bin_msg(self, msg_type: str, msg):
56+
Debug("ActorSender", f"[{self._topic}] send_bin_msg: {msg_type}")
6357
self._pub_socket.send_multipart(
64-
[self._topic.encode("utf8"), msg_type.encode("utf8"), self._resp_topic.encode("utf8"), msg]
58+
[self._topic.encode("utf8"), msg_type.encode("utf8"), "no_resp".encode("utf8"), msg]
6559
)
6660

67-
def binary_request(self, msg_type: str, msg, retry=5):
61+
def send_bin_request_msg(self, msg_type: str, msg, resp_topic: str):
62+
Debug("ActorSender", f"[{self._topic}] send_bin_request_msg: {msg_type}")
6863
self._pub_socket.send_multipart(
69-
[self._topic.encode("utf8"), msg_type.encode("utf8"), self._resp_topic.encode("utf8"), msg]
64+
[self._topic.encode("utf8"), msg_type.encode("utf8"), resp_topic.encode("utf8"), msg]
7065
)
71-
for i in range(retry + 1):
72-
try:
73-
resp_topic, resp_msg_type, resp_sender_topic, resp = self._resp_socket.recv_multipart()
74-
return resp_topic, resp_msg_type, resp_sender_topic, resp
75-
except zmq.Again:
76-
Debug("ActorConnector", f"binary_request: No response received. retry_count={i}, max_retry={retry}")
77-
time.sleep(0.01) # Wait a bit before retrying
78-
continue
79-
Error("ActorConnector", "binary_request: No response received. Giving up.")
80-
return None, None, None, None
8166

8267
def close(self):
8368
self._pub_socket.close()
69+
70+
71+
class ActorConnector:
72+
def __init__(self, context, topic):
73+
self._context = context
74+
self._topic = topic
75+
self._connect_sub_socket()
76+
self._sender = ActorSender(context, topic)
77+
time.sleep(0.1) # Wait for the socket to connect
78+
79+
def _connect_sub_socket(self):
80+
self._resp_socket = self._context.socket(zmq.SUB)
81+
monitor = self._resp_socket.get_monitor_socket()
82+
self._resp_socket.setsockopt(zmq.LINGER, 0)
83+
self._resp_socket.setsockopt(zmq.RCVTIMEO, 250)
84+
self._resp_socket.connect(xpub_url)
85+
self._resp_topic = str(uuid.uuid4())
86+
Debug("ActorConnector", f"subscribe to: {self._resp_topic}")
87+
self._resp_socket.setsockopt_string(zmq.SUBSCRIBE, f"{self._resp_topic}")
88+
while monitor.poll():
89+
evt: Dict[str, Any] = {}
90+
mon_evt = recv_monitor_message(monitor)
91+
evt.update(mon_evt)
92+
Debug("ActorConnector", evt)
93+
if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
94+
Debug("ActorConnector", "Handshake received (Or Monitor stopped)")
95+
break
96+
self._resp_socket.disable_monitor()
97+
monitor.close()
98+
self._send_recv_router_msg()
99+
100+
def _send_recv_router_msg(self):
101+
# Send a request to the router and wait for a response
102+
req_socket = self._context.socket(zmq.REQ)
103+
req_socket.connect(router_url)
104+
try:
105+
Debug("ActorConnector", "Broker Check Request Sent")
106+
req_socket.send_string("Request")
107+
_ = req_socket.recv_string()
108+
Debug("ActorConnector", "Broker Check Response Received")
109+
finally:
110+
req_socket.close()
111+
112+
def send_txt_msg(self, msg):
113+
self._sender.send_txt_msg(msg)
114+
115+
def send_bin_msg(self, msg_type: str, msg):
116+
self._sender.send_bin_msg(msg_type, msg)
117+
118+
def binary_request(self, msg_type: str, msg, retry=5):
119+
original_timeout: int = 0
120+
if retry == -1:
121+
original_timeout = self._resp_socket.getsockopt(zmq.RCVTIMEO)
122+
self._resp_socket.setsockopt(zmq.RCVTIMEO, 1000)
123+
124+
try:
125+
self._sender.send_bin_request_msg(msg_type, msg, self._resp_topic)
126+
while retry == -1 or retry > 0:
127+
try:
128+
topic, resp_msg_type, _, resp = self._resp_socket.recv_multipart()
129+
return topic, resp_msg_type, resp
130+
except zmq.Again:
131+
Debug(
132+
"ActorConnector", f"{self._topic}: No response received. retry_count={retry}, max_retry={retry}"
133+
)
134+
time.sleep(0.01)
135+
if retry != -1:
136+
retry -= 1
137+
finally:
138+
if retry == -1:
139+
self._resp_socket.setsockopt(zmq.RCVTIMEO, original_timeout)
140+
141+
Error("ActorConnector", f"{self._topic}: No response received. Giving up.")
142+
return None, None, None
143+
144+
def close(self):
145+
self._sender.close()
84146
self._resp_socket.close()

samples/apps/cap/py/autogencap/Broker.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ def __init__(self, context: zmq.Context = zmq.Context()):
1212
self._xpub: zmq.Socket = None
1313
self._xsub: zmq.Socket = None
1414
self._router: zmq.Socket = None
15+
self._start_event = threading.Event()
1516

1617
def _init_sockets(self):
1718
try:
@@ -44,8 +45,9 @@ def start(self) -> bool:
4445
self._run = True
4546
self._broker_thread: threading.Thread = threading.Thread(target=self.thread_fn)
4647
self._broker_thread.start()
47-
time.sleep(0.01)
48-
return True
48+
self._start_event.wait()
49+
# this will be false if the thread is not running
50+
return self._run
4951

5052
def stop(self):
5153
if not self._run:
@@ -67,6 +69,7 @@ def thread_fn(self):
6769
if not self._init_sockets():
6870
Debug("BROKER", "Receive thread not started since sockets were not initialized")
6971
self._run = False
72+
self._start_event.set()
7073
return
7174

7275
# Poll sockets for events
@@ -76,6 +79,8 @@ def thread_fn(self):
7679
self._poller.register(self._router, zmq.POLLIN)
7780

7881
Info("BROKER", "Started. Waiting for events")
82+
# signal to the main thread that Broker has started
83+
self._start_event.set()
7984
# Receive msgs, forward and process
8085
while self._run:
8186
events = dict(self._poller.poll(500))
@@ -131,6 +136,8 @@ def main():
131136
Info("BROKER", "Running.")
132137
last_time = current_time
133138
try:
139+
# Hang out for a while and print out
140+
# status every now and then
134141
time.sleep(0.5)
135142
except KeyboardInterrupt:
136143
Info("BROKER", "KeyboardInterrupt. Stopping the broker.")

0 commit comments

Comments
 (0)