diff --git a/rclpy/rclpy/parameter_event_handler.py b/rclpy/rclpy/parameter_event_handler.py index ec1e6a77c..bc9fead91 100644 --- a/rclpy/rclpy/parameter_event_handler.py +++ b/rclpy/rclpy/parameter_event_handler.py @@ -352,6 +352,48 @@ def remove_parameter_event_callback( """ self._callbacks.remove_parameter_event_callback(handle) + def configure_nodes_filter( + self, + node_names: Optional[List[str]] = None, + ) -> bool: + """ + Configure which node parameter events will be received. + + This function depends on middleware support for content filtering. + If middleware doesn't support contentfilter, return false. + + If node_names is empty, the configured node filter will be cleared. + + If this function return true, only parameter events from the specified node will be + received. + It affects the behavior of the following two functions. + - add_parameter_event_callback() + The callback will only be called for parameter events from the specified nodes which are + configured in this function. + - add_parameter_callback() + The callback will only be called for parameter events from the specified nodes which are + configured in this function and add_parameter_callback(). + If the nodes specified in this function is different from the nodes specified in + add_parameter_callback(), the callback will never be called. + + :param node_names: Node names to filter parameter events from + + :return: True if the filter was successfully applied, False otherwise. + """ + if node_names is None or len(node_names) == 0: + # Clear content filter + self.parameter_event_subscription.set_content_filter('', []) + return True + + filter_expression = ' OR '.join([f'node = %{i}' for i in range(len(node_names))]) + + # Enclose each node name in "'" + quoted_node_names = [f"'{node_name}'" for node_name in node_names] + + self.parameter_event_subscription.set_content_filter(filter_expression, quoted_node_names) + + return self.parameter_event_subscription.is_cft_enabled + def _resolve_path( self, node_path: Optional[str] = None, diff --git a/rclpy/test/test_parameter_event_handler.py b/rclpy/test/test_parameter_event_handler.py index c4771a073..da30100b0 100644 --- a/rclpy/test/test_parameter_event_handler.py +++ b/rclpy/test/test_parameter_event_handler.py @@ -12,12 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading +import time from typing import Any +from typing import Callable, Optional from typing import Union import unittest import pytest +from rcl_interfaces.msg import Parameter as ParameterMsg from rcl_interfaces.msg import ParameterEvent import rclpy.context from rclpy.executors import SingleThreadedExecutor @@ -296,6 +300,170 @@ def test_resolve_path_same_namespace(self) -> None: def test_resolve_path_other_namespace(self) -> None: assert '/test_node' == self.parameter_event_handler._resolve_path('/test_node') + def test_configure_nodes_filter_with_check_add_parameter_event_callback(self) -> None: + remote_node_name1 = 'remote_node_1' + remote_node1 = rclpy.create_node( + remote_node_name1, + namespace='/rclpy', + context=self.context) + remote_node_name2 = 'remote_node_2' + remote_node2 = rclpy.create_node( + remote_node_name2, + namespace='/rclpy', + context=self.context) + + remote_node1_param_name = 'param_node1' + remote_node2_param_name = 'param_node2' + remote_node1.declare_parameter(remote_node1_param_name, 10) + remote_node2.declare_parameter(remote_node2_param_name, 'Default') + + received_event_from_remote_node1 = False + received_event_from_remote_node2 = False + + def callback(param: ParameterEvent) -> None: + nonlocal received_event_from_remote_node1, received_event_from_remote_node2 + if param.node == f'/rclpy/{remote_node_name1}': + received_event_from_remote_node1 = True + elif param.node == f'/rclpy/{remote_node_name2}': + received_event_from_remote_node2 = True + + # Configure to only receive parameter events from remote_node_name2 + assert self.parameter_event_handler.configure_nodes_filter( + [f'/rclpy/{remote_node_name2}']) + + self.parameter_event_handler.add_parameter_event_callback(callback) + + def wait_param_event(executor: SingleThreadedExecutor, timeout: int, + condition: Optional[Callable[[], bool]] = None): + start = time.monotonic() + while time.monotonic() - start < timeout: + executor.spin_once(0.2) + if condition is not None and condition(): + break + + thread = threading.Thread(target=wait_param_event, args=(self.executor, 2)) + thread.start() + time.sleep(0.1) # 100ms + remote_node1.set_parameters( + [Parameter(remote_node1_param_name, Parameter.Type.INTEGER, 20)]) + remote_node2.set_parameters( + [Parameter(remote_node2_param_name, Parameter.Type.STRING, 'abc')]) + thread.join() + + assert not received_event_from_remote_node1 + assert received_event_from_remote_node2 + + # Clear node filter and all parameter events from remote nodes should be received + assert self.parameter_event_handler.configure_nodes_filter() + + received_event_from_remote_node1 = False + received_event_from_remote_node2 = False + + def check_both_received(): + return received_event_from_remote_node1 and received_event_from_remote_node2 + + thread = threading.Thread( + target=wait_param_event, + args=(self.executor, 2, check_both_received)) + thread.start() + time.sleep(0.1) # 100ms + remote_node1.set_parameters( + [Parameter(remote_node1_param_name, Parameter.Type.INTEGER, 30)]) + remote_node2.set_parameters( + [Parameter(remote_node2_param_name, Parameter.Type.STRING, 'def')]) + thread.join() + + assert received_event_from_remote_node1 + assert received_event_from_remote_node2 + + remote_node1.destroy_node() + remote_node2.destroy_node() + + def test_configure_nodes_filter_with_check_add_parameter_callback(self) -> None: + remote_node_name1 = 'remote_node_1' + remote_node1 = rclpy.create_node( + remote_node_name1, + namespace='/rclpy', + context=self.context) + remote_node_name2 = 'remote_node_2' + remote_node2 = rclpy.create_node( + remote_node_name2, + namespace='/rclpy', + context=self.context) + + remote_node1_param_name = 'param_node1' + remote_node2_param_name = 'param_node2' + remote_node1.declare_parameter(remote_node1_param_name, 10) + remote_node2.declare_parameter(remote_node2_param_name, 'Default') + + received_event_from_remote_node1 = False + received_event_from_remote_node2 = False + + def callback_remote_node1(param: ParameterMsg) -> None: + nonlocal received_event_from_remote_node1 + if param.name == remote_node1_param_name: + received_event_from_remote_node1 = True + + def callback_remote_node2(param: ParameterMsg) -> None: + nonlocal received_event_from_remote_node2 + if param.name == remote_node2_param_name: + received_event_from_remote_node2 = True + + # Configure to only receive parameter events from remote_node_name2 + assert self.parameter_event_handler.configure_nodes_filter( + [f'/rclpy/{remote_node_name2}']) + + self.parameter_event_handler.add_parameter_callback( + remote_node1_param_name, remote_node_name1, callback_remote_node1) + self.parameter_event_handler.add_parameter_callback( + remote_node2_param_name, remote_node_name2, callback_remote_node2) + + def wait_param_event(executor: SingleThreadedExecutor, timeout: int, + condition: Optional[Callable[[], bool]] = None): + start = time.monotonic() + while time.monotonic() - start < timeout: + executor.spin_once(0.2) + if condition is not None and condition(): + break + + thread = threading.Thread(target=wait_param_event, args=(self.executor, 2)) + thread.start() + time.sleep(0.1) # 100ms + remote_node1.set_parameters( + [Parameter(remote_node1_param_name, Parameter.Type.INTEGER, 20)]) + remote_node2.set_parameters( + [Parameter(remote_node2_param_name, Parameter.Type.STRING, 'abc')]) + thread.join() + + assert not received_event_from_remote_node1 + assert received_event_from_remote_node2 + + # Clear node filter and all parameter events from remote nodes should be received + assert self.parameter_event_handler.configure_nodes_filter() + + received_event_from_remote_node1 = False + received_event_from_remote_node2 = False + + def check_both_received(): + return received_event_from_remote_node1 and received_event_from_remote_node2 + + thread = threading.Thread( + target=wait_param_event, + args=(self.executor, 2, check_both_received)) + thread.start() + time.sleep(0.1) # 100ms + remote_node1.set_parameters( + [Parameter(remote_node1_param_name, Parameter.Type.INTEGER, 30)]) + remote_node2.set_parameters( + [Parameter(remote_node2_param_name, Parameter.Type.STRING, 'def')]) + thread.join() + + assert received_event_from_remote_node1 + assert received_event_from_remote_node2 + + remote_node1.destroy_node() + remote_node2.destroy_node() + if __name__ == '__main__': unittest.main()