From f533dbf869b30234fa0d62fd681ade65b38f3a14 Mon Sep 17 00:00:00 2001 From: Leander Stephen D'Souza Date: Fri, 4 Apr 2025 00:26:33 +0000 Subject: [PATCH 1/3] Configured nav2_loopback_sim to be compatible with mypy. Signed-off-by: Leander Stephen D'Souza --- .../launch/loopback_simulation.launch.py | 2 +- .../nav2_loopback_sim/loopback_simulator.py | 51 ++++++++++--------- nav2_loopback_sim/nav2_loopback_sim/utils.py | 11 ++-- nav2_loopback_sim/test/test_copyright.py | 2 +- nav2_loopback_sim/test/test_flake8.py | 7 ++- nav2_loopback_sim/test/test_pep257.py | 2 +- tools/pyproject.toml | 7 +++ 7 files changed, 47 insertions(+), 35 deletions(-) diff --git a/nav2_loopback_sim/launch/loopback_simulation.launch.py b/nav2_loopback_sim/launch/loopback_simulation.launch.py index 48883f0cb6a..18756f240d0 100644 --- a/nav2_loopback_sim/launch/loopback_simulation.launch.py +++ b/nav2_loopback_sim/launch/loopback_simulation.launch.py @@ -21,7 +21,7 @@ from launch_ros.actions import Node -def generate_launch_description(): +def generate_launch_description() -> LaunchDescription: bringup_dir = get_package_share_directory('nav2_bringup') params_file = LaunchConfiguration('params_file') declare_params_file_cmd = DeclareLaunchArgument( diff --git a/nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py b/nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py index 18cbcaf5199..53d67540214 100644 --- a/nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py +++ b/nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py @@ -14,16 +14,19 @@ import math +from typing import Optional from geometry_msgs.msg import (PoseWithCovarianceStamped, Quaternion, TransformStamped, Twist, TwistStamped, Vector3) from nav2_simple_commander.line_iterator import LineIterator from nav_msgs.msg import Odometry from nav_msgs.srv import GetMap +import numpy as np import rclpy from rclpy.duration import Duration from rclpy.node import Node from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy +from rclpy.timer import Timer from rosgraph_msgs.msg import Clock from sensor_msgs.msg import LaserScan from tf2_ros import Buffer, TransformBroadcaster, TransformListener @@ -44,15 +47,15 @@ class LoopbackSimulator(Node): - def __init__(self): + def __init__(self) -> None: super().__init__(node_name='loopback_simulator') self.curr_cmd_vel = None self.curr_cmd_vel_time = self.get_clock().now() - self.initial_pose = None - self.timer = None + self.initial_pose: PoseWithCovarianceStamped = None + self.timer: Optional[Timer] = None self.setupTimer = None self.map = None - self.mat_base_to_laser = None + self.mat_base_to_laser: Optional[np.ndarray[np.float64, np.dtype[np.float64]]] = None self.declare_parameter('update_duration', 0.01) self.update_dur = self.get_parameter('update_duration').get_parameter_value().double_value @@ -126,7 +129,7 @@ def __init__(self): self.info('Loopback simulator initialized') - def getBaseToLaserTf(self): + def getBaseToLaserTf(self) -> None: try: # Wait for transform and lookup transform = self.tf_buffer.lookup_transform( @@ -134,20 +137,20 @@ def getBaseToLaserTf(self): self.mat_base_to_laser = transformStampedToMatrix(transform) except Exception as ex: - self.get_logger().error('Transform lookup failed: %s' % str(ex)) + self.get_logger().error(f'Transform lookup failed: {str(ex)}') - def setupTimerCallback(self): + def setupTimerCallback(self) -> None: # Publish initial identity odom transform & laser scan to warm up system self.tf_broadcaster.sendTransform(self.t_odom_to_base_link) if self.mat_base_to_laser is None: self.getBaseToLaserTf() - def clockTimerCallback(self): + def clockTimerCallback(self) -> None: msg = Clock() msg.clock = self.get_clock().now().to_msg() self.clock_pub.publish(msg) - def cmdVelCallback(self, msg): + def cmdVelCallback(self, msg: Twist) -> None: self.debug('Received cmd_vel') if self.initial_pose is None: # Don't consider velocities before the initial pose is set @@ -155,7 +158,7 @@ def cmdVelCallback(self, msg): self.curr_cmd_vel = msg self.curr_cmd_vel_time = self.get_clock().now() - def cmdVelStampedCallback(self, msg): + def cmdVelStampedCallback(self, msg: TwistStamped) -> None: self.debug('Received cmd_vel') if self.initial_pose is None: # Don't consider velocities before the initial pose is set @@ -163,7 +166,7 @@ def cmdVelStampedCallback(self, msg): self.curr_cmd_vel = msg.twist self.curr_cmd_vel_time = rclpy.time.Time.from_msg(msg.header.stamp) - def initialPoseCallback(self, msg): + def initialPoseCallback(self, msg: PoseWithCovarianceStamped) -> None: self.info('Received initial pose!') if self.initial_pose is None: # Initialize transforms (map->odom as input pose, odom->base_link start from identity) @@ -202,7 +205,7 @@ def initialPoseCallback(self, msg): tf_transformations.concatenate_matrices(mat_map_to_base_link, mat_base_link_to_odom) self.t_map_to_odom.transform = matrixToTransform(mat_map_to_odom) - def timerCallback(self): + def timerCallback(self) -> None: # If no data, just republish existing transforms without change one_sec = Duration(seconds=1) if self.curr_cmd_vel is None or self.get_clock().now() - self.curr_cmd_vel_time > one_sec: @@ -227,7 +230,7 @@ def timerCallback(self): self.publishTransforms(self.t_map_to_odom, self.t_odom_to_base_link) self.publishOdometry(self.t_odom_to_base_link) - def publishLaserScan(self): + def publishLaserScan(self) -> None: # Publish a bogus laser scan for collision monitor self.scan_msg = LaserScan() self.scan_msg.header.stamp = (self.get_clock().now()).to_msg() @@ -247,7 +250,8 @@ def publishLaserScan(self): self.getLaserScan(num_samples) self.scan_pub.publish(self.scan_msg) - def publishTransforms(self, map_to_odom, odom_to_base_link): + def publishTransforms(self, map_to_odom: TransformStamped, + odom_to_base_link: TransformStamped) -> None: map_to_odom.header.stamp = \ (self.get_clock().now() + Duration(seconds=self.update_dur)).to_msg() odom_to_base_link.header.stamp = self.get_clock().now().to_msg() @@ -255,7 +259,7 @@ def publishTransforms(self, map_to_odom, odom_to_base_link): self.tf_broadcaster.sendTransform(map_to_odom) self.tf_broadcaster.sendTransform(odom_to_base_link) - def publishOdometry(self, odom_to_base_link): + def publishOdometry(self, odom_to_base_link: TransformStamped) -> None: odom = Odometry() odom.header.stamp = self.get_clock().now().to_msg() odom.header.frame_id = 'odom' @@ -266,22 +270,23 @@ def publishOdometry(self, odom_to_base_link): odom.twist.twist = self.curr_cmd_vel self.odom_pub.publish(odom) - def info(self, msg): + def info(self, msg: str) -> None: self.get_logger().info(msg) return - def debug(self, msg): + def debug(self, msg: str) -> None: self.get_logger().debug(msg) return - def getMap(self): + def getMap(self) -> None: request = GetMap.Request() if self.map_client.wait_for_service(timeout_sec=5.0): # Request to get map future = self.map_client.call_async(request) rclpy.spin_until_future_complete(self, future) - if future.result() is not None: - self.map = future.result().map + result = future.result() + if result is not None: + self.map = result.map self.get_logger().info('Laser scan will be populated using map data') else: self.get_logger().warn( @@ -294,7 +299,7 @@ def getMap(self): 'Laser scan will be populated using max range' ) - def getLaserPose(self): + def getLaserPose(self) -> tuple[float, float, float]: mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odom) mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_link) @@ -316,7 +321,7 @@ def getLaserPose(self): return x, y, theta - def getLaserScan(self, num_samples): + def getLaserScan(self, num_samples: int) -> None: if self.map is None or self.initial_pose is None or self.mat_base_to_laser is None: self.scan_msg.ranges = [self.scan_msg.range_max - 0.1] * num_samples return @@ -358,7 +363,7 @@ def getLaserScan(self, num_samples): line_iterator.advance() -def main(): +def main() -> None: rclpy.init() loopback_simulator = LoopbackSimulator() rclpy.spin(loopback_simulator) diff --git a/nav2_loopback_sim/nav2_loopback_sim/utils.py b/nav2_loopback_sim/nav2_loopback_sim/utils.py index ff5c6012e37..be3b4e73c29 100644 --- a/nav2_loopback_sim/nav2_loopback_sim/utils.py +++ b/nav2_loopback_sim/nav2_loopback_sim/utils.py @@ -16,6 +16,7 @@ import math from geometry_msgs.msg import Quaternion, Transform +from nav_msgs.msg import OccupancyGrid import numpy as np import tf_transformations @@ -24,7 +25,7 @@ """ -def addYawToQuat(quaternion, yaw_to_add): +def addYawToQuat(quaternion: Quaternion, yaw_to_add: float) -> Quaternion: q = [quaternion.x, quaternion.y, quaternion.z, quaternion.w] q2 = tf_transformations.quaternion_from_euler(0.0, 0.0, yaw_to_add) q_new = tf_transformations.quaternion_multiply(q, q2) @@ -36,7 +37,7 @@ def addYawToQuat(quaternion, yaw_to_add): return new_quaternion -def transformStampedToMatrix(transform): +def transformStampedToMatrix(transform: Transform) -> np.ndarray[np.float64, np.dtype[np.float64]]: translation = transform.transform.translation rotation = transform.transform.rotation matrix = np.eye(4) @@ -53,7 +54,7 @@ def transformStampedToMatrix(transform): return matrix -def matrixToTransform(matrix): +def matrixToTransform(matrix: np.ndarray[np.float64, np.dtype[np.float64]]) -> Transform: transform = Transform() transform.translation.x = matrix[0, 3] transform.translation.y = matrix[1, 3] @@ -66,11 +67,11 @@ def matrixToTransform(matrix): return transform -def worldToMap(world_x, world_y, map_msg): +def worldToMap(world_x: float, world_y: float, map_msg: OccupancyGrid) -> tuple[int, int]: map_x = int(math.floor((world_x - map_msg.info.origin.position.x) / map_msg.info.resolution)) map_y = int(math.floor((world_y - map_msg.info.origin.position.y) / map_msg.info.resolution)) return map_x, map_y -def getMapOccupancy(x, y, map_msg): +def getMapOccupancy(x: int, y: int, map_msg: OccupancyGrid): # type: ignore[no-untyped-def] return map_msg.data[y * map_msg.info.width + x] diff --git a/nav2_loopback_sim/test/test_copyright.py b/nav2_loopback_sim/test/test_copyright.py index cc8ff03f79a..1f869834559 100644 --- a/nav2_loopback_sim/test/test_copyright.py +++ b/nav2_loopback_sim/test/test_copyright.py @@ -18,6 +18,6 @@ @pytest.mark.copyright @pytest.mark.linter -def test_copyright(): +def test_copyright() -> None: rc = main(argv=['.', 'test']) assert rc == 0, 'Found errors' diff --git a/nav2_loopback_sim/test/test_flake8.py b/nav2_loopback_sim/test/test_flake8.py index 26030113cce..fe9aa6a300c 100644 --- a/nav2_loopback_sim/test/test_flake8.py +++ b/nav2_loopback_sim/test/test_flake8.py @@ -18,8 +18,7 @@ @pytest.mark.flake8 @pytest.mark.linter -def test_flake8(): +def test_flake8() -> None: rc, errors = main_with_errors(argv=[]) - assert rc == 0, 'Found %d code style errors / warnings:\n' % len( - errors - ) + '\n'.join(errors) + assert rc == 0, f'Found {len(errors)} code style errors / warnings:\n' + \ + '\n'.join(errors) diff --git a/nav2_loopback_sim/test/test_pep257.py b/nav2_loopback_sim/test/test_pep257.py index b234a3840f4..b6808e1d80f 100644 --- a/nav2_loopback_sim/test/test_pep257.py +++ b/nav2_loopback_sim/test/test_pep257.py @@ -18,6 +18,6 @@ @pytest.mark.linter @pytest.mark.pep257 -def test_pep257(): +def test_pep257() -> None: rc = main(argv=['.', 'test']) assert rc == 0, 'Found code style errors / warnings' diff --git a/tools/pyproject.toml b/tools/pyproject.toml index 9f88cd9de93..012f67b60f1 100644 --- a/tools/pyproject.toml +++ b/tools/pyproject.toml @@ -25,11 +25,18 @@ module = [ "ament_index_python.*", "nav2_common.*", "nav2_msgs.*", + "nav2_simple_commander.*", "launch_testing.*", "action_msgs.*", "geometry_msgs.*", "sensor_msgs.*", "tf2_ros.*", + "nav_msgs.*", + "rosgraph_msgs.*", + "tf_transformations.*", + "ament_pep257.*", + "ament_flake8.*", + "ament_copyright.*", ] ignore_missing_imports = true From 098e8f646a2c6f1c2d2c3c0dcb981f2464b9cb72 Mon Sep 17 00:00:00 2001 From: Leander Stephen D'Souza Date: Fri, 4 Apr 2025 00:27:19 +0000 Subject: [PATCH 2/3] Added nav2_loopback_sim to the workflow. Signed-off-by: Leander Stephen D'Souza --- .github/workflows/lint.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index ed639ab1bb4..21c96f169d1 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -35,6 +35,7 @@ jobs: - "nav2_costmap_2d" - "opennav_docking" - "nav2_lifecycle_manager" + - "nav2_loopback_sim" steps: - uses: actions/checkout@v4 From f41dc60f7fe9d8087bcdbee0a01b34b6ee18153b Mon Sep 17 00:00:00 2001 From: Leander Stephen D'Souza Date: Fri, 4 Apr 2025 20:16:28 +0000 Subject: [PATCH 3/3] Separated packages from list for mypy workflow. Signed-off-by: Leander Stephen D'Souza --- .github/workflows/lint.yml | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 21c96f169d1..268035bc602 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -25,17 +25,6 @@ jobs: runs-on: ubuntu-latest container: image: rostooling/setup-ros-docker:ubuntu-noble-ros-rolling-ros-base-latest - strategy: - matrix: - mypy_packages: - - "nav2_smac_planner" - - "nav2_common" - - "nav2_bringup" - - "nav2_collision_monitor" - - "nav2_costmap_2d" - - "opennav_docking" - - "nav2_lifecycle_manager" - - "nav2_loopback_sim" steps: - uses: actions/checkout@v4 @@ -46,7 +35,15 @@ jobs: with: linter: mypy distribution: rolling - package-name: "${{ join(matrix.mypy_packages, ' ') }}" + package-name: >- + nav2_smac_planner + nav2_common + nav2_bringup + nav2_collision_monitor + nav2_costmap_2d + opennav_docking + nav2_lifecycle_manager + nav2_loopback_sim arguments: --config tools/pyproject.toml pre-commit: