|
| 1 | +import time |
| 2 | +import zmq |
| 3 | +import threading |
| 4 | +from autogencap.DebugLog import Debug, Info, Warn |
| 5 | +from autogencap.Config import xsub_url, xpub_url |
| 6 | + |
| 7 | + |
| 8 | +class Broker: |
| 9 | + def __init__(self, context: zmq.Context = zmq.Context()): |
| 10 | + self._context: zmq.Context = context |
| 11 | + self._run: bool = False |
| 12 | + self._xpub: zmq.Socket = None |
| 13 | + self._xsub: zmq.Socket = None |
| 14 | + |
| 15 | + def start(self) -> bool: |
| 16 | + try: |
| 17 | + # XPUB setup |
| 18 | + self._xpub = self._context.socket(zmq.XPUB) |
| 19 | + self._xpub.setsockopt(zmq.LINGER, 0) |
| 20 | + self._xpub.bind(xpub_url) |
| 21 | + |
| 22 | + # XSUB setup |
| 23 | + self._xsub = self._context.socket(zmq.XSUB) |
| 24 | + self._xsub.setsockopt(zmq.LINGER, 0) |
| 25 | + self._xsub.bind(xsub_url) |
| 26 | + |
| 27 | + except zmq.ZMQError as e: |
| 28 | + Debug("BROKER", f"Unable to start. Check details: {e}") |
| 29 | + # If binding fails, close the sockets and return False |
| 30 | + if self._xpub: |
| 31 | + self._xpub.close() |
| 32 | + if self._xsub: |
| 33 | + self._xsub.close() |
| 34 | + return False |
| 35 | + |
| 36 | + self._run = True |
| 37 | + self._broker_thread: threading.Thread = threading.Thread(target=self.thread_fn) |
| 38 | + self._broker_thread.start() |
| 39 | + time.sleep(0.01) |
| 40 | + return True |
| 41 | + |
| 42 | + def stop(self): |
| 43 | + # Error("BROKER_ERR", "fix cleanup self._context.term()") |
| 44 | + Debug("BROKER", "stopped") |
| 45 | + self._run = False |
| 46 | + self._broker_thread.join() |
| 47 | + if self._xpub: |
| 48 | + self._xpub.close() |
| 49 | + if self._xsub: |
| 50 | + self._xsub.close() |
| 51 | + # self._context.term() |
| 52 | + |
| 53 | + def thread_fn(self): |
| 54 | + try: |
| 55 | + # Poll sockets for events |
| 56 | + self._poller: zmq.Poller = zmq.Poller() |
| 57 | + self._poller.register(self._xpub, zmq.POLLIN) |
| 58 | + self._poller.register(self._xsub, zmq.POLLIN) |
| 59 | + |
| 60 | + # Receive msgs, forward and process |
| 61 | + while self._run: |
| 62 | + events = dict(self._poller.poll(500)) |
| 63 | + if self._xpub in events: |
| 64 | + message = self._xpub.recv_multipart() |
| 65 | + Debug("BROKER", f"subscription message: {message[0]}") |
| 66 | + self._xsub.send_multipart(message) |
| 67 | + |
| 68 | + if self._xsub in events: |
| 69 | + message = self._xsub.recv_multipart() |
| 70 | + Debug("BROKER", f"publishing message: {message}") |
| 71 | + self._xpub.send_multipart(message) |
| 72 | + |
| 73 | + except Exception as e: |
| 74 | + Debug("BROKER", f"thread encountered an error: {e}") |
| 75 | + finally: |
| 76 | + self._run = False |
| 77 | + Debug("BROKER", "thread ended") |
| 78 | + return |
| 79 | + |
| 80 | + |
| 81 | +# Run a standalone broker that all other Actors can connect to. |
| 82 | +# This can also run inproc with the other actors. |
| 83 | +def main(): |
| 84 | + broker = Broker() |
| 85 | + Info("BROKER", "Starting.") |
| 86 | + if broker.start(): |
| 87 | + Info("BROKER", "Running.") |
| 88 | + else: |
| 89 | + Warn("BROKER", "Failed to start.") |
| 90 | + return |
| 91 | + |
| 92 | + status_interval = 300 # seconds |
| 93 | + last_time = time.time() |
| 94 | + |
| 95 | + # Broker is running in a separate thread. Here we are watching the |
| 96 | + # broker's status and printing status every few seconds. This is |
| 97 | + # a good place to print other statistics captured as the broker runs. |
| 98 | + # -- Exits when the user presses Ctrl+C -- |
| 99 | + while broker._run: |
| 100 | + # print a message every n seconds |
| 101 | + current_time = time.time() |
| 102 | + elapsed_time = current_time - last_time |
| 103 | + if elapsed_time > status_interval: |
| 104 | + Info("BROKER", "Running.") |
| 105 | + last_time = current_time |
| 106 | + try: |
| 107 | + time.sleep(0.5) |
| 108 | + except KeyboardInterrupt: |
| 109 | + Info("BROKER", "KeyboardInterrupt. Stopping the broker.") |
| 110 | + broker.stop() |
| 111 | + |
| 112 | + |
| 113 | +if __name__ == "__main__": |
| 114 | + main() |
0 commit comments