diff --git a/rosbridge_server/scripts/rosbridge_websocket.py b/rosbridge_server/scripts/rosbridge_websocket.py index cda9f0aa..9648c5d2 100755 --- a/rosbridge_server/scripts/rosbridge_websocket.py +++ b/rosbridge_server/scripts/rosbridge_websocket.py @@ -34,16 +34,19 @@ from __future__ import annotations import argparse +import asyncio +import signal import sys +import threading import time from typing import TYPE_CHECKING, cast import rclpy from rcl_interfaces.msg import ParameterDescriptor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rclpy.utilities import remove_ros_args from tornado.httpserver import HTTPServer -from tornado.ioloop import IOLoop, PeriodicCallback from tornado.netutil import bind_sockets from tornado.web import Application @@ -53,14 +56,6 @@ from tornado.routing import _RuleList -def start_hook() -> None: - IOLoop.instance().start() - - -def shutdown_hook() -> None: - IOLoop.instance().stop() - - SERVER_PARAMETERS = ( # Server parameters ("port", int, 9090, "Port to listen on for WebSocket connections."), @@ -130,6 +125,7 @@ def __init__(self) -> None: RosbridgeWebSocket.node_handle = self RosbridgeWebSocket.client_manager = ClientManager(self) + RosbridgeWebSocket.event_loop = asyncio.get_event_loop() self._handle_parameters() @@ -227,30 +223,42 @@ def _start_server(self) -> None: time.sleep(self.retry_startup_delay) -def main() -> None: - rclpy.init() +async def async_main() -> None: + rclpy.init(args=sys.argv, signal_handler_options=rclpy.signals.SignalHandlerOptions.NO) + node = RosbridgeWebsocketNode() - executor = rclpy.executors.SingleThreadedExecutor() + executor = SingleThreadedExecutor() executor.add_node(node) - def spin_ros() -> None: - if not rclpy.ok(): - shutdown_hook() + spin_thread = threading.Thread(target=executor.spin) + spin_thread.start() + + loop = asyncio.get_running_loop() + stop_event = asyncio.Event() + signal_handled = False + + def handle_signal() -> None: + nonlocal signal_handled + if signal_handled: return - executor.spin_once(timeout_sec=0.01) - - spin_callback = PeriodicCallback(spin_ros, 1) - spin_callback.start() - try: - start_hook() - node.destroy_node() - rclpy.shutdown() - except KeyboardInterrupt: print("Exiting due to SIGINT") - finally: - spin_callback.stop() - shutdown_hook() # shutdown hook to stop the server + stop_event.set() + executor.shutdown() + signal_handled = True + + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, handle_signal) + + await stop_event.wait() + spin_thread.join() + + node.destroy_node() + rclpy.shutdown() + + +def main() -> None: + asyncio.run(async_main()) if __name__ == "__main__": diff --git a/rosbridge_server/src/rosbridge_server/websocket_handler.py b/rosbridge_server/src/rosbridge_server/websocket_handler.py index 7063e154..d7ccbf5d 100644 --- a/rosbridge_server/src/rosbridge_server/websocket_handler.py +++ b/rosbridge_server/src/rosbridge_server/websocket_handler.py @@ -32,18 +32,19 @@ from __future__ import annotations +import asyncio import sys import threading import traceback import uuid +from asyncio.events import AbstractEventLoop from collections import deque -from functools import partial, wraps +from functools import wraps from typing import TYPE_CHECKING, ClassVar, ParamSpec, TypeVar from rclpy.node import Node from rosbridge_library.rosbridge_protocol import RosbridgeProtocol from rosbridge_library.util import bson -from tornado.ioloop import IOLoop from tornado.iostream import StreamClosedError from tornado.websocket import WebSocketClosedError, WebSocketHandler @@ -52,8 +53,6 @@ from .client_manager import ClientManager -_io_loop = IOLoop.instance() - def _log_exception() -> None: """Log the most recent exception to ROS.""" @@ -132,6 +131,9 @@ class RosbridgeWebSocket(WebSocketHandler): # Class variable to manage connected clients client_manager: ClassVar[ClientManager | None] = None + # Event loop to run the coroutines on + event_loop: ClassVar[AbstractEventLoop | None] = None + # Node handle to pass to RosbridgeProtocol when opening a connection node_handle: ClassVar[Node | None] = None @@ -189,12 +191,15 @@ def on_close(self) -> None: self.incoming_queue.finish() def send_message(self, message: bson.BSON | bytearray | str, compression: str = "none") -> None: + cls = self.__class__ + assert isinstance(cls.event_loop, AbstractEventLoop), "Event loop was not set" + if isinstance(message, bson.BSON) or compression in ["cbor", "cbor-raw"]: binary = True else: binary = False - _io_loop.add_callback(partial(self.prewrite_message, message, binary)) + asyncio.run_coroutine_threadsafe(self.prewrite_message(message, binary), cls.event_loop) async def prewrite_message(self, message: bson.BSON | bytearray | str, binary: bool) -> None: cls = self.__class__