Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2025 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import argparse
17 import math
18 import sys
19 import time
20 
21 from action_msgs.msg import GoalStatus
22 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
23 from lifecycle_msgs.srv import GetState
24 from nav2_msgs.action import ComputeAndTrackRoute, ComputeRoute
25 from nav2_msgs.srv import ManageLifecycleNodes
26 from nav2_simple_commander.robot_navigator import BasicNavigator
27 import rclpy
28 from rclpy.action import ActionClient
29 from rclpy.client import Client
30 from rclpy.node import Node
31 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
32 from std_srvs.srv import Trigger
33 
34 
35 class RouteTester(Node):
36 
37  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
38  super().__init__(node_name='nav2_tester', namespace=namespace)
39  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
40  PoseWithCovarianceStamped, 'initialpose', 10
41  )
42 
43  pose_qos = QoSProfile(
44  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
45  reliability=QoSReliabilityPolicy.RELIABLE,
46  history=QoSHistoryPolicy.KEEP_LAST,
47  depth=1,
48  )
49 
50  self.model_pose_submodel_pose_sub = self.create_subscription(
51  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
52  )
53  self.initial_pose_receivedinitial_pose_received = False
54  self.initial_poseinitial_pose = initial_pose
55  self.goal_posegoal_pose = goal_pose
56  self.compute_action_client: ActionClient[
57  ComputeRoute.Goal,
58  ComputeRoute.Result,
59  ComputeRoute.Feedback
60  ] = ActionClient(self, ComputeRoute, 'compute_route')
61  self.compute_track_action_client: ActionClient[
62  ComputeAndTrackRoute.Goal,
63  ComputeAndTrackRoute.Result,
64  ComputeAndTrackRoute.Feedback
65  ] = ActionClient(
66  self, ComputeAndTrackRoute, 'compute_and_track_route')
67  self.feedback_msgsfeedback_msgs: list[ComputeAndTrackRoute.Feedback] = []
68 
69  self.navigatornavigator = BasicNavigator()
70 
71  def runComputeRouteTest(self, use_poses: bool = True) -> bool:
72  # Test 1: See if we can compute a route that is valid and correctly sized
73  self.info_msginfo_msg("Waiting for 'ComputeRoute' action server")
74  while not self.compute_action_client.wait_for_server(timeout_sec=1.0):
75  self.info_msginfo_msg("'ComputeRoute' action server not available, waiting...")
76 
77  route_msg = ComputeRoute.Goal()
78  if use_poses:
79  route_msg.start = self.getStampedPoseMsggetStampedPoseMsg(self.initial_poseinitial_pose)
80  route_msg.goal = self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
81  route_msg.use_start = True
82  route_msg.use_poses = True
83  else:
84  # Same request, just now the node IDs to test
85  route_msg.start_id = 7
86  route_msg.goal_id = 13
87  route_msg.use_start = False
88  route_msg.use_poses = False
89 
90  self.info_msginfo_msg('Sending ComputeRoute goal request...')
91  send_goal_future = self.compute_action_client.send_goal_async(route_msg)
92 
93  rclpy.spin_until_future_complete(self, send_goal_future)
94  goal_handle = send_goal_future.result()
95 
96  if not goal_handle or not goal_handle.accepted:
97  self.error_msgerror_msg('Goal rejected')
98  return False
99 
100  self.info_msginfo_msg('Goal accepted')
101  get_result_future = goal_handle.get_result_async()
102 
103  self.info_msginfo_msg("Waiting for 'ComputeRoute' action to complete")
104  rclpy.spin_until_future_complete(self, get_result_future)
105  status = get_result_future.result().status # type: ignore[union-attr]
106  result = get_result_future.result().result # type: ignore[union-attr]
107  if status != GoalStatus.STATUS_SUCCEEDED:
108  self.info_msginfo_msg(f'Goal failed with status code: {status}')
109  return False
110 
111  self.info_msginfo_msg('Action completed! Checking validity of results...')
112 
113  # Check result for validity
114  assert (len(result.path.poses) == 80)
115  assert (result.route.route_cost > 6)
116  assert (result.route.route_cost < 7)
117  assert (len(result.route.nodes) == 5)
118  assert (len(result.route.edges) == 4)
119  assert (result.error_code == 0)
120  assert (result.error_msg == '')
121 
122  self.info_msginfo_msg('Goal succeeded!')
123  return True
124 
125  def runComputeRouteSamePoseTest(self) -> bool:
126  # Test 2: try with the same start and goal point edge case
127  self.info_msginfo_msg("Waiting for 'ComputeRoute' action server")
128  while not self.compute_action_client.wait_for_server(timeout_sec=1.0):
129  self.info_msginfo_msg("'ComputeRoute' action server not available, waiting...")
130 
131  route_msg = ComputeRoute.Goal()
132  route_msg.start_id = 7
133  route_msg.goal_id = 7
134  route_msg.use_start = False
135  route_msg.use_poses = False
136 
137  self.info_msginfo_msg('Sending ComputeRoute goal request...')
138  send_goal_future = self.compute_action_client.send_goal_async(route_msg)
139 
140  rclpy.spin_until_future_complete(self, send_goal_future)
141  goal_handle = send_goal_future.result()
142 
143  if not goal_handle or not goal_handle.accepted:
144  self.error_msgerror_msg('Goal rejected')
145  return False
146 
147  self.info_msginfo_msg('Goal accepted')
148  get_result_future = goal_handle.get_result_async()
149 
150  self.info_msginfo_msg("Waiting for 'ComputeRoute' action to complete")
151  rclpy.spin_until_future_complete(self, get_result_future)
152  status = get_result_future.result().status # type: ignore[union-attr]
153  result = get_result_future.result().result # type: ignore[union-attr]
154  if status != GoalStatus.STATUS_SUCCEEDED:
155  self.info_msginfo_msg(f'Goal failed with status code: {status}')
156  return False
157 
158  self.info_msginfo_msg('Action completed! Checking validity of results...')
159 
160  # Check result for validity, should be a 1-node path as its the same
161  assert (len(result.path.poses) == 1)
162  assert (len(result.route.nodes) == 1)
163  assert (len(result.route.edges) == 0)
164  assert (result.error_code == 0)
165  assert (result.error_msg == '')
166 
167  self.info_msginfo_msg('Goal succeeded!')
168  return True
169 
170  def runTrackRouteTest(self) -> bool:
171  # Test 3: See if we can compute and track a route with proper state
172  self.info_msginfo_msg("Waiting for 'ComputeAndTrackRoute' action server")
173  while not self.compute_track_action_client.wait_for_server(timeout_sec=1.0):
174  self.info_msginfo_msg("'ComputeAndTrackRoute' action server not available, waiting...")
175 
176  route_msg = ComputeAndTrackRoute.Goal()
177  route_msg.goal = self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
178  route_msg.use_start = False # Use TF pose instead
179  route_msg.use_poses = True
180 
181  self.info_msginfo_msg('Sending ComputeAndTrackRoute goal request...')
182  send_goal_future = self.compute_track_action_client.send_goal_async(
183  route_msg, feedback_callback=self.feedback_callbackfeedback_callback)
184 
185  rclpy.spin_until_future_complete(self, send_goal_future)
186  goal_handle = send_goal_future.result()
187 
188  if not goal_handle or not goal_handle.accepted:
189  self.error_msgerror_msg('Goal rejected')
190  return False
191 
192  self.info_msginfo_msg('Goal accepted')
193  get_result_future = goal_handle.get_result_async()
194 
195  # Trigger a reroute
196  time.sleep(1)
197  self.info_msginfo_msg('Triggering a reroute')
198  srv_client: Client[Trigger.Request, Trigger.Response] = \
199  self.create_client(Trigger, 'route_server/ReroutingService/reroute')
200  while not srv_client.wait_for_service(timeout_sec=1.0):
201  self.info_msginfo_msg('Reroute service not available, waiting...')
202  req = Trigger.Request()
203  future = srv_client.call_async(req)
204  rclpy.spin_until_future_complete(self, future)
205  if future.result() is not None:
206  self.info_msginfo_msg('Reroute triggered')
207  else:
208  self.error_msgerror_msg('Reroute failed')
209  return False
210  # Wait a bit for it to compute the route and start tracking (but no movement)
211 
212  # Cancel it after a bit
213  time.sleep(2)
214  cancel_future = goal_handle.cancel_goal_async()
215  rclpy.spin_until_future_complete(self, cancel_future)
216  status = cancel_future.result()
217  if status is not None and len(status.goals_canceling) > 0:
218  self.info_msginfo_msg('Action cancel completed!')
219  else:
220  self.info_msginfo_msg('Goal cancel failed')
221  return False
222 
223  # Send it again
224  self.info_msginfo_msg('Sending ComputeAndTrackRoute goal request...')
225  send_goal_future = self.compute_track_action_client.send_goal_async(
226  route_msg, feedback_callback=self.feedback_callbackfeedback_callback)
227 
228  rclpy.spin_until_future_complete(self, send_goal_future)
229  goal_handle = send_goal_future.result()
230 
231  if not goal_handle or not goal_handle.accepted:
232  self.error_msgerror_msg('Goal rejected')
233  return False
234 
235  self.info_msginfo_msg('Goal accepted')
236  get_result_future = goal_handle.get_result_async()
237 
238  # Wait a bit for it to compute the route and start tracking (but no movement)
239  time.sleep(2)
240 
241  # Preempt with a new request type on the graph
242  route_msg.use_poses = False
243  route_msg.start_id = 7
244  route_msg.goal_id = 13
245  send_goal_future = self.compute_track_action_client.send_goal_async(
246  route_msg, feedback_callback=self.feedback_callbackfeedback_callback)
247 
248  rclpy.spin_until_future_complete(self, send_goal_future)
249  goal_handle = send_goal_future.result()
250 
251  if not goal_handle or not goal_handle.accepted:
252  self.error_msgerror_msg('Goal rejected')
253  return False
254 
255  self.info_msginfo_msg('Goal accepted')
256  get_result_future = goal_handle.get_result_async()
257  self.feedback_msgsfeedback_msgs = []
258 
259  self.info_msginfo_msg("Waiting for 'ComputeAndTrackRoute' action to complete")
260  progressing = True
261  last_feedback_msg = None
262  follow_path_task = None
263  while progressing:
264  rclpy.spin_until_future_complete(self, get_result_future, timeout_sec=0.10)
265  if get_result_future.result() is not None:
266  status = get_result_future.result().status # type: ignore[union-attr]
267  if status == GoalStatus.STATUS_SUCCEEDED:
268  progressing = False
269  elif status == GoalStatus.STATUS_CANCELED or status == GoalStatus.STATUS_ABORTED:
270  self.info_msginfo_msg(f'Goal failed with status code: {status}')
271  return False
272 
273  # Else, processing. Check feedback
274  while len(self.feedback_msgsfeedback_msgs) > 0:
275  feedback_msg = self.feedback_msgsfeedback_msgs.pop(0)
276 
277  # Start following the path
278  if (last_feedback_msg and feedback_msg.path != last_feedback_msg.path):
279  follow_path_task = self.navigatornavigator.followPath(feedback_msg.path)
280 
281  # Check if the feedback is valid, if changed (not for route operations)
282  if last_feedback_msg and \
283  last_feedback_msg.current_edge_id != feedback_msg.current_edge_id and \
284  int(feedback_msg.current_edge_id) != 0:
285  if last_feedback_msg.next_node_id != feedback_msg.last_node_id:
286  self.error_msgerror_msg('Feedback state is not tracking in order!')
287  return False
288 
289  last_feedback_msg = feedback_msg
290 
291  # Validate the state of the final feedback message
292  if last_feedback_msg is None:
293  self.error_msgerror_msg('No feedback message received!')
294  return False
295 
296  if int(last_feedback_msg.next_node_id) != 0:
297  self.error_msgerror_msg('Terminal feedback state of nodes is not correct!')
298  return False
299  if int(last_feedback_msg.current_edge_id) != 0:
300  self.error_msgerror_msg('Terminal feedback state of edges is not correct!')
301  return False
302  if int(last_feedback_msg.route.nodes[-1].nodeid) != 13:
303  self.error_msgerror_msg('Final route node is not correct!')
304  return False
305 
306  while not self.navigatornavigator.isTaskComplete(task=follow_path_task):
307  time.sleep(0.1)
308 
309  self.info_msginfo_msg('Action completed! Checking validity of terminal condition...')
310 
311  # Check result for validity
312  if not self.distanceFromGoaldistanceFromGoal() < 1.0:
313  self.error_msgerror_msg('Did not make it to the goal pose!')
314  return False
315 
316  self.info_msginfo_msg('Goal succeeded!')
317  return True
318 
319  def feedback_callback(self, feedback_msg: ComputeAndTrackRoute.Feedback) -> None:
320  self.feedback_msgsfeedback_msgs.append(feedback_msg.feedback)
321 
322  def distanceFromGoal(self) -> float:
323  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
324  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
325  distance = math.sqrt(d_x * d_x + d_y * d_y)
326  self.info_msginfo_msg(f'Distance from goal is: {distance}')
327  return distance
328 
329  def info_msg(self, msg: str) -> None:
330  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
331 
332  def error_msg(self, msg: str) -> None:
333  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
334 
335  def setInitialPose(self) -> None:
336  msg = PoseWithCovarianceStamped()
337  msg.pose.pose = self.initial_poseinitial_pose
338  msg.header.frame_id = 'map'
339  self.info_msginfo_msg('Publishing Initial Pose')
340  self.initial_pose_pubinitial_pose_pub.publish(msg)
341  self.currentPosecurrentPose = self.initial_poseinitial_pose
342 
343  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
344  msg = PoseStamped()
345  msg.header.frame_id = 'map'
346  msg.pose = pose
347  return msg
348 
349  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
350  self.info_msginfo_msg('Received amcl_pose')
351  self.current_posecurrent_pose = msg.pose.pose
352  self.initial_pose_receivedinitial_pose_received = True
353 
354  def wait_for_node_active(self, node_name: str) -> None:
355  # Waits for the node within the tester namespace to become active
356  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
357  node_service = f'{node_name}/get_state'
358  state_client: Client[GetState.Request, GetState.Response] = \
359  self.create_client(GetState, node_service)
360  while not state_client.wait_for_service(timeout_sec=1.0):
361  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
362  req = GetState.Request() # empty request
363  state = 'UNKNOWN'
364  while state != 'active':
365  self.info_msginfo_msg(f'Getting {node_name} state...')
366  future = state_client.call_async(req)
367  rclpy.spin_until_future_complete(self, future)
368  if future.result() is not None:
369  state = future.result().current_state.label # type: ignore[union-attr]
370  self.info_msginfo_msg(f'Result of get_state: {state}')
371  else:
372  self.error_msgerror_msg(
373  f'Exception while calling service: {future.exception()!r}'
374  )
375  time.sleep(5)
376 
377  def shutdown(self) -> None:
378  self.info_msginfo_msg('Shutting down')
379  self.compute_action_client.destroy()
380  self.compute_track_action_client.destroy()
381 
382  transition_service = 'lifecycle_manager_navigation/manage_nodes'
383  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
384  self.create_client(ManageLifecycleNodes, transition_service)
385  while not mgr_client.wait_for_service(timeout_sec=1.0):
386  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
387 
388  req = ManageLifecycleNodes.Request()
389  req.command = ManageLifecycleNodes.Request().SHUTDOWN
390  future = mgr_client.call_async(req)
391  try:
392  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
393  rclpy.spin_until_future_complete(self, future)
394  future.result()
395  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
396  except Exception as e: # noqa: B902
397  self.error_msgerror_msg(f'Service call failed {e!r}')
398  transition_service = 'lifecycle_manager_localization/manage_nodes'
399  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
400  while not mgr_client.wait_for_service(timeout_sec=1.0):
401  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
402 
403  req = ManageLifecycleNodes.Request()
404  req.command = ManageLifecycleNodes.Request().SHUTDOWN
405  future = mgr_client.call_async(req)
406  try:
407  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
408  rclpy.spin_until_future_complete(self, future)
409  future.result()
410  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
411  except Exception as e: # noqa: B902
412  self.error_msgerror_msg(f'Service call failed {e!r}')
413 
414  def wait_for_initial_pose(self) -> None:
415  self.initial_pose_receivedinitial_pose_received = False
416  while not self.initial_pose_receivedinitial_pose_received:
417  self.info_msginfo_msg('Setting initial pose')
418  self.setInitialPosesetInitialPose()
419  self.info_msginfo_msg('Waiting for amcl_pose to be received')
420  rclpy.spin_once(self, timeout_sec=1)
421 
422 
423 def run_all_tests(robot_tester: RouteTester) -> bool:
424  # set transforms to use_sim_time
425  robot_tester.wait_for_node_active('amcl')
426  robot_tester.wait_for_initial_pose()
427  robot_tester.wait_for_node_active('bt_navigator')
428  result_poses = robot_tester.runComputeRouteTest(use_poses=True)
429  result_node_ids = robot_tester.runComputeRouteTest(use_poses=False)
430  result_same = robot_tester.runComputeRouteSamePoseTest()
431  result = result_poses and result_node_ids and result_same and robot_tester.runTrackRouteTest()
432 
433  if result:
434  robot_tester.info_msg('Test PASSED')
435  else:
436  robot_tester.error_msg('Test FAILED')
437  return result
438 
439 
440 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
441  initial_pose = Pose()
442  initial_pose.position.x = x
443  initial_pose.position.y = y
444  initial_pose.position.z = z
445  initial_pose.orientation.x = 0.0
446  initial_pose.orientation.y = 0.0
447  initial_pose.orientation.z = 0.0
448  initial_pose.orientation.w = 1.0
449  return initial_pose
450 
451 
452 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
453  # The robot(s) positions from the input arguments
454  parser = argparse.ArgumentParser(description='Route server tester node')
455  group = parser.add_mutually_exclusive_group(required=True)
456  group.add_argument(
457  '-r',
458  '--robot',
459  action='append',
460  nargs=4,
461  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
462  help='The robot starting and final positions.',
463  )
464  args, unknown = parser.parse_known_args()
465 
466  rclpy.init()
467 
468  # Create test object
469  init_x, init_y, final_x, final_y = args.robot[0]
470  tester = RouteTester(
471  initial_pose=fwd_pose(float(init_x), float(init_y)),
472  goal_pose=fwd_pose(float(final_x), float(final_y)),
473  )
474  tester.info_msg(
475  'Starting tester, robot going from '
476  + init_x
477  + ', '
478  + init_y
479  + ' to '
480  + final_x
481  + ', '
482  + final_y
483  + ' via route server.'
484  )
485 
486  # wait a few seconds to make sure entire stacks are up
487  time.sleep(10)
488 
489  # run tests
490  passed = run_all_tests(tester)
491 
492  tester.shutdown()
493  tester.info_msg('Done Shutting Down.')
494 
495  if not passed:
496  tester.info_msg('Exiting failed')
497  exit(1)
498  else:
499  tester.info_msg('Exiting passed')
500  exit(0)
501 
502 
503 if __name__ == '__main__':
504  main()
None feedback_callback(self, ComputeAndTrackRoute.Feedback feedback_msg)
Definition: tester_node.py:319
None info_msg(self, str msg)
Definition: tester_node.py:329
None poseCallback(self, PoseWithCovarianceStamped msg)
Definition: tester_node.py:349
None error_msg(self, str msg)
Definition: tester_node.py:332
PoseStamped getStampedPoseMsg(self, Pose pose)
Definition: tester_node.py:343
float distanceFromGoal(self)
Definition: tester_node.py:322