Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions nav2_loopback_sim/nav2_loopback_sim/loopback_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

from geometry_msgs.msg import (PoseWithCovarianceStamped, Quaternion, TransformStamped, Twist,
TwistStamped, Vector3)
from nav2_loopback_sim.utils import (addYawToQuat, getMapOccupancy, matrixToTransform,
transformStampedToMatrix, worldToMap)
from nav2_simple_commander.line_iterator import LineIterator
from nav_msgs.msg import Odometry
from nav_msgs.srv import GetMap
import numpy as np
import rclpy
from rclpy.client import Client
from rclpy.duration import Duration
from rclpy.node import Node
from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
Expand All @@ -32,9 +35,6 @@
from tf2_ros import Buffer, TransformBroadcaster, TransformListener
import tf_transformations

from .utils import (addYawToQuat, getMapOccupancy, matrixToTransform, transformStampedToMatrix,
worldToMap)

"""
This is a loopback simulator that replaces a physics simulator to create a
frictionless, inertialess, and collisionless simulation environment. It
Expand Down Expand Up @@ -144,7 +144,8 @@ def __init__(self) -> None:

self.setupTimer = self.create_timer(0.1, self.setupTimerCallback)

self.map_client = self.create_client(GetMap, '/map_server/map')
self.map_client: Client[GetMap.Request, GetMap.Response] = \
self.create_client(GetMap, '/map_server/map')

self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def main() -> None:
navigator.waitUntilNav2Active()

# Do security route until dead
while rclpy.ok(): # type: ignore[attr-defined]
while rclpy.ok():
# Send our route
route_poses = []
pose = PoseStamped()
Expand Down
139 changes: 110 additions & 29 deletions nav2_simple_commander/nav2_simple_commander/robot_navigator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import rclpy
from rclpy.action import ActionClient # type: ignore[attr-defined]
from rclpy.action.client import ClientGoalHandle
from rclpy.client import Client
from rclpy.duration import Duration as rclpyDuration
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
Expand Down Expand Up @@ -103,37 +104,101 @@ def __init__(self, node_name: str = 'basic_navigator', namespace: str = '') -> N
)

self.initial_pose_received = False
self.nav_through_poses_client = ActionClient(
self, NavigateThroughPoses, 'navigate_through_poses'
)
self.nav_to_pose_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
self.follow_waypoints_client = ActionClient(
self.nav_through_poses_client: ActionClient[
NavigateThroughPoses.Goal,
NavigateThroughPoses.Result,
NavigateThroughPoses.Feedback
] = ActionClient(
self, NavigateThroughPoses, 'navigate_through_poses')
self.nav_to_pose_client: ActionClient[
NavigateToPose.Goal,
NavigateToPose.Result,
NavigateToPose.Feedback
] = ActionClient(self, NavigateToPose, 'navigate_to_pose')
self.follow_waypoints_client: ActionClient[
FollowWaypoints.Goal,
FollowWaypoints.Result,
FollowWaypoints.Feedback
] = ActionClient(
self, FollowWaypoints, 'follow_waypoints'
)
self.follow_gps_waypoints_client = ActionClient(
self.follow_gps_waypoints_client: ActionClient[
FollowGPSWaypoints.Goal,
FollowGPSWaypoints.Result,
FollowGPSWaypoints.Feedback
] = ActionClient(
self, FollowGPSWaypoints, 'follow_gps_waypoints'
)
self.follow_path_client = ActionClient(self, FollowPath, 'follow_path')
self.compute_path_to_pose_client = ActionClient(
self.follow_path_client: ActionClient[
FollowPath.Goal,
FollowPath.Result,
FollowPath.Feedback
] = ActionClient(self, FollowPath, 'follow_path')
self.compute_path_to_pose_client: ActionClient[
ComputePathToPose.Goal,
ComputePathToPose.Result,
ComputePathToPose.Feedback
] = ActionClient(
self, ComputePathToPose, 'compute_path_to_pose'
)
self.compute_path_through_poses_client = ActionClient(
self.compute_path_through_poses_client: ActionClient[
ComputePathThroughPoses.Goal,
ComputePathThroughPoses.Result,
ComputePathThroughPoses.Feedback
] = ActionClient(
self, ComputePathThroughPoses, 'compute_path_through_poses'
)
self.smoother_client = ActionClient(self, SmoothPath, 'smooth_path')
self.compute_route_client = ActionClient(self, ComputeRoute, 'compute_route')
self.compute_and_track_route_client = ActionClient(self, ComputeAndTrackRoute,
'compute_and_track_route')
self.spin_client = ActionClient(self, Spin, 'spin')
self.backup_client = ActionClient(self, BackUp, 'backup')
self.drive_on_heading_client = ActionClient(
self.smoother_client: ActionClient[
SmoothPath.Goal,
SmoothPath.Result,
SmoothPath.Feedback
] = ActionClient(self, SmoothPath, 'smooth_path')
self.compute_route_client: ActionClient[
ComputeRoute.Goal,
ComputeRoute.Result,
ComputeRoute.Feedback
] = ActionClient(self, ComputeRoute, 'compute_route')
self.compute_and_track_route_client: ActionClient[
ComputeAndTrackRoute.Goal,
ComputeAndTrackRoute.Result,
ComputeAndTrackRoute.Feedback
] = ActionClient(self, ComputeAndTrackRoute, 'compute_and_track_route')
self.spin_client: ActionClient[
Spin.Goal,
Spin.Result,
Spin.Feedback
] = ActionClient(self, Spin, 'spin')

self.backup_client: ActionClient[
BackUp.Goal,
BackUp.Result,
BackUp.Feedback
] = ActionClient(self, BackUp, 'backup')
self.drive_on_heading_client: ActionClient[
DriveOnHeading.Goal,
DriveOnHeading.Result,
DriveOnHeading.Feedback
] = ActionClient(
self, DriveOnHeading, 'drive_on_heading'
)
self.assisted_teleop_client = ActionClient(
self.assisted_teleop_client: ActionClient[
AssistedTeleop.Goal,
AssistedTeleop.Result,
AssistedTeleop.Feedback
] = ActionClient(
self, AssistedTeleop, 'assisted_teleop'
)
self.docking_client = ActionClient(self, DockRobot, 'dock_robot')
self.undocking_client = ActionClient(self, UndockRobot, 'undock_robot')
self.docking_client: ActionClient[
DockRobot.Goal,
DockRobot.Result,
DockRobot.Feedback
] = ActionClient(self, DockRobot, 'dock_robot')
self.undocking_client: ActionClient[
UndockRobot.Goal,
UndockRobot.Result,
UndockRobot.Feedback
] = ActionClient(self, UndockRobot, 'undock_robot')

self.localization_pose_sub = self.create_subscription(
PoseWithCovarianceStamped,
'amcl_pose',
Expand All @@ -143,23 +208,36 @@ def __init__(self, node_name: str = 'basic_navigator', namespace: str = '') -> N
self.initial_pose_pub = self.create_publisher(
PoseWithCovarianceStamped, 'initialpose', 10
)
self.change_maps_srv = self.create_client(LoadMap, 'map_server/load_map')
self.clear_costmap_global_srv = self.create_client(
self.change_maps_srv: Client[LoadMap.Request, LoadMap.Response] = \
self.create_client(LoadMap, 'map_server/load_map')
self.clear_costmap_global_srv: Client[
ClearEntireCostmap.Request, ClearEntireCostmap.Response] = \
self.create_client(
ClearEntireCostmap, 'global_costmap/clear_entirely_global_costmap'
)
self.clear_costmap_local_srv = self.create_client(
self.clear_costmap_local_srv: Client[
ClearEntireCostmap.Request, ClearEntireCostmap.Response] = \
self.create_client(
ClearEntireCostmap, 'local_costmap/clear_entirely_local_costmap'
)
self.clear_costmap_except_region_srv = self.create_client(
self.clear_costmap_except_region_srv: Client[
ClearCostmapExceptRegion.Request, ClearCostmapExceptRegion.Response] = \
self.create_client(
ClearCostmapExceptRegion, 'local_costmap/clear_costmap_except_region'
)
self.clear_costmap_around_robot_srv = self.create_client(
self.clear_costmap_around_robot_srv: Client[
ClearCostmapAroundRobot.Request, ClearCostmapAroundRobot.Response] = \
self.create_client(
ClearCostmapAroundRobot, 'local_costmap/clear_costmap_around_robot'
)
self.get_costmap_global_srv = self.create_client(
self.get_costmap_global_srv: Client[
GetCostmap.Request, GetCostmap.Response] = \
self.create_client(
GetCostmap, 'global_costmap/get_costmap'
)
self.get_costmap_local_srv = self.create_client(
self.get_costmap_local_srv: Client[
GetCostmap.Request, GetCostmap.Response] = \
self.create_client(
GetCostmap, 'local_costmap/get_costmap'
)

Expand Down Expand Up @@ -1035,7 +1113,8 @@ def lifecycleStartup(self) -> None:
for srv_name, srv_type in self.get_service_names_and_types():
if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes':
self.info(f'Starting up {srv_name}')
mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
self.create_client(ManageLifecycleNodes, srv_name)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info(f'{srv_name} service not available, waiting...')
req = ManageLifecycleNodes.Request()
Expand All @@ -1059,7 +1138,8 @@ def lifecycleShutdown(self) -> None:
for srv_name, srv_type in self.get_service_names_and_types():
if srv_type[0] == 'nav2_msgs/srv/ManageLifecycleNodes':
self.info(f'Shutting down {srv_name}')
mgr_client = self.create_client(ManageLifecycleNodes, srv_name)
mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
self.create_client(ManageLifecycleNodes, srv_name)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info(f'{srv_name} service not available, waiting...')
req = ManageLifecycleNodes.Request()
Expand All @@ -1073,7 +1153,8 @@ def _waitForNodeToActivate(self, node_name: str) -> None:
# Waits for the node within the tester namespace to become active
self.debug(f'Waiting for {node_name} to become active..')
node_service = f'{node_name}/get_state'
state_client = self.create_client(GetState, node_service)
state_client: Client[GetState.Request, GetState.Response] = \
self.create_client(GetState, node_service)
while not state_client.wait_for_service(timeout_sec=1.0):
self.info(f'{node_service} service not available, waiting...')

Expand Down
10 changes: 8 additions & 2 deletions nav2_system_tests/src/behaviors/backup/backup_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import rclpy
from rclpy.action import ActionClient # type: ignore[attr-defined]
from rclpy.action.client import ClientGoalHandle
from rclpy.client import Client
from rclpy.duration import Duration
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
Expand All @@ -40,7 +41,11 @@ def __init__(self) -> None:
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
)
self.action_client = ActionClient(self, BackUp, 'backup')
self.action_client: ActionClient[
BackUp.Goal,
BackUp.Result,
BackUp.Feedback
] = ActionClient(self, BackUp, 'backup')
self.costmap_pub = self.create_publisher(
Costmap, 'local_costmap/costmap_raw', self.costmap_qos)
self.footprint_pub = self.create_publisher(
Expand Down Expand Up @@ -288,7 +293,8 @@ def shutdown(self) -> None:
self.info_msg('Destroyed backup action client')

transition_service = 'lifecycle_manager_navigation/manage_nodes'
mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
= self.create_client(ManageLifecycleNodes, transition_service)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info_msg(f'{transition_service} service not available, waiting...')

Expand Down
10 changes: 8 additions & 2 deletions nav2_system_tests/src/behaviors/drive_on_heading/drive_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import rclpy
from rclpy.action import ActionClient # type: ignore[attr-defined]
from rclpy.action.client import ClientGoalHandle
from rclpy.client import Client
from rclpy.duration import Duration
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
Expand All @@ -40,7 +41,11 @@ def __init__(self) -> None:
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
)
self.action_client = ActionClient(self, DriveOnHeading, 'drive_on_heading')
self.action_client: ActionClient[
DriveOnHeading.Goal,
DriveOnHeading.Result,
DriveOnHeading.Feedback
] = ActionClient(self, DriveOnHeading, 'drive_on_heading')
self.costmap_pub = self.create_publisher(
Costmap, 'local_costmap/costmap_raw', self.costmap_qos)
self.footprint_pub = self.create_publisher(
Expand Down Expand Up @@ -297,7 +302,8 @@ def shutdown(self) -> None:
self.info_msg('Destroyed DriveOnHeading action client')

transition_service = 'lifecycle_manager_navigation/manage_nodes'
mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
= self.create_client(ManageLifecycleNodes, transition_service)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info_msg(f'{transition_service} service not available, waiting...')

Expand Down
8 changes: 6 additions & 2 deletions nav2_system_tests/src/behaviors/spin/spin_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import rclpy
from rclpy.action import ActionClient # type: ignore[attr-defined]
from rclpy.action.client import ClientGoalHandle
from rclpy.client import Client
from rclpy.duration import Duration
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
Expand All @@ -40,7 +41,9 @@ def __init__(self) -> None:
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
)
self.action_client = ActionClient(self, Spin, 'spin')
self.action_client: ActionClient[Spin.Goal, Spin.Result, Spin.Feedback] \
= ActionClient(self, Spin, 'spin')

self.costmap_pub = self.create_publisher(
Costmap, 'local_costmap/costmap_raw', self.costmap_qos)
self.footprint_pub = self.create_publisher(
Expand Down Expand Up @@ -280,7 +283,8 @@ def shutdown(self) -> None:
self.info_msg('Destroyed backup action client')

transition_service = 'lifecycle_manager_navigation/manage_nodes'
mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
= self.create_client(ManageLifecycleNodes, transition_service)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info_msg(f'{transition_service} service not available, waiting...')

Expand Down
11 changes: 8 additions & 3 deletions nav2_system_tests/src/costmap_filters/tester_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from nav_msgs.msg import OccupancyGrid, Path
import rclpy
from rclpy.action import ActionClient # type: ignore[attr-defined]
from rclpy.client import Client
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from sensor_msgs.msg import PointCloud2
Expand Down Expand Up @@ -151,7 +152,9 @@ def __init__(
self.initial_pose_received = False
self.initial_pose = initial_pose
self.goal_pose = goal_pose
self.action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
self.action_client: ActionClient[
NavigateToPose.Goal, NavigateToPose.Result, NavigateToPose.Feedback] = \
ActionClient(self, NavigateToPose, 'navigate_to_pose')

def info_msg(self, msg: str) -> None:
self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
Expand Down Expand Up @@ -357,7 +360,8 @@ def wait_for_node_active(self, node_name: str) -> None:
# Waits for the node within the tester namespace to become active
self.info_msg(f'Waiting for {node_name} to become active')
node_service = f'{node_name}/get_state'
state_client = self.create_client(GetState, node_service)
state_client: Client[GetState.Request, GetState.Response] \
= self.create_client(GetState, node_service)
while not state_client.wait_for_service(timeout_sec=1.0):
self.info_msg(f'{node_service} service not available, waiting...')
req = GetState.Request() # empty request
Expand All @@ -380,7 +384,8 @@ def shutdown(self) -> None:
self.action_client.destroy()

transition_service = 'lifecycle_manager_navigation/manage_nodes'
mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
= self.create_client(ManageLifecycleNodes, transition_service)
while not mgr_client.wait_for_service(timeout_sec=1.0):
self.info_msg(f'{transition_service} service not available, waiting...')

Expand Down
Loading
Loading