Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2025 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import argparse
17 import math
18 import sys
19 import time
20 
21 from action_msgs.msg import GoalStatus
22 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
23 from lifecycle_msgs.srv import GetState
24 from nav2_msgs.action import ComputeAndTrackRoute, ComputeRoute
25 from nav2_msgs.srv import ManageLifecycleNodes
26 from nav2_simple_commander.robot_navigator import BasicNavigator
27 import rclpy
28 from rclpy.action import ActionClient
29 from rclpy.client import Client
30 from rclpy.node import Node
31 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
32 from std_srvs.srv import Trigger
33 
34 
35 class RouteTester(Node):
36 
37  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
38  super().__init__(node_name='nav2_tester', namespace=namespace)
39  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
40  PoseWithCovarianceStamped, 'initialpose', 10
41  )
42 
43  pose_qos = QoSProfile(
44  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
45  reliability=QoSReliabilityPolicy.RELIABLE,
46  history=QoSHistoryPolicy.KEEP_LAST,
47  depth=1,
48  )
49 
50  self.model_pose_submodel_pose_sub = self.create_subscription(
51  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
52  )
53  self.initial_pose_receivedinitial_pose_received = False
54  self.initial_poseinitial_pose = initial_pose
55  self.goal_posegoal_pose = goal_pose
56  self.compute_action_client: ActionClient[
57  ComputeRoute.Goal,
58  ComputeRoute.Result,
59  ComputeRoute.Feedback
60  ] = ActionClient(self, ComputeRoute, 'compute_route')
61  self.compute_track_action_client: ActionClient[
62  ComputeAndTrackRoute.Goal,
63  ComputeAndTrackRoute.Result,
64  ComputeAndTrackRoute.Feedback
65  ] = ActionClient(
66  self, ComputeAndTrackRoute, 'compute_and_track_route')
67  self.feedback_msgsfeedback_msgs: list[ComputeAndTrackRoute.Feedback] = []
68 
69  self.navigatornavigator = BasicNavigator()
70 
71  def runComputeRouteTest(self, use_poses: bool = True) -> bool:
72  # Test 1: See if we can compute a route that is valid and correctly sized
73  self.info_msginfo_msg("Waiting for 'ComputeRoute' action server")
74  while not self.compute_action_client.wait_for_server(timeout_sec=1.0):
75  self.info_msginfo_msg("'ComputeRoute' action server not available, waiting...")
76 
77  route_msg = ComputeRoute.Goal()
78  if use_poses:
79  route_msg.start = self.getStampedPoseMsggetStampedPoseMsg(self.initial_poseinitial_pose)
80  route_msg.goal = self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
81  route_msg.use_start = True
82  route_msg.use_poses = True
83  else:
84  # Same request, just now the node IDs to test
85  route_msg.start_id = 7
86  route_msg.goal_id = 13
87  route_msg.use_start = False
88  route_msg.use_poses = False
89 
90  self.info_msginfo_msg('Sending ComputeRoute goal request...')
91  send_goal_future = self.compute_action_client.send_goal_async(route_msg)
92 
93  rclpy.spin_until_future_complete(self, send_goal_future)
94  goal_handle = send_goal_future.result()
95 
96  if not goal_handle or not goal_handle.accepted:
97  self.error_msgerror_msg('Goal rejected')
98  return False
99 
100  self.info_msginfo_msg('Goal accepted')
101  get_result_future = goal_handle.get_result_async()
102 
103  self.info_msginfo_msg("Waiting for 'ComputeRoute' action to complete")
104  rclpy.spin_until_future_complete(self, get_result_future)
105  status = get_result_future.result().status # type: ignore[union-attr]
106  result = get_result_future.result().result # type: ignore[union-attr]
107  if status != GoalStatus.STATUS_SUCCEEDED:
108  self.info_msginfo_msg(f'Goal failed with status code: {status}')
109  return False
110 
111  self.info_msginfo_msg('Action completed! Checking validity of results...')
112 
113  # Check result for validity
114  assert (len(result.path.poses) == 80)
115  assert (result.route.route_cost > 6)
116  assert (result.route.route_cost < 7)
117  assert (len(result.route.nodes) == 5)
118  assert (len(result.route.edges) == 4)
119  assert (result.error_code == 0)
120  assert (result.error_msg == '')
121 
122  self.info_msginfo_msg('Goal succeeded!')
123  return True
124 
125  def runComputeRouteSamePoseTest(self) -> bool:
126  # Test 2: try with the same start and goal point edge case
127  self.info_msginfo_msg("Waiting for 'ComputeRoute' action server")
128  while not self.compute_action_client.wait_for_server(timeout_sec=1.0):
129  self.info_msginfo_msg("'ComputeRoute' action server not available, waiting...")
130 
131  route_msg = ComputeRoute.Goal()
132  route_msg.start_id = 7
133  route_msg.goal_id = 7
134  route_msg.use_start = False
135  route_msg.use_poses = False
136 
137  self.info_msginfo_msg('Sending ComputeRoute goal request...')
138  send_goal_future = self.compute_action_client.send_goal_async(route_msg)
139 
140  rclpy.spin_until_future_complete(self, send_goal_future)
141  goal_handle = send_goal_future.result()
142 
143  if not goal_handle or not goal_handle.accepted:
144  self.error_msgerror_msg('Goal rejected')
145  return False
146 
147  self.info_msginfo_msg('Goal accepted')
148  get_result_future = goal_handle.get_result_async()
149 
150  self.info_msginfo_msg("Waiting for 'ComputeRoute' action to complete")
151  rclpy.spin_until_future_complete(self, get_result_future)
152  status = get_result_future.result().status # type: ignore[union-attr]
153  result = get_result_future.result().result # type: ignore[union-attr]
154  if status != GoalStatus.STATUS_SUCCEEDED:
155  self.info_msginfo_msg(f'Goal failed with status code: {status}')
156  return False
157 
158  self.info_msginfo_msg('Action completed! Checking validity of results...')
159 
160  # Check result for validity, should be a 1-node path as its the same
161  assert (len(result.path.poses) == 1)
162  assert (len(result.route.nodes) == 1)
163  assert (len(result.route.edges) == 0)
164  assert (result.error_code == 0)
165  assert (result.error_msg == '')
166 
167  self.info_msginfo_msg('Goal succeeded!')
168  return True
169 
170  def runTrackRouteTest(self) -> bool:
171  # Test 3: See if we can compute and track a route with proper state
172  self.info_msginfo_msg("Waiting for 'ComputeAndTrackRoute' action server")
173  while not self.compute_track_action_client.wait_for_server(timeout_sec=1.0):
174  self.info_msginfo_msg("'ComputeAndTrackRoute' action server not available, waiting...")
175 
176  route_msg = ComputeAndTrackRoute.Goal()
177  route_msg.goal = self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
178  route_msg.use_start = False # Use TF pose instead
179  route_msg.use_poses = True
180 
181  self.info_msginfo_msg('Sending ComputeAndTrackRoute goal request...')
182  send_goal_future = self.compute_track_action_client.send_goal_async(
183  route_msg, feedback_callback=self.feedback_callbackfeedback_callback)
184 
185  rclpy.spin_until_future_complete(self, send_goal_future)
186  goal_handle = send_goal_future.result()
187 
188  if not goal_handle or not goal_handle.accepted:
189  self.error_msgerror_msg('Goal rejected')
190  return False
191 
192  self.info_msginfo_msg('Goal accepted')
193  get_result_future = goal_handle.get_result_async()
194 
195  # Trigger a reroute
196  time.sleep(1)
197  self.info_msginfo_msg('Triggering a reroute')
198  srv_client: Client[Trigger.Request, Trigger.Response] = \
199  self.create_client(Trigger, 'route_server/ReroutingService/reroute')
200  while not srv_client.wait_for_service(timeout_sec=1.0):
201  self.info_msginfo_msg('Reroute service not available, waiting...')
202  req = Trigger.Request()
203  future = srv_client.call_async(req)
204  rclpy.spin_until_future_complete(self, future)
205  if future.result() is not None:
206  self.info_msginfo_msg('Reroute triggered')
207  else:
208  self.error_msgerror_msg('Reroute failed')
209  return False
210  # Wait a bit for it to compute the route and start tracking (but no movement)
211 
212  # Cancel it after a bit
213  time.sleep(2)
214  cancel_future = goal_handle.cancel_goal_async()
215  rclpy.spin_until_future_complete(self, cancel_future)
216  status = cancel_future.result()
217  if status is not None and len(status.goals_canceling) > 0:
218  self.info_msginfo_msg('Action cancel completed!')
219  else:
220  self.info_msginfo_msg('Goal cancel failed')
221  return False
222 
223  # Send it again
224  self.info_msginfo_msg('Sending ComputeAndTrackRoute goal request...')
225  send_goal_future = self.compute_track_action_client.send_goal_async(
226  route_msg, feedback_callback=self.feedback_callbackfeedback_callback)
227 
228  rclpy.spin_until_future_complete(self, send_goal_future)
229  goal_handle = send_goal_future.result()
230 
231  if not goal_handle or not goal_handle.accepted:
232  self.error_msgerror_msg('Goal rejected')
233  return False
234 
235  self.info_msginfo_msg('Goal accepted')
236  get_result_future = goal_handle.get_result_async()
237 
238  # Wait a bit for it to compute the route and start tracking (but no movement)
239  time.sleep(2)
240 
241  # Preempt with a new request type on the graph
242  route_msg.use_poses = False
243  route_msg.start_id = 7
244  route_msg.goal_id = 13
245  send_goal_future = self.compute_track_action_client.send_goal_async(
246  route_msg, feedback_callback=self.feedback_callbackfeedback_callback)
247 
248  rclpy.spin_until_future_complete(self, send_goal_future)
249  goal_handle = send_goal_future.result()
250 
251  if not goal_handle or not goal_handle.accepted:
252  self.error_msgerror_msg('Goal rejected')
253  return False
254 
255  self.info_msginfo_msg('Goal accepted')
256  get_result_future = goal_handle.get_result_async()
257  self.feedback_msgsfeedback_msgs = []
258 
259  self.info_msginfo_msg("Waiting for 'ComputeAndTrackRoute' action to complete")
260  progressing = True
261  last_feedback_msg = None
262  follow_path_task = None
263  while progressing:
264  rclpy.spin_until_future_complete(self, get_result_future, timeout_sec=0.10)
265  if get_result_future.result() is not None:
266  status = get_result_future.result().status # type: ignore[union-attr]
267  if status == GoalStatus.STATUS_SUCCEEDED:
268  progressing = False
269  elif status == GoalStatus.STATUS_CANCELED or status == GoalStatus.STATUS_ABORTED:
270  self.info_msginfo_msg(f'Goal failed with status code: {status}')
271  return False
272 
273  # Else, processing. Check feedback
274  while len(self.feedback_msgsfeedback_msgs) > 0:
275  feedback_msg = self.feedback_msgsfeedback_msgs.pop(0)
276 
277  # Start following the path
278  if (last_feedback_msg and feedback_msg.path != last_feedback_msg.path):
279  follow_path_task = self.navigatornavigator.followPath(feedback_msg.path)
280 
281  # Check if the feedback is valid, if changed (not for route operations)
282  if last_feedback_msg and \
283  last_feedback_msg.current_edge_id != feedback_msg.current_edge_id and \
284  int(feedback_msg.current_edge_id) != 0:
285  if last_feedback_msg.next_node_id != feedback_msg.last_node_id:
286  self.error_msgerror_msg('Feedback state is not tracking in order!')
287  return False
288 
289  last_feedback_msg = feedback_msg
290 
291  # Validate the state of the final feedback message
292  if last_feedback_msg is None:
293  self.error_msgerror_msg('No feedback message received!')
294  return False
295 
296  if int(last_feedback_msg.next_node_id) != 0:
297  self.error_msgerror_msg('Terminal feedback state of nodes is not correct!')
298  return False
299  if int(last_feedback_msg.current_edge_id) != 0:
300  self.error_msgerror_msg('Terminal feedback state of edges is not correct!')
301  return False
302  if int(last_feedback_msg.route.nodes[-1].nodeid) != 13:
303  self.error_msgerror_msg('Final route node is not correct!')
304  return False
305 
306  while not self.navigatornavigator.isTaskComplete(task=follow_path_task):
307  time.sleep(0.1)
308 
309  self.info_msginfo_msg('Action completed! Checking validity of terminal condition...')
310 
311  # Check result for validity
312  if not self.distanceFromGoaldistanceFromGoal() < 1.0:
313  self.error_msgerror_msg('Did not make it to the goal pose!')
314  return False
315 
316  self.info_msginfo_msg('Goal succeeded!')
317  return True
318 
319  def feedback_callback(
320  self, feedback_msg: ComputeAndTrackRoute.Impl.FeedbackMessage) -> None:
321  self.feedback_msgsfeedback_msgs.append(feedback_msg.feedback)
322 
323  def distanceFromGoal(self) -> float:
324  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
325  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
326  distance = math.sqrt(d_x * d_x + d_y * d_y)
327  self.info_msginfo_msg(f'Distance from goal is: {distance}')
328  return distance
329 
330  def info_msg(self, msg: str) -> None:
331  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
332 
333  def error_msg(self, msg: str) -> None:
334  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
335 
336  def setInitialPose(self) -> None:
337  msg = PoseWithCovarianceStamped()
338  msg.pose.pose = self.initial_poseinitial_pose
339  msg.header.frame_id = 'map'
340  self.info_msginfo_msg('Publishing Initial Pose')
341  self.initial_pose_pubinitial_pose_pub.publish(msg)
342  self.currentPosecurrentPose = self.initial_poseinitial_pose
343 
344  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
345  msg = PoseStamped()
346  msg.header.frame_id = 'map'
347  msg.pose = pose
348  return msg
349 
350  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
351  self.info_msginfo_msg('Received amcl_pose')
352  self.current_posecurrent_pose = msg.pose.pose
353  self.initial_pose_receivedinitial_pose_received = True
354 
355  def wait_for_node_active(self, node_name: str) -> None:
356  # Waits for the node within the tester namespace to become active
357  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
358  node_service = f'{node_name}/get_state'
359  state_client: Client[GetState.Request, GetState.Response] = \
360  self.create_client(GetState, node_service)
361  while not state_client.wait_for_service(timeout_sec=1.0):
362  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
363  req = GetState.Request() # empty request
364  state = 'UNKNOWN'
365  while state != 'active':
366  self.info_msginfo_msg(f'Getting {node_name} state...')
367  future = state_client.call_async(req)
368  rclpy.spin_until_future_complete(self, future)
369  if future.result() is not None:
370  state = future.result().current_state.label # type: ignore[union-attr]
371  self.info_msginfo_msg(f'Result of get_state: {state}')
372  else:
373  self.error_msgerror_msg(
374  f'Exception while calling service: {future.exception()!r}'
375  )
376  time.sleep(5)
377 
378  def shutdown(self) -> None:
379  self.info_msginfo_msg('Shutting down')
380  self.compute_action_client.destroy()
381  self.compute_track_action_client.destroy()
382 
383  transition_service = 'lifecycle_manager_navigation/manage_nodes'
384  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
385  self.create_client(ManageLifecycleNodes, transition_service)
386  while not mgr_client.wait_for_service(timeout_sec=1.0):
387  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
388 
389  req = ManageLifecycleNodes.Request()
390  req.command = ManageLifecycleNodes.Request.SHUTDOWN
391  future = mgr_client.call_async(req)
392  try:
393  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
394  rclpy.spin_until_future_complete(self, future)
395  future.result()
396  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
397  except Exception as e: # noqa: B902
398  self.error_msgerror_msg(f'Service call failed {e!r}')
399  transition_service = 'lifecycle_manager_localization/manage_nodes'
400  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
401  while not mgr_client.wait_for_service(timeout_sec=1.0):
402  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
403 
404  req = ManageLifecycleNodes.Request()
405  req.command = ManageLifecycleNodes.Request.SHUTDOWN
406  future = mgr_client.call_async(req)
407  try:
408  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
409  rclpy.spin_until_future_complete(self, future)
410  future.result()
411  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
412  except Exception as e: # noqa: B902
413  self.error_msgerror_msg(f'Service call failed {e!r}')
414 
415  def wait_for_initial_pose(self) -> None:
416  self.initial_pose_receivedinitial_pose_received = False
417  while not self.initial_pose_receivedinitial_pose_received:
418  self.info_msginfo_msg('Setting initial pose')
419  self.setInitialPosesetInitialPose()
420  self.info_msginfo_msg('Waiting for amcl_pose to be received')
421  rclpy.spin_once(self, timeout_sec=1)
422 
423 
424 def run_all_tests(robot_tester: RouteTester) -> bool:
425  # set transforms to use_sim_time
426  robot_tester.wait_for_node_active('amcl')
427  robot_tester.wait_for_initial_pose()
428  robot_tester.wait_for_node_active('bt_navigator')
429  result_poses = robot_tester.runComputeRouteTest(use_poses=True)
430  result_node_ids = robot_tester.runComputeRouteTest(use_poses=False)
431  result_same = robot_tester.runComputeRouteSamePoseTest()
432  result = result_poses and result_node_ids and result_same and robot_tester.runTrackRouteTest()
433 
434  if result:
435  robot_tester.info_msg('Test PASSED')
436  else:
437  robot_tester.error_msg('Test FAILED')
438  return result
439 
440 
441 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
442  initial_pose = Pose()
443  initial_pose.position.x = x
444  initial_pose.position.y = y
445  initial_pose.position.z = z
446  initial_pose.orientation.x = 0.0
447  initial_pose.orientation.y = 0.0
448  initial_pose.orientation.z = 0.0
449  initial_pose.orientation.w = 1.0
450  return initial_pose
451 
452 
453 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
454  # The robot(s) positions from the input arguments
455  parser = argparse.ArgumentParser(description='Route server tester node')
456  group = parser.add_mutually_exclusive_group(required=True)
457  group.add_argument(
458  '-r',
459  '--robot',
460  action='append',
461  nargs=4,
462  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
463  help='The robot starting and final positions.',
464  )
465  args, unknown = parser.parse_known_args()
466 
467  rclpy.init()
468 
469  # Create test object
470  init_x, init_y, final_x, final_y = args.robot[0]
471  tester = RouteTester(
472  initial_pose=fwd_pose(float(init_x), float(init_y)),
473  goal_pose=fwd_pose(float(final_x), float(final_y)),
474  )
475  tester.info_msg(
476  'Starting tester, robot going from '
477  + init_x
478  + ', '
479  + init_y
480  + ' to '
481  + final_x
482  + ', '
483  + final_y
484  + ' via route server.'
485  )
486 
487  # wait a few seconds to make sure entire stacks are up
488  time.sleep(10)
489 
490  # run tests
491  passed = run_all_tests(tester)
492 
493  tester.shutdown()
494  tester.info_msg('Done Shutting Down.')
495 
496  if not passed:
497  tester.info_msg('Exiting failed')
498  exit(1)
499  else:
500  tester.info_msg('Exiting passed')
501  exit(0)
502 
503 
504 if __name__ == '__main__':
505  main()
None info_msg(self, str msg)
Definition: tester_node.py:330
None poseCallback(self, PoseWithCovarianceStamped msg)
Definition: tester_node.py:350
None error_msg(self, str msg)
Definition: tester_node.py:333
None feedback_callback(self, ComputeAndTrackRoute.Impl.FeedbackMessage feedback_msg)
Definition: tester_node.py:320
PoseStamped getStampedPoseMsg(self, Pose pose)
Definition: tester_node.py:344
float distanceFromGoal(self)
Definition: tester_node.py:323