Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions rosbridge_server/scripts/rosbridge_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,19 @@
from __future__ import annotations

import argparse
import asyncio
import signal
import sys
import threading
import time
from typing import TYPE_CHECKING, cast

import rclpy
from rcl_interfaces.msg import ParameterDescriptor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.utilities import remove_ros_args
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.netutil import bind_sockets
from tornado.web import Application

Expand All @@ -53,14 +56,6 @@
from tornado.routing import _RuleList


def start_hook() -> None:
IOLoop.instance().start()


def shutdown_hook() -> None:
IOLoop.instance().stop()


SERVER_PARAMETERS = (
# Server parameters
("port", int, 9090, "Port to listen on for WebSocket connections."),
Expand Down Expand Up @@ -130,6 +125,7 @@ def __init__(self) -> None:

RosbridgeWebSocket.node_handle = self
RosbridgeWebSocket.client_manager = ClientManager(self)
RosbridgeWebSocket.event_loop = asyncio.get_event_loop()

self._handle_parameters()

Expand Down Expand Up @@ -227,30 +223,42 @@ def _start_server(self) -> None:
time.sleep(self.retry_startup_delay)


def main() -> None:
rclpy.init()
async def async_main() -> None:
rclpy.init(args=sys.argv, signal_handler_options=rclpy.signals.SignalHandlerOptions.NO)

node = RosbridgeWebsocketNode()

executor = rclpy.executors.SingleThreadedExecutor()
executor = SingleThreadedExecutor()
executor.add_node(node)

def spin_ros() -> None:
if not rclpy.ok():
shutdown_hook()
spin_thread = threading.Thread(target=executor.spin)
spin_thread.start()

loop = asyncio.get_running_loop()
stop_event = asyncio.Event()
signal_handled = False

def handle_signal() -> None:
nonlocal signal_handled
if signal_handled:
return
executor.spin_once(timeout_sec=0.01)

spin_callback = PeriodicCallback(spin_ros, 1)
spin_callback.start()
try:
start_hook()
node.destroy_node()
rclpy.shutdown()
except KeyboardInterrupt:
print("Exiting due to SIGINT")
finally:
spin_callback.stop()
shutdown_hook() # shutdown hook to stop the server
stop_event.set()
executor.shutdown()
signal_handled = True

for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, handle_signal)

await stop_event.wait()
spin_thread.join()

node.destroy_node()
rclpy.shutdown()


def main() -> None:
asyncio.run(async_main())


if __name__ == "__main__":
Expand Down
15 changes: 10 additions & 5 deletions rosbridge_server/src/rosbridge_server/websocket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,19 @@

from __future__ import annotations

import asyncio
import sys
import threading
import traceback
import uuid
from asyncio.events import AbstractEventLoop
from collections import deque
from functools import partial, wraps
from functools import wraps
from typing import TYPE_CHECKING, ClassVar, ParamSpec, TypeVar

from rclpy.node import Node
from rosbridge_library.rosbridge_protocol import RosbridgeProtocol
from rosbridge_library.util import bson
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.websocket import WebSocketClosedError, WebSocketHandler

Expand All @@ -52,8 +53,6 @@

from .client_manager import ClientManager

_io_loop = IOLoop.instance()


def _log_exception() -> None:
"""Log the most recent exception to ROS."""
Expand Down Expand Up @@ -132,6 +131,9 @@ class RosbridgeWebSocket(WebSocketHandler):
# Class variable to manage connected clients
client_manager: ClassVar[ClientManager | None] = None

# Event loop to run the coroutines on
event_loop: ClassVar[AbstractEventLoop | None] = None

# Node handle to pass to RosbridgeProtocol when opening a connection
node_handle: ClassVar[Node | None] = None

Expand Down Expand Up @@ -189,12 +191,15 @@ def on_close(self) -> None:
self.incoming_queue.finish()

def send_message(self, message: bson.BSON | bytearray | str, compression: str = "none") -> None:
cls = self.__class__
assert isinstance(cls.event_loop, AbstractEventLoop), "Event loop was not set"

if isinstance(message, bson.BSON) or compression in ["cbor", "cbor-raw"]:
binary = True
else:
binary = False

_io_loop.add_callback(partial(self.prewrite_message, message, binary))
asyncio.run_coroutine_threadsafe(self.prewrite_message(message, binary), cls.event_loop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If event_loop is None (its default value), will this error? Should we check?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably will, the same with node_handle. I'm not happy with passing arguments through class variables but I just tried to adhere to how it is currently done.


async def prewrite_message(self, message: bson.BSON | bytearray | str, binary: bool) -> None:
cls = self.__class__
Expand Down
Loading