Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions rclpy/rclpy/parameter_event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,48 @@ def remove_parameter_event_callback(
"""
self._callbacks.remove_parameter_event_callback(handle)

def configure_nodes_filter(
self,
node_names: Optional[List[str]] = None,
) -> bool:
"""
Configure which node parameter events will be received.

This function depends on middleware support for content filtering.
If middleware doesn't support contentfilter, return false.

If node_names is empty, the configured node filter will be cleared.

If this function return true, only parameter events from the specified node will be
received.
It affects the behavior of the following two functions.
- add_parameter_event_callback()
The callback will only be called for parameter events from the specified nodes which are
configured in this function.
- add_parameter_callback()
The callback will only be called for parameter events from the specified nodes which are
configured in this function and add_parameter_callback().
If the nodes specified in this function is different from the nodes specified in
add_parameter_callback(), the callback will never be called.

:param node_names: Node names to filter parameter events from

:return: True if the filter was successfully applied, False otherwise.
"""
if node_names is None or len(node_names) == 0:
# Clear content filter
self.parameter_event_subscription.set_content_filter('', [])
return True

filter_expression = ' OR '.join([f'node = %{i}' for i in range(len(node_names))])

# Enclose each node name in "'"
quoted_node_names = [f"'{node_name}'" for node_name in node_names]

self.parameter_event_subscription.set_content_filter(filter_expression, quoted_node_names)

return self.parameter_event_subscription.is_cft_enabled

def _resolve_path(
self,
node_path: Optional[str] = None,
Expand Down
168 changes: 168 additions & 0 deletions rclpy/test/test_parameter_event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
import time
from typing import Any
from typing import Callable, Optional
from typing import Union
import unittest

import pytest

from rcl_interfaces.msg import Parameter as ParameterMsg
from rcl_interfaces.msg import ParameterEvent
import rclpy.context
from rclpy.executors import SingleThreadedExecutor
Expand Down Expand Up @@ -296,6 +300,170 @@ def test_resolve_path_same_namespace(self) -> None:
def test_resolve_path_other_namespace(self) -> None:
assert '/test_node' == self.parameter_event_handler._resolve_path('/test_node')

def test_configure_nodes_filter_with_check_add_parameter_event_callback(self) -> None:
remote_node_name1 = 'remote_node_1'
remote_node1 = rclpy.create_node(
remote_node_name1,
namespace='/rclpy',
context=self.context)
remote_node_name2 = 'remote_node_2'
remote_node2 = rclpy.create_node(
remote_node_name2,
namespace='/rclpy',
context=self.context)

remote_node1_param_name = 'param_node1'
remote_node2_param_name = 'param_node2'
remote_node1.declare_parameter(remote_node1_param_name, 10)
remote_node2.declare_parameter(remote_node2_param_name, 'Default')

received_event_from_remote_node1 = False
received_event_from_remote_node2 = False

def callback(param: ParameterEvent) -> None:
nonlocal received_event_from_remote_node1, received_event_from_remote_node2
if param.node == f'/rclpy/{remote_node_name1}':
received_event_from_remote_node1 = True
elif param.node == f'/rclpy/{remote_node_name2}':
received_event_from_remote_node2 = True

# Configure to only receive parameter events from remote_node_name2
assert self.parameter_event_handler.configure_nodes_filter(
[f'/rclpy/{remote_node_name2}'])

self.parameter_event_handler.add_parameter_event_callback(callback)

def wait_param_event(executor: SingleThreadedExecutor, timeout: int,
condition: Optional[Callable[[], bool]] = None):
start = time.monotonic()
while time.monotonic() - start < timeout:
executor.spin_once(0.2)
if condition is not None and condition():
break

thread = threading.Thread(target=wait_param_event, args=(self.executor, 2))
thread.start()
time.sleep(0.1) # 100ms
remote_node1.set_parameters(
[Parameter(remote_node1_param_name, Parameter.Type.INTEGER, 20)])
remote_node2.set_parameters(
[Parameter(remote_node2_param_name, Parameter.Type.STRING, 'abc')])
thread.join()

assert not received_event_from_remote_node1
assert received_event_from_remote_node2

# Clear node filter and all parameter events from remote nodes should be received
assert self.parameter_event_handler.configure_nodes_filter()

received_event_from_remote_node1 = False
received_event_from_remote_node2 = False

def check_both_received():
return received_event_from_remote_node1 and received_event_from_remote_node2

thread = threading.Thread(
target=wait_param_event,
args=(self.executor, 2, check_both_received))
thread.start()
time.sleep(0.1) # 100ms
remote_node1.set_parameters(
[Parameter(remote_node1_param_name, Parameter.Type.INTEGER, 30)])
remote_node2.set_parameters(
[Parameter(remote_node2_param_name, Parameter.Type.STRING, 'def')])
thread.join()

assert received_event_from_remote_node1
assert received_event_from_remote_node2

remote_node1.destroy_node()
remote_node2.destroy_node()

def test_configure_nodes_filter_with_check_add_parameter_callback(self) -> None:
remote_node_name1 = 'remote_node_1'
remote_node1 = rclpy.create_node(
remote_node_name1,
namespace='/rclpy',
context=self.context)
remote_node_name2 = 'remote_node_2'
remote_node2 = rclpy.create_node(
remote_node_name2,
namespace='/rclpy',
context=self.context)

remote_node1_param_name = 'param_node1'
remote_node2_param_name = 'param_node2'
remote_node1.declare_parameter(remote_node1_param_name, 10)
remote_node2.declare_parameter(remote_node2_param_name, 'Default')

received_event_from_remote_node1 = False
received_event_from_remote_node2 = False

def callback_remote_node1(param: ParameterMsg) -> None:
nonlocal received_event_from_remote_node1
if param.name == remote_node1_param_name:
received_event_from_remote_node1 = True

def callback_remote_node2(param: ParameterMsg) -> None:
nonlocal received_event_from_remote_node2
if param.name == remote_node2_param_name:
received_event_from_remote_node2 = True

# Configure to only receive parameter events from remote_node_name2
assert self.parameter_event_handler.configure_nodes_filter(
[f'/rclpy/{remote_node_name2}'])

self.parameter_event_handler.add_parameter_callback(
remote_node1_param_name, remote_node_name1, callback_remote_node1)
self.parameter_event_handler.add_parameter_callback(
remote_node2_param_name, remote_node_name2, callback_remote_node2)

def wait_param_event(executor: SingleThreadedExecutor, timeout: int,
condition: Optional[Callable[[], bool]] = None):
start = time.monotonic()
while time.monotonic() - start < timeout:
executor.spin_once(0.2)
if condition is not None and condition():
break

thread = threading.Thread(target=wait_param_event, args=(self.executor, 2))
thread.start()
time.sleep(0.1) # 100ms
remote_node1.set_parameters(
[Parameter(remote_node1_param_name, Parameter.Type.INTEGER, 20)])
remote_node2.set_parameters(
[Parameter(remote_node2_param_name, Parameter.Type.STRING, 'abc')])
thread.join()

assert not received_event_from_remote_node1
assert received_event_from_remote_node2

# Clear node filter and all parameter events from remote nodes should be received
assert self.parameter_event_handler.configure_nodes_filter()

received_event_from_remote_node1 = False
received_event_from_remote_node2 = False

def check_both_received():
return received_event_from_remote_node1 and received_event_from_remote_node2

thread = threading.Thread(
target=wait_param_event,
args=(self.executor, 2, check_both_received))
thread.start()
time.sleep(0.1) # 100ms
remote_node1.set_parameters(
[Parameter(remote_node1_param_name, Parameter.Type.INTEGER, 30)])
remote_node2.set_parameters(
[Parameter(remote_node2_param_name, Parameter.Type.STRING, 'def')])
thread.join()

assert received_event_from_remote_node1
assert received_event_from_remote_node2

remote_node1.destroy_node()
remote_node2.destroy_node()


if __name__ == '__main__':
unittest.main()