Skip to content

Commit

Permalink
check_data_external: add action to check with custom python function (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fred-labs authored Sep 16, 2024
1 parent aafaf2e commit db6bfd3
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 0 deletions.
49 changes: 49 additions & 0 deletions docs/libraries.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,55 @@ Compare received topic messages using the given ``comparison_operator``, against
- ``true``
- start checking with the first received message after action execution. If false, the check is executed on the last received message.


``check_data_external()``
^^^^^^^^^^^^^^^^^^^^^^^^^

Compare received topic messages using an external python function ``function_name`` defined in python file ``file_path`` relative to the scenario-file.

.. list-table::
:widths: 15 15 5 65
:header-rows: 1
:class: tight-table

* - Parameter
- Type
- Default
- Description
* - ``topic_name``
- ``string``
-
- Name of the topic to connect to
* - ``topic_type``
- ``string``
-
- Class of the message type (e.g. ``std_msgs.msg.String``)
* - ``qos_profile``
- ``qos_preset_profiles``
- ``qos_preset_profiles!system_default``
- QoS Preset Profile for the subscriber
* - ``file_path``
- ``string``
-
- Path to python file containing the external check function
* - ``function_name``
- ``string``
-
- python function to be called. The function is expected to have the signature: ``def function_name(msg) -> bool``
* - ``fail_if_no_data``
- ``bool``
- ``false``
- return failure if there is no data yet
* - ``fail_if_bad_comparison``
- ``bool``
- ``true``
- return failure if comparison failed
* - ``wait_for_first_message``
- ``bool``
- ``true``
- start checking with the first received message after action execution. If false, the check is executed on the last received message.


``differential_drive_robot.odometry_distance_traveled()``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Copyright (C) 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

import py_trees
import rclpy
import os
import importlib
from ast import literal_eval
from rosidl_runtime_py.set_message import set_message_fields
from scenario_execution_ros.actions.conversions import get_qos_preset_profile, get_ros_message_type
import builtins
from scenario_execution.actions.base_action import BaseAction, ActionError


class RosTopicCheckDataExternal(BaseAction):
"""
Class to check if the message on ROS topic equals to the target message
"""

def __init__(self,
topic_name: str,
topic_type: str,
qos_profile: tuple,
file_path: str,
function_name: str):
super().__init__()
self.check_data = None
self.topic_name = topic_name
self.topic_type = topic_type
self.qos_profile = qos_profile
self.file_path = file_path
self.function_name = function_name
self.fail_if_no_data = None
self.fail_if_bad_comparison = None
self.wait_for_first_message = None
self.last_msg = None
self.found = None

def setup(self, **kwargs):
"""
Setup the subscriber
"""
try:
self.node: rclpy.Node = kwargs['node']
except KeyError as e:
error_message = "didn't find 'node' in setup's kwargs [{}][{}]".format(
self.name, self.__class__.__name__)
raise ActionError(error_message, action=self) from e

# check if msg type exists
try:
get_ros_message_type(self.topic_type)()
except (ValueError, AttributeError) as e:
raise ActionError(f"Invalid topic type '{self.topic_type}'.", action=self) from e

if os.path.isabs(self.file_path):
raise ActionError(f"Only relative function_file allowed: '{self.topic_type}'.", action=self)

if 'input_dir' not in kwargs:
raise ActionError("input_dir not defined.", action=self)

path = self.file_path
if kwargs['input_dir']:
path = os.path.join(kwargs['input_dir'], path)

spec = importlib.util.spec_from_file_location(f"check_data_external_{self.function_name}", path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)

try:
self.check_data = getattr(module, self.function_name)
except AttributeError as e:
raise ActionError(f"Check function '{self.function_name}' not found in file '{self.file_path}'.", action=self) from e

self.subscriber = self.node.create_subscription(
msg_type=get_ros_message_type(self.topic_type),
topic=self.topic_name,
callback=self._callback,
qos_profile=get_qos_preset_profile(self.qos_profile),
callback_group=rclpy.callback_groups.ReentrantCallbackGroup()
)
self.feedback_message = f"Waiting for data on {self.topic_name}" # pylint: disable= attribute-defined-outside-init

def execute(self,
fail_if_no_data: bool,
fail_if_bad_comparison: bool,
wait_for_first_message: bool):
self.fail_if_no_data = fail_if_no_data
self.fail_if_bad_comparison = fail_if_bad_comparison
self.wait_for_first_message = wait_for_first_message
if wait_for_first_message:
self.found = False
else:
self.found = self.check_data(self.last_msg)
if self.found is True:
self.feedback_message = f"Found expected value in previously received message." # pylint: disable= attribute-defined-outside-init

def update(self) -> py_trees.common.Status:
if self.found is True:
return py_trees.common.Status.SUCCESS
elif self.found is False:
if self.fail_if_bad_comparison:
return py_trees.common.Status.FAILURE

return py_trees.common.Status.RUNNING

def _callback(self, msg):
self.last_msg = msg
self.found = self.check_data(msg)
if self.found is True:
self.feedback_message = f"Found expected value in received message."
else:
self.feedback_message = f"Received message does not contain expected value."

def set_expected_value(self, expected_value_string):
if not isinstance(expected_value_string, str):
raise ActionError("Only string allowed as expected_value.", action=self)
error_string = ""
try:
parsed_value = literal_eval("".join(expected_value_string.split('\\')))
msg = get_ros_message_type(self.topic_type)()
if self.member_name == "":
self.expected_value = msg
set_message_fields(self.expected_value, parsed_value)
else:
self.expected_value = getattr(msg, self.member_name)
error_string = f"Member {self.member_name} is expected to be of type {type(self.expected_value).__name__}. "
if type(self.expected_value).__name__ in dir(builtins):
self.expected_value = parsed_value
else:
set_message_fields(self.expected_value, parsed_value)
except (TypeError, AttributeError) as e:
raise ActionError(f"Could not parse '{expected_value_string}'. {error_string}", action=self) from e
11 changes: 11 additions & 0 deletions scenario_execution_ros/scenario_execution_ros/lib_osc/ros.osc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ action check_data:
fail_if_bad_comparison: bool = false # py_trees.common.Status.FAILURE instead of py_trees.common.Status.RUNNING if comparison failed
wait_for_first_message: bool = true # start checking with the first received message after action execution. If false, the check is executed on the last received message

action check_data_external:
# Compare received topic messages using an external python function.
topic_name: string # name of the topic to connect to
topic_type: string # class of the message type (e.g. std_msgs.msg.String)
qos_profile: qos_preset_profiles = qos_preset_profiles!system_default # qos profile for the subscriber
file_path: string # path to python file containing the external check function
function_name: string # python function to be called. The function is expected to have the signature: 'def function_name(msg) -> bool'
fail_if_no_data: bool = false # py_trees.common.Status.FAILURE instead of py_trees.common.Status.RUNNING if there is no data on action execution
fail_if_bad_comparison: bool = false # py_trees.common.Status.FAILURE instead of py_trees.common.Status.RUNNING if comparison failed
wait_for_first_message: bool = true # start checking with the first received message after action execution. If false, the check is executed on the last received message

action differential_drive_robot.odometry_distance_traveled:
# Wait until a defined distance was traveled, based on odometry
distance: length # traveled distance at which the action succeeds.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import osc.helpers
import osc.ros

scenario test:
timeout(10s)
do serial:
check_data_external(
topic_name: '/foo',
topic_type: 'std_msgs.msg.String',
file_path: 'test_ros_check_data_external.py',
function_name: 'check_msg')
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env python3
# Copyright (C) 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0


def check_msg(msg):
return msg.data == "foo"
2 changes: 2 additions & 0 deletions scenario_execution_ros/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
('share/' + PACKAGE_NAME, ['package.xml']),
(os.path.join('share', PACKAGE_NAME, 'scenarios'), glob('scenarios/*.osc')),
(os.path.join('share', PACKAGE_NAME, 'scenarios', 'test'), glob('scenarios/test/*osc')),
(os.path.join('share', PACKAGE_NAME, 'scenarios', 'test'), glob('scenarios/test/*py')),
(os.path.join('share', PACKAGE_NAME, 'launch'), glob('launch/*launch.py'))
],
install_requires=[
Expand All @@ -56,6 +57,7 @@
'bag_play = scenario_execution_ros.actions.ros_bag_play:RosBagPlay',
'bag_record = scenario_execution_ros.actions.ros_bag_record:RosBagRecord',
'check_data = scenario_execution_ros.actions.ros_topic_check_data:RosTopicCheckData',
'check_data_external = scenario_execution_ros.actions.ros_topic_check_data_external:RosTopicCheckDataExternal',
'differential_drive_robot.odometry_distance_traveled = scenario_execution_ros.actions.odometry_distance_traveled:OdometryDistanceTraveled',
'differential_drive_robot.tf_close_to = scenario_execution_ros.actions.tf_close_to:TfCloseTo',
'log_check = scenario_execution_ros.actions.ros_log_check:RosLogCheck',
Expand Down
68 changes: 68 additions & 0 deletions scenario_execution_ros/test/test_check_data_external.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright (C) 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions
# and limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

import os
import unittest
import threading

from ament_index_python.packages import get_package_share_directory

import rclpy
from std_msgs.msg import String

from scenario_execution_ros import ROSScenarioExecution
from scenario_execution.model.osc2_parser import OpenScenario2Parser
from scenario_execution.utils.logging import Logger


class TestRosCheckDataExternal(unittest.TestCase):
# pylint: disable=missing-function-docstring,missing-class-docstring

def setUp(self) -> None:
rclpy.init()
self.parser = OpenScenario2Parser(Logger('test', False))
self.scenario_execution_ros = ROSScenarioExecution()

self.scenario_dir = get_package_share_directory('scenario_execution_ros')

self.received_msgs = []
self.node = rclpy.create_node('test_node')
self.publisher = self.node.create_publisher(String, "/foo", 10)
self.srv = self.node.create_timer(1, self.timer_callback)

self.executor = rclpy.executors.MultiThreadedExecutor()
self.executor.add_node(self.node)
self.executor_thread = threading.Thread(target=self.executor.spin, daemon=True)
self.executor_thread.start()

def timer_callback(self):
self.publisher.publish(String(data="foo"))

def tearDown(self):
self.node.destroy_node()
rclpy.try_shutdown()

def callback(self, msg):
self.received_msgs.append(msg)

def test_success(self):
tree = self.parser.process_file(os.path.join(
self.scenario_dir, 'scenarios', 'test', 'test_ros_check_data_external.osc'), False)
self.scenario_execution_ros.scenario_file = os.path.join(
self.scenario_dir, 'scenarios', 'test', 'test_ros_check_data_external.osc')
self.scenario_execution_ros.tree = tree
self.scenario_execution_ros.run()
self.assertTrue(self.scenario_execution_ros.process_results())

0 comments on commit db6bfd3

Please sign in to comment.