From eaa9d3fb83e65f4da875478235cd36ea4ab4ff54 Mon Sep 17 00:00:00 2001 From: Maciej Majek Date: Tue, 1 Apr 2025 20:44:03 +0200 Subject: [PATCH 1/2] feat: enhance destroy_subscribers behavior --- src/rai_core/rai/communication/ros2/api.py | 57 ++++++++++++--- .../ros2/connectors/ari_connector.py | 69 ++++++++++++++++++- .../ros2/connectors/hri_connector.py | 7 +- 3 files changed, 122 insertions(+), 11 deletions(-) diff --git a/src/rai_core/rai/communication/ros2/api.py b/src/rai_core/rai/communication/ros2/api.py index 7af6706da..2a98ba266 100644 --- a/src/rai_core/rai/communication/ros2/api.py +++ b/src/rai_core/rai/communication/ros2/api.py @@ -164,7 +164,9 @@ class ROS2TopicAPI: _publishers: Dictionary mapping topic names to their publisher instances """ - def __init__(self, node: rclpy.node.Node) -> None: + def __init__( + self, node: rclpy.node.Node, destroy_subscribers: bool = False + ) -> None: """Initialize the ROS2 topic API. Args: @@ -181,7 +183,7 @@ def __init__(self, node: rclpy.node.Node) -> None: # preventing node crashes. self._last_msg: Dict[str, Tuple[float, Any]] = {} self._subscriptions: Dict[str, rclpy.node.Subscription] = {} - self._destroy_subscriptions: bool = False + self._destroy_subscribers: bool = destroy_subscribers def get_topic_names_and_types( self, no_demangle: bool = False @@ -298,7 +300,35 @@ def receive( def _generic_callback(self, topic: str, msg: Any) -> None: self._last_msg[topic] = (time.time(), msg) - def _wait_for_message( + def _wait_for_message_once( + self, + msg_cls: Type[Any], + node: rclpy.node.Node, + topic: str, + qos_profile: QoSProfile, + timeout_sec: float, + ) -> Tuple[bool, Any]: + ts = time.time() + success = False + msg = None + + def callback(received_msg: Any): + nonlocal success, msg + success = True + msg = received_msg + + sub = node.create_subscription( + msg_cls, + topic, + callback, + qos_profile=qos_profile, + ) + while not success and time.time() - ts < timeout_sec: + time.sleep(0.01) + node.destroy_subscription(sub) + return success, msg + + def _wait_for_message_persistent( self, msg_cls: Type[Any], node: rclpy.node.Node, @@ -313,19 +343,30 @@ def _wait_for_message( partial(self._generic_callback, topic), qos_profile=qos_profile, ) - ts = time.time() while time.time() - ts < timeout_sec: if topic in self._last_msg: if self._last_msg[topic][0] + timeout_sec > time.time(): - if self._destroy_subscriptions: - node.destroy_subscription(self._subscriptions.pop(topic)) return True, self._last_msg[topic][1] time.sleep(0.01) - if self._destroy_subscriptions: - node.destroy_subscription(self._subscriptions.pop(topic)) return False, None + def _wait_for_message( + self, + msg_cls: Type[Any], + node: rclpy.node.Node, + topic: str, + qos_profile: QoSProfile, + timeout_sec: float, + ) -> Tuple[bool, Any]: + if self._destroy_subscribers: + return self._wait_for_message_once( + msg_cls, node, topic, qos_profile, timeout_sec + ) + return self._wait_for_message_persistent( + msg_cls, node, topic, qos_profile, timeout_sec + ) + def _is_topic_available(self, topic: str, timeout_sec: float) -> bool: ts = time.time() topic = topic if topic.startswith("/") else f"/{topic}" diff --git a/src/rai_core/rai/communication/ros2/connectors/ari_connector.py b/src/rai_core/rai/communication/ros2/connectors/ari_connector.py index 95a6adcef..9ded9bfc3 100644 --- a/src/rai_core/rai/communication/ros2/connectors/ari_connector.py +++ b/src/rai_core/rai/communication/ros2/connectors/ari_connector.py @@ -39,12 +39,77 @@ class ROS2ARIConnector(ROS2ActionMixin, ROS2ServiceMixin, ARIConnector[ROS2ARIMessage]): + """ROS2-specific implementation of the ARIConnector. + + This connector provides functionality for ROS2 communication through topics, + services, and actions, as well as TF (Transform) operations. + + Parameters + ---------- + node_name : str, optional + Name of the ROS2 node. If not provided, generates a unique name with UUID. + destroy_subscribers : bool, optional + Whether to destroy subscribers after receiving a message, by default False. + If True, the subscriber will be destroyed after the message is received. + If False, may lead to performance issues due to unneded subscribers. + It has been observed that destroying subscribers may lead to a crash of the node/executor. + + Attributes + ---------- + _node : Node + The ROS2 node instance. + _topic_api : ROS2TopicAPI + API for handling ROS2 topic operations. + _service_api : ROS2ServiceAPI + API for handling ROS2 service operations. + _actions_api : ROS2ActionAPI + API for handling ROS2 action operations. + _tf_buffer : Buffer + Buffer for storing TF transforms. + tf_listener : TransformListener + Listener for TF transforms. + _executor : MultiThreadedExecutor + ROS2 executor for running the node. + _thread : Thread + Thread running the executor. + + Methods + ------- + get_topics_names_and_types() + Get list of available topics and their message types. + get_services_names_and_types() + Get list of available services and their types. + get_actions_names_and_types() + Get list of available actions and their types. + send_message(message, target, msg_type, auto_qos_matching=True, qos_profile=None, **kwargs) + Send a message to a specified topic. + receive_message(source, timeout_sec=1.0, msg_type=None, auto_topic_type=True, **kwargs) + Receive a message from a specified topic. + wait_for_transform(tf_buffer, target_frame, source_frame, timeout_sec=1.0) + Wait for a transform to become available. + get_transform(target_frame, source_frame, timeout_sec=5.0) + Get the transform between two frames. + create_service(service_name, on_request, on_done=None, service_type, **kwargs) + Create a ROS2 service. + create_action(action_name, generate_feedback_callback, action_type, **kwargs) + Create a ROS2 action server. + shutdown() + Clean up resources and shut down the connector. + + Notes + ----- + This connector runs in a separate thread to handle ROS2 operations asynchronously. + It automatically manages the lifecycle of ROS2 nodes, topics, services, and actions. + """ + def __init__( - self, node_name: str = f"rai_ros2_ari_connector_{str(uuid.uuid4())[-12:]}" + self, + node_name: str = f"rai_ros2_ari_connector_{str(uuid.uuid4())[-12:]}", + destroy_subscribers: bool = False, ): super().__init__() self._node = Node(node_name) - self._topic_api = ROS2TopicAPI(self._node) + self._topic_api = ROS2TopicAPI(self._node, destroy_subscribers) self._service_api = ROS2ServiceAPI(self._node) self._actions_api = ROS2ActionAPI(self._node) self._tf_buffer = Buffer(node=self._node) diff --git a/src/rai_core/rai/communication/ros2/connectors/hri_connector.py b/src/rai_core/rai/communication/ros2/connectors/hri_connector.py index 8e76d9c8b..94b3c666e 100644 --- a/src/rai_core/rai/communication/ros2/connectors/hri_connector.py +++ b/src/rai_core/rai/communication/ros2/connectors/hri_connector.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import threading import uuid from typing import Any, Callable, List, Literal, Optional, Tuple, Union @@ -19,7 +20,6 @@ from rclpy.executors import MultiThreadedExecutor from rclpy.node import Node -import rai_interfaces.msg from rai.communication import HRIConnector from rai.communication.ros2.api import ( ConfigurableROS2TopicAPI, @@ -31,6 +31,11 @@ from rai.communication.ros2.connectors.service_mixin import ROS2ServiceMixin from rai.communication.ros2.messages import ROS2HRIMessage +try: + import rai_interfaces.msg +except ImportError: + logging.warning("rai_interfaces is not installed, ROS 2 HRIMessage will not work.") + class ROS2HRIConnector(ROS2ActionMixin, ROS2ServiceMixin, HRIConnector[ROS2HRIMessage]): def __init__( From 023cd9ede6f60a516951239aeacfe24efbaae57e Mon Sep 17 00:00:00 2001 From: Maciej Majek Date: Wed, 2 Apr 2025 11:46:09 +0200 Subject: [PATCH 2/2] chore: update docstring --- .../ros2/connectors/ari_connector.py | 40 +++++++------------ 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/src/rai_core/rai/communication/ros2/connectors/ari_connector.py b/src/rai_core/rai/communication/ros2/connectors/ari_connector.py index 9ded9bfc3..ac9a32181 100644 --- a/src/rai_core/rai/communication/ros2/connectors/ari_connector.py +++ b/src/rai_core/rai/communication/ros2/connectors/ari_connector.py @@ -50,28 +50,6 @@ class ROS2ARIConnector(ROS2ActionMixin, ROS2ServiceMixin, ARIConnector[ROS2ARIMe Name of the ROS2 node. If not provided, generates a unique name with UUID. destroy_subscribers : bool, optional Whether to destroy subscribers after receiving a message, by default False. - If True, the subscriber will be destroyed after the message is received. - If False, may lead to performance issues due to unneded subscribers. - It has been observed that destroying subscribers may lead to a crash of the node/executor. - - Attributes - ---------- - _node : Node - The ROS2 node instance. - _topic_api : ROS2TopicAPI - API for handling ROS2 topic operations. - _service_api : ROS2ServiceAPI - API for handling ROS2 service operations. - _actions_api : ROS2ActionAPI - API for handling ROS2 action operations. - _tf_buffer : Buffer - Buffer for storing TF transforms. - tf_listener : TransformListener - Listener for TF transforms. - _executor : MultiThreadedExecutor - ROS2 executor for running the node. - _thread : Thread - Thread running the executor. Methods ------- @@ -98,8 +76,18 @@ class ROS2ARIConnector(ROS2ActionMixin, ROS2ServiceMixin, ARIConnector[ROS2ARIMe Notes ----- - This connector runs in a separate thread to handle ROS2 operations asynchronously. - It automatically manages the lifecycle of ROS2 nodes, topics, services, and actions. + Threading Model: + The connector creates a MultiThreadedExecutor that runs in a dedicated thread. + This executor processes all ROS2 callbacks and operations asynchronously. + + Subscriber Lifecycle: + The `destroy_subscribers` parameter controls subscriber cleanup behavior: + - True: Subscribers are destroyed after receiving a message + - Pros: Better resource utilization + - Cons: Known stability issues (see: https://github.com/ros2/rclpy/issues/1142) + - False (default): Subscribers remain active after message reception + - Pros: More stable operation, avoids potential crashes + - Cons: May lead to memory/performance overhead from inactive subscribers """ def __init__( @@ -113,7 +101,7 @@ def __init__( self._service_api = ROS2ServiceAPI(self._node) self._actions_api = ROS2ActionAPI(self._node) self._tf_buffer = Buffer(node=self._node) - self.tf_listener = TransformListener(self._tf_buffer, self._node) + self._tf_listener = TransformListener(self._tf_buffer, self._node) self._executor = MultiThreadedExecutor() self._executor.add_node(self._node) @@ -238,7 +226,7 @@ def node(self) -> Node: return self._node def shutdown(self): - self.tf_listener.unregister() + self._tf_listener.unregister() self._node.destroy_node() self._actions_api.shutdown() self._topic_api.shutdown()