Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 49 additions & 8 deletions src/rai_core/rai/communication/ros2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ class ROS2TopicAPI:
_publishers: Dictionary mapping topic names to their publisher instances
"""

def __init__(self, node: rclpy.node.Node) -> None:
def __init__(
self, node: rclpy.node.Node, destroy_subscribers: bool = False
) -> None:
"""Initialize the ROS2 topic API.

Args:
Expand All @@ -181,7 +183,7 @@ def __init__(self, node: rclpy.node.Node) -> None:
# preventing node crashes.
self._last_msg: Dict[str, Tuple[float, Any]] = {}
self._subscriptions: Dict[str, rclpy.node.Subscription] = {}
self._destroy_subscriptions: bool = False
self._destroy_subscribers: bool = destroy_subscribers

def get_topic_names_and_types(
self, no_demangle: bool = False
Expand Down Expand Up @@ -298,7 +300,35 @@ def receive(
def _generic_callback(self, topic: str, msg: Any) -> None:
self._last_msg[topic] = (time.time(), msg)

def _wait_for_message(
def _wait_for_message_once(
self,
msg_cls: Type[Any],
node: rclpy.node.Node,
topic: str,
qos_profile: QoSProfile,
timeout_sec: float,
) -> Tuple[bool, Any]:
ts = time.time()
success = False
msg = None

def callback(received_msg: Any):
nonlocal success, msg
success = True
msg = received_msg

sub = node.create_subscription(
msg_cls,
topic,
callback,
qos_profile=qos_profile,
)
while not success and time.time() - ts < timeout_sec:
time.sleep(0.01)
node.destroy_subscription(sub)
return success, msg

def _wait_for_message_persistent(
self,
msg_cls: Type[Any],
node: rclpy.node.Node,
Expand All @@ -313,19 +343,30 @@ def _wait_for_message(
partial(self._generic_callback, topic),
qos_profile=qos_profile,
)

ts = time.time()
while time.time() - ts < timeout_sec:
if topic in self._last_msg:
if self._last_msg[topic][0] + timeout_sec > time.time():
if self._destroy_subscriptions:
node.destroy_subscription(self._subscriptions.pop(topic))
return True, self._last_msg[topic][1]
time.sleep(0.01)
if self._destroy_subscriptions:
node.destroy_subscription(self._subscriptions.pop(topic))
return False, None

def _wait_for_message(
self,
msg_cls: Type[Any],
node: rclpy.node.Node,
topic: str,
qos_profile: QoSProfile,
timeout_sec: float,
) -> Tuple[bool, Any]:
if self._destroy_subscribers:
return self._wait_for_message_once(
msg_cls, node, topic, qos_profile, timeout_sec
)
return self._wait_for_message_persistent(
msg_cls, node, topic, qos_profile, timeout_sec
)

def _is_topic_available(self, topic: str, timeout_sec: float) -> bool:
ts = time.time()
topic = topic if topic.startswith("/") else f"/{topic}"
Expand Down
69 changes: 67 additions & 2 deletions src/rai_core/rai/communication/ros2/connectors/ari_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,77 @@


class ROS2ARIConnector(ROS2ActionMixin, ROS2ServiceMixin, ARIConnector[ROS2ARIMessage]):
"""ROS2-specific implementation of the ARIConnector.
This connector provides functionality for ROS2 communication through topics,
services, and actions, as well as TF (Transform) operations.
Parameters
----------
node_name : str, optional
Name of the ROS2 node. If not provided, generates a unique name with UUID.
destroy_subscribers : bool, optional
Whether to destroy subscribers after receiving a message, by default False.
If True, the subscriber will be destroyed after the message is received.
If False, may lead to performance issues due to unneded subscribers.
It has been observed that destroying subscribers may lead to a crash of the node/executor.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line should be moved to notes and expanded upon. "It has been observed.... may", do you know anything more about this behaviour?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expanded note

Attributes
----------
_node : Node
The ROS2 node instance.
_topic_api : ROS2TopicAPI
API for handling ROS2 topic operations.
_service_api : ROS2ServiceAPI
API for handling ROS2 service operations.
_actions_api : ROS2ActionAPI
API for handling ROS2 action operations.
_tf_buffer : Buffer
Buffer for storing TF transforms.
tf_listener : TransformListener
Listener for TF transforms.
_executor : MultiThreadedExecutor
ROS2 executor for running the node.
_thread : Thread
Thread running the executor.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_node : Node
The ROS2 node instance.
_topic_api : ROS2TopicAPI
API for handling ROS2 topic operations.
_service_api : ROS2ServiceAPI
API for handling ROS2 service operations.
_actions_api : ROS2ActionAPI
API for handling ROS2 action operations.
_tf_buffer : Buffer
Buffer for storing TF transforms.
tf_listener : TransformListener
Listener for TF transforms.
_executor : MultiThreadedExecutor
ROS2 executor for running the node.
_thread : Thread
Thread running the executor.
tf_listener : TransformListener
Listener for TF transforms.

Protected and private Attributes should not be listed in the docstring documenting the classes public API. If any of these should become part of the public API please remove the _ prefix.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed tf_listener, to _tf_listener, as there is no need to expose the listener.
removed Attributes section of the docstring

Methods
-------
get_topics_names_and_types()
Get list of available topics and their message types.
get_services_names_and_types()
Get list of available services and their types.
get_actions_names_and_types()
Get list of available actions and their types.
send_message(message, target, msg_type, auto_qos_matching=True, qos_profile=None, **kwargs)
Send a message to a specified topic.
receive_message(source, timeout_sec=1.0, msg_type=None, auto_topic_type=True, **kwargs)
Receive a message from a specified topic.
wait_for_transform(tf_buffer, target_frame, source_frame, timeout_sec=1.0)
Wait for a transform to become available.
get_transform(target_frame, source_frame, timeout_sec=5.0)
Get the transform between two frames.
create_service(service_name, on_request, on_done=None, service_type, **kwargs)
Create a ROS2 service.
create_action(action_name, generate_feedback_callback, action_type, **kwargs)
Create a ROS2 action server.
shutdown()
Clean up resources and shut down the connector.
Notes
-----
This connector runs in a separate thread to handle ROS2 operations asynchronously.
It automatically manages the lifecycle of ROS2 nodes, topics, services, and actions.
"""

def __init__(
self, node_name: str = f"rai_ros2_ari_connector_{str(uuid.uuid4())[-12:]}"
self,
node_name: str = f"rai_ros2_ari_connector_{str(uuid.uuid4())[-12:]}",
destroy_subscribers: bool = False,
):
super().__init__()
self._node = Node(node_name)
self._topic_api = ROS2TopicAPI(self._node)
self._topic_api = ROS2TopicAPI(self._node, destroy_subscribers)
self._service_api = ROS2ServiceAPI(self._node)
self._actions_api = ROS2ActionAPI(self._node)
self._tf_buffer = Buffer(node=self._node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading
import uuid
from typing import Any, Callable, List, Literal, Optional, Tuple, Union

from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node

import rai_interfaces.msg
from rai.communication import HRIConnector
from rai.communication.ros2.api import (
ConfigurableROS2TopicAPI,
Expand All @@ -31,6 +31,11 @@
from rai.communication.ros2.connectors.service_mixin import ROS2ServiceMixin
from rai.communication.ros2.messages import ROS2HRIMessage

try:
import rai_interfaces.msg
except ImportError:
logging.warning("rai_interfaces is not installed, ROS 2 HRIMessage will not work.")


class ROS2HRIConnector(ROS2ActionMixin, ROS2ServiceMixin, HRIConnector[ROS2HRIMessage]):
def __init__(
Expand Down