diff --git a/src/rai_core/rai/communication/ros2/api.py b/src/rai_core/rai/communication/ros2/api.py index 7af6706da..2a98ba266 100644 --- a/src/rai_core/rai/communication/ros2/api.py +++ b/src/rai_core/rai/communication/ros2/api.py @@ -164,7 +164,9 @@ class ROS2TopicAPI: _publishers: Dictionary mapping topic names to their publisher instances """ - def __init__(self, node: rclpy.node.Node) -> None: + def __init__( + self, node: rclpy.node.Node, destroy_subscribers: bool = False + ) -> None: """Initialize the ROS2 topic API. Args: @@ -181,7 +183,7 @@ def __init__(self, node: rclpy.node.Node) -> None: # preventing node crashes. self._last_msg: Dict[str, Tuple[float, Any]] = {} self._subscriptions: Dict[str, rclpy.node.Subscription] = {} - self._destroy_subscriptions: bool = False + self._destroy_subscribers: bool = destroy_subscribers def get_topic_names_and_types( self, no_demangle: bool = False @@ -298,7 +300,35 @@ def receive( def _generic_callback(self, topic: str, msg: Any) -> None: self._last_msg[topic] = (time.time(), msg) - def _wait_for_message( + def _wait_for_message_once( + self, + msg_cls: Type[Any], + node: rclpy.node.Node, + topic: str, + qos_profile: QoSProfile, + timeout_sec: float, + ) -> Tuple[bool, Any]: + ts = time.time() + success = False + msg = None + + def callback(received_msg: Any): + nonlocal success, msg + success = True + msg = received_msg + + sub = node.create_subscription( + msg_cls, + topic, + callback, + qos_profile=qos_profile, + ) + while not success and time.time() - ts < timeout_sec: + time.sleep(0.01) + node.destroy_subscription(sub) + return success, msg + + def _wait_for_message_persistent( self, msg_cls: Type[Any], node: rclpy.node.Node, @@ -313,19 +343,30 @@ def _wait_for_message( partial(self._generic_callback, topic), qos_profile=qos_profile, ) - ts = time.time() while time.time() - ts < timeout_sec: if topic in self._last_msg: if self._last_msg[topic][0] + timeout_sec > time.time(): - if self._destroy_subscriptions: - node.destroy_subscription(self._subscriptions.pop(topic)) return True, self._last_msg[topic][1] time.sleep(0.01) - if self._destroy_subscriptions: - node.destroy_subscription(self._subscriptions.pop(topic)) return False, None + def _wait_for_message( + self, + msg_cls: Type[Any], + node: rclpy.node.Node, + topic: str, + qos_profile: QoSProfile, + timeout_sec: float, + ) -> Tuple[bool, Any]: + if self._destroy_subscribers: + return self._wait_for_message_once( + msg_cls, node, topic, qos_profile, timeout_sec + ) + return self._wait_for_message_persistent( + msg_cls, node, topic, qos_profile, timeout_sec + ) + def _is_topic_available(self, topic: str, timeout_sec: float) -> bool: ts = time.time() topic = topic if topic.startswith("/") else f"/{topic}" diff --git a/src/rai_core/rai/communication/ros2/connectors/ari_connector.py b/src/rai_core/rai/communication/ros2/connectors/ari_connector.py index 95a6adcef..ac9a32181 100644 --- a/src/rai_core/rai/communication/ros2/connectors/ari_connector.py +++ b/src/rai_core/rai/communication/ros2/connectors/ari_connector.py @@ -39,16 +39,69 @@ class ROS2ARIConnector(ROS2ActionMixin, ROS2ServiceMixin, ARIConnector[ROS2ARIMessage]): + """ROS2-specific implementation of the ARIConnector. + + This connector provides functionality for ROS2 communication through topics, + services, and actions, as well as TF (Transform) operations. + + Parameters + ---------- + node_name : str, optional + Name of the ROS2 node. If not provided, generates a unique name with UUID. + destroy_subscribers : bool, optional + Whether to destroy subscribers after receiving a message, by default False. + + Methods + ------- + get_topics_names_and_types() + Get list of available topics and their message types. + get_services_names_and_types() + Get list of available services and their types. + get_actions_names_and_types() + Get list of available actions and their types. + send_message(message, target, msg_type, auto_qos_matching=True, qos_profile=None, **kwargs) + Send a message to a specified topic. + receive_message(source, timeout_sec=1.0, msg_type=None, auto_topic_type=True, **kwargs) + Receive a message from a specified topic. + wait_for_transform(tf_buffer, target_frame, source_frame, timeout_sec=1.0) + Wait for a transform to become available. + get_transform(target_frame, source_frame, timeout_sec=5.0) + Get the transform between two frames. + create_service(service_name, on_request, on_done=None, service_type, **kwargs) + Create a ROS2 service. + create_action(action_name, generate_feedback_callback, action_type, **kwargs) + Create a ROS2 action server. + shutdown() + Clean up resources and shut down the connector. + + Notes + ----- + Threading Model: + The connector creates a MultiThreadedExecutor that runs in a dedicated thread. + This executor processes all ROS2 callbacks and operations asynchronously. + + Subscriber Lifecycle: + The `destroy_subscribers` parameter controls subscriber cleanup behavior: + - True: Subscribers are destroyed after receiving a message + - Pros: Better resource utilization + - Cons: Known stability issues (see: https://github.com/ros2/rclpy/issues/1142) + - False (default): Subscribers remain active after message reception + - Pros: More stable operation, avoids potential crashes + - Cons: May lead to memory/performance overhead from inactive subscribers + """ + def __init__( - self, node_name: str = f"rai_ros2_ari_connector_{str(uuid.uuid4())[-12:]}" + self, + node_name: str = f"rai_ros2_ari_connector_{str(uuid.uuid4())[-12:]}", + destroy_subscribers: bool = False, ): super().__init__() self._node = Node(node_name) - self._topic_api = ROS2TopicAPI(self._node) + self._topic_api = ROS2TopicAPI(self._node, destroy_subscribers) self._service_api = ROS2ServiceAPI(self._node) self._actions_api = ROS2ActionAPI(self._node) self._tf_buffer = Buffer(node=self._node) - self.tf_listener = TransformListener(self._tf_buffer, self._node) + self._tf_listener = TransformListener(self._tf_buffer, self._node) self._executor = MultiThreadedExecutor() self._executor.add_node(self._node) @@ -173,7 +226,7 @@ def node(self) -> Node: return self._node def shutdown(self): - self.tf_listener.unregister() + self._tf_listener.unregister() self._node.destroy_node() self._actions_api.shutdown() self._topic_api.shutdown() diff --git a/src/rai_core/rai/communication/ros2/connectors/hri_connector.py b/src/rai_core/rai/communication/ros2/connectors/hri_connector.py index 8e76d9c8b..94b3c666e 100644 --- a/src/rai_core/rai/communication/ros2/connectors/hri_connector.py +++ b/src/rai_core/rai/communication/ros2/connectors/hri_connector.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import threading import uuid from typing import Any, Callable, List, Literal, Optional, Tuple, Union @@ -19,7 +20,6 @@ from rclpy.executors import MultiThreadedExecutor from rclpy.node import Node -import rai_interfaces.msg from rai.communication import HRIConnector from rai.communication.ros2.api import ( ConfigurableROS2TopicAPI, @@ -31,6 +31,11 @@ from rai.communication.ros2.connectors.service_mixin import ROS2ServiceMixin from rai.communication.ros2.messages import ROS2HRIMessage +try: + import rai_interfaces.msg +except ImportError: + logging.warning("rai_interfaces is not installed, ROS 2 HRIMessage will not work.") + class ROS2HRIConnector(ROS2ActionMixin, ROS2ServiceMixin, HRIConnector[ROS2HRIMessage]): def __init__(