Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2025 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import argparse
17 import math
18 import sys
19 import time
20 
21 from action_msgs.msg import GoalStatus
22 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
23 from lifecycle_msgs.srv import GetState
24 from nav2_msgs.action import ComputeAndTrackRoute, ComputeRoute
25 from nav2_msgs.srv import ManageLifecycleNodes
26 from nav2_simple_commander.robot_navigator import BasicNavigator
27 import rclpy
28 from rclpy.action import ActionClient # type: ignore[attr-defined]
29 from rclpy.node import Node
30 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
31 from std_srvs.srv import Trigger
32 
33 
34 class RouteTester(Node):
35 
36  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
37  super().__init__(node_name='nav2_tester', namespace=namespace)
38  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
39  PoseWithCovarianceStamped, 'initialpose', 10
40  )
41 
42  pose_qos = QoSProfile(
43  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
44  reliability=QoSReliabilityPolicy.RELIABLE,
45  history=QoSHistoryPolicy.KEEP_LAST,
46  depth=1,
47  )
48 
49  self.model_pose_submodel_pose_sub = self.create_subscription(
50  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
51  )
52  self.initial_pose_receivedinitial_pose_received = False
53  self.initial_poseinitial_pose = initial_pose
54  self.goal_posegoal_pose = goal_pose
55  self.compute_action_clientcompute_action_client = ActionClient(self, ComputeRoute, 'compute_route')
56  self.compute_track_action_clientcompute_track_action_client = ActionClient(
57  self, ComputeAndTrackRoute, 'compute_and_track_route')
58  self.feedback_msgsfeedback_msgs: list[ComputeAndTrackRoute.Feedback] = []
59 
60  self.navigatornavigator = BasicNavigator()
61 
62  def runComputeRouteTest(self, use_poses: bool = True) -> bool:
63  # Test 1: See if we can compute a route that is valid and correctly sized
64  self.info_msginfo_msg("Waiting for 'ComputeRoute' action server")
65  while not self.compute_action_clientcompute_action_client.wait_for_server(timeout_sec=1.0):
66  self.info_msginfo_msg("'ComputeRoute' action server not available, waiting...")
67 
68  route_msg = ComputeRoute.Goal()
69  if use_poses:
70  route_msg.start = self.getStampedPoseMsggetStampedPoseMsg(self.initial_poseinitial_pose)
71  route_msg.goal = self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
72  route_msg.use_start = True
73  route_msg.use_poses = True
74  else:
75  # Same request, just now the node IDs to test
76  route_msg.start_id = 7
77  route_msg.goal_id = 13
78  route_msg.use_start = False
79  route_msg.use_poses = False
80 
81  self.info_msginfo_msg('Sending ComputeRoute goal request...')
82  send_goal_future = self.compute_action_clientcompute_action_client.send_goal_async(route_msg)
83 
84  rclpy.spin_until_future_complete(self, send_goal_future)
85  goal_handle = send_goal_future.result()
86 
87  if not goal_handle or not goal_handle.accepted:
88  self.error_msgerror_msg('Goal rejected')
89  return False
90 
91  self.info_msginfo_msg('Goal accepted')
92  get_result_future = goal_handle.get_result_async()
93 
94  self.info_msginfo_msg("Waiting for 'ComputeRoute' action to complete")
95  rclpy.spin_until_future_complete(self, get_result_future)
96  status = get_result_future.result().status # type: ignore[union-attr]
97  result = get_result_future.result().result # type: ignore[union-attr]
98  if status != GoalStatus.STATUS_SUCCEEDED:
99  self.info_msginfo_msg(f'Goal failed with status code: {status}')
100  return False
101 
102  self.info_msginfo_msg('Action completed! Checking validity of results...')
103 
104  # Check result for validity
105  assert (len(result.path.poses) == 80)
106  assert (result.route.route_cost > 6)
107  assert (result.route.route_cost < 7)
108  assert (len(result.route.nodes) == 5)
109  assert (len(result.route.edges) == 4)
110  assert (result.error_code == 0)
111  assert (result.error_msg == '')
112 
113  self.info_msginfo_msg('Goal succeeded!')
114  return True
115 
116  def runComputeRouteSamePoseTest(self) -> bool:
117  # Test 2: try with the same start and goal point edge case
118  self.info_msginfo_msg("Waiting for 'ComputeRoute' action server")
119  while not self.compute_action_clientcompute_action_client.wait_for_server(timeout_sec=1.0):
120  self.info_msginfo_msg("'ComputeRoute' action server not available, waiting...")
121 
122  route_msg = ComputeRoute.Goal()
123  route_msg.start_id = 7
124  route_msg.goal_id = 7
125  route_msg.use_start = False
126  route_msg.use_poses = False
127 
128  self.info_msginfo_msg('Sending ComputeRoute goal request...')
129  send_goal_future = self.compute_action_clientcompute_action_client.send_goal_async(route_msg)
130 
131  rclpy.spin_until_future_complete(self, send_goal_future)
132  goal_handle = send_goal_future.result()
133 
134  if not goal_handle or not goal_handle.accepted:
135  self.error_msgerror_msg('Goal rejected')
136  return False
137 
138  self.info_msginfo_msg('Goal accepted')
139  get_result_future = goal_handle.get_result_async()
140 
141  self.info_msginfo_msg("Waiting for 'ComputeRoute' action to complete")
142  rclpy.spin_until_future_complete(self, get_result_future)
143  status = get_result_future.result().status # type: ignore[union-attr]
144  result = get_result_future.result().result # type: ignore[union-attr]
145  if status != GoalStatus.STATUS_SUCCEEDED:
146  self.info_msginfo_msg(f'Goal failed with status code: {status}')
147  return False
148 
149  self.info_msginfo_msg('Action completed! Checking validity of results...')
150 
151  # Check result for validity, should be a 1-node path as its the same
152  assert (len(result.path.poses) == 1)
153  assert (len(result.route.nodes) == 1)
154  assert (len(result.route.edges) == 0)
155  assert (result.error_code == 0)
156  assert (result.error_msg == '')
157 
158  self.info_msginfo_msg('Goal succeeded!')
159  return True
160 
161  def runTrackRouteTest(self) -> bool:
162  # Test 3: See if we can compute and track a route with proper state
163  self.info_msginfo_msg("Waiting for 'ComputeAndTrackRoute' action server")
164  while not self.compute_track_action_clientcompute_track_action_client.wait_for_server(timeout_sec=1.0):
165  self.info_msginfo_msg("'ComputeAndTrackRoute' action server not available, waiting...")
166 
167  route_msg = ComputeAndTrackRoute.Goal()
168  route_msg.goal = self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
169  route_msg.use_start = False # Use TF pose instead
170  route_msg.use_poses = True
171 
172  self.info_msginfo_msg('Sending ComputeAndTrackRoute goal request...')
173  send_goal_future = self.compute_track_action_clientcompute_track_action_client.send_goal_async(
174  route_msg, feedback_callback=self.feedback_callbackfeedback_callback)
175 
176  rclpy.spin_until_future_complete(self, send_goal_future)
177  goal_handle = send_goal_future.result()
178 
179  if not goal_handle or not goal_handle.accepted:
180  self.error_msgerror_msg('Goal rejected')
181  return False
182 
183  self.info_msginfo_msg('Goal accepted')
184  get_result_future = goal_handle.get_result_async()
185 
186  # Trigger a reroute
187  time.sleep(1)
188  self.info_msginfo_msg('Triggering a reroute')
189  srv_client = self.create_client(Trigger, 'route_server/ReroutingService/reroute')
190  while not srv_client.wait_for_service(timeout_sec=1.0):
191  self.info_msginfo_msg('Reroute service not available, waiting...')
192  req = Trigger.Request()
193  future = srv_client.call_async(req)
194  rclpy.spin_until_future_complete(self, future)
195  if future.result() is not None:
196  self.info_msginfo_msg('Reroute triggered')
197  else:
198  self.error_msgerror_msg('Reroute failed')
199  return False
200  # Wait a bit for it to compute the route and start tracking (but no movement)
201 
202  # Cancel it after a bit
203  time.sleep(2)
204  cancel_future = goal_handle.cancel_goal_async()
205  rclpy.spin_until_future_complete(self, cancel_future)
206  status = cancel_future.result()
207  if status is not None and len(status.goals_canceling) > 0:
208  self.info_msginfo_msg('Action cancel completed!')
209  else:
210  self.info_msginfo_msg('Goal cancel failed')
211  return False
212 
213  # Send it again
214  self.info_msginfo_msg('Sending ComputeAndTrackRoute goal request...')
215  send_goal_future = self.compute_track_action_clientcompute_track_action_client.send_goal_async(
216  route_msg, feedback_callback=self.feedback_callbackfeedback_callback)
217 
218  rclpy.spin_until_future_complete(self, send_goal_future)
219  goal_handle = send_goal_future.result()
220 
221  if not goal_handle or not goal_handle.accepted:
222  self.error_msgerror_msg('Goal rejected')
223  return False
224 
225  self.info_msginfo_msg('Goal accepted')
226  get_result_future = goal_handle.get_result_async()
227 
228  # Wait a bit for it to compute the route and start tracking (but no movement)
229  time.sleep(2)
230 
231  # Preempt with a new request type on the graph
232  route_msg.use_poses = False
233  route_msg.start_id = 7
234  route_msg.goal_id = 13
235  send_goal_future = self.compute_track_action_clientcompute_track_action_client.send_goal_async(
236  route_msg, feedback_callback=self.feedback_callbackfeedback_callback)
237 
238  rclpy.spin_until_future_complete(self, send_goal_future)
239  goal_handle = send_goal_future.result()
240 
241  if not goal_handle or not goal_handle.accepted:
242  self.error_msgerror_msg('Goal rejected')
243  return False
244 
245  self.info_msginfo_msg('Goal accepted')
246  get_result_future = goal_handle.get_result_async()
247  self.feedback_msgsfeedback_msgs = []
248 
249  self.info_msginfo_msg("Waiting for 'ComputeAndTrackRoute' action to complete")
250  progressing = True
251  last_feedback_msg = None
252  follow_path_task = None
253  while progressing:
254  rclpy.spin_until_future_complete(self, get_result_future, timeout_sec=0.10)
255  if get_result_future.result() is not None:
256  status = get_result_future.result().status # type: ignore[union-attr]
257  if status == GoalStatus.STATUS_SUCCEEDED:
258  progressing = False
259  elif status == GoalStatus.STATUS_CANCELED or status == GoalStatus.STATUS_ABORTED:
260  self.info_msginfo_msg(f'Goal failed with status code: {status}')
261  return False
262 
263  # Else, processing. Check feedback
264  while len(self.feedback_msgsfeedback_msgs) > 0:
265  feedback_msg = self.feedback_msgsfeedback_msgs.pop(0)
266 
267  # Start following the path
268  if (last_feedback_msg and feedback_msg.path != last_feedback_msg.path):
269  follow_path_task = self.navigatornavigator.followPath(feedback_msg.path)
270 
271  # Check if the feedback is valid, if changed (not for route operations)
272  if last_feedback_msg and \
273  last_feedback_msg.current_edge_id != feedback_msg.current_edge_id and \
274  int(feedback_msg.current_edge_id) != 0:
275  if last_feedback_msg.next_node_id != feedback_msg.last_node_id:
276  self.error_msgerror_msg('Feedback state is not tracking in order!')
277  return False
278 
279  last_feedback_msg = feedback_msg
280 
281  # Validate the state of the final feedback message
282  if last_feedback_msg is None:
283  self.error_msgerror_msg('No feedback message received!')
284  return False
285 
286  if int(last_feedback_msg.next_node_id) != 0:
287  self.error_msgerror_msg('Terminal feedback state of nodes is not correct!')
288  return False
289  if int(last_feedback_msg.current_edge_id) != 0:
290  self.error_msgerror_msg('Terminal feedback state of edges is not correct!')
291  return False
292  if int(last_feedback_msg.route.nodes[-1].nodeid) != 13:
293  self.error_msgerror_msg('Final route node is not correct!')
294  return False
295 
296  while not self.navigatornavigator.isTaskComplete(task=follow_path_task):
297  time.sleep(0.1)
298 
299  self.info_msginfo_msg('Action completed! Checking validity of terminal condition...')
300 
301  # Check result for validity
302  if not self.distanceFromGoaldistanceFromGoal() < 1.0:
303  self.error_msgerror_msg('Did not make it to the goal pose!')
304  return False
305 
306  self.info_msginfo_msg('Goal succeeded!')
307  return True
308 
309  def feedback_callback(self, feedback_msg: ComputeAndTrackRoute.Feedback) -> None:
310  self.feedback_msgsfeedback_msgs.append(feedback_msg.feedback)
311 
312  def distanceFromGoal(self) -> float:
313  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
314  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
315  distance = math.sqrt(d_x * d_x + d_y * d_y)
316  self.info_msginfo_msg(f'Distance from goal is: {distance}')
317  return distance
318 
319  def info_msg(self, msg: str) -> None:
320  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
321 
322  def error_msg(self, msg: str) -> None:
323  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
324 
325  def setInitialPose(self) -> None:
326  msg = PoseWithCovarianceStamped()
327  msg.pose.pose = self.initial_poseinitial_pose
328  msg.header.frame_id = 'map'
329  self.info_msginfo_msg('Publishing Initial Pose')
330  self.initial_pose_pubinitial_pose_pub.publish(msg)
331  self.currentPosecurrentPose = self.initial_poseinitial_pose
332 
333  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
334  msg = PoseStamped()
335  msg.header.frame_id = 'map'
336  msg.pose = pose
337  return msg
338 
339  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
340  self.info_msginfo_msg('Received amcl_pose')
341  self.current_posecurrent_pose = msg.pose.pose
342  self.initial_pose_receivedinitial_pose_received = True
343 
344  def wait_for_node_active(self, node_name: str) -> None:
345  # Waits for the node within the tester namespace to become active
346  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
347  node_service = f'{node_name}/get_state'
348  state_client = self.create_client(GetState, node_service)
349  while not state_client.wait_for_service(timeout_sec=1.0):
350  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
351  req = GetState.Request() # empty request
352  state = 'UNKNOWN'
353  while state != 'active':
354  self.info_msginfo_msg(f'Getting {node_name} state...')
355  future = state_client.call_async(req)
356  rclpy.spin_until_future_complete(self, future)
357  if future.result() is not None:
358  state = future.result().current_state.label # type: ignore[union-attr]
359  self.info_msginfo_msg(f'Result of get_state: {state}')
360  else:
361  self.error_msgerror_msg(
362  f'Exception while calling service: {future.exception()!r}'
363  )
364  time.sleep(5)
365 
366  def shutdown(self) -> None:
367  self.info_msginfo_msg('Shutting down')
368  self.compute_action_clientcompute_action_client.destroy()
369  self.compute_track_action_clientcompute_track_action_client.destroy()
370 
371  transition_service = 'lifecycle_manager_navigation/manage_nodes'
372  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
373  while not mgr_client.wait_for_service(timeout_sec=1.0):
374  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
375 
376  req = ManageLifecycleNodes.Request()
377  req.command = ManageLifecycleNodes.Request().SHUTDOWN
378  future = mgr_client.call_async(req)
379  try:
380  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
381  rclpy.spin_until_future_complete(self, future)
382  future.result()
383  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
384  except Exception as e: # noqa: B902
385  self.error_msgerror_msg(f'Service call failed {e!r}')
386  transition_service = 'lifecycle_manager_localization/manage_nodes'
387  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
388  while not mgr_client.wait_for_service(timeout_sec=1.0):
389  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
390 
391  req = ManageLifecycleNodes.Request()
392  req.command = ManageLifecycleNodes.Request().SHUTDOWN
393  future = mgr_client.call_async(req)
394  try:
395  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
396  rclpy.spin_until_future_complete(self, future)
397  future.result()
398  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
399  except Exception as e: # noqa: B902
400  self.error_msgerror_msg(f'Service call failed {e!r}')
401 
402  def wait_for_initial_pose(self) -> None:
403  self.initial_pose_receivedinitial_pose_received = False
404  while not self.initial_pose_receivedinitial_pose_received:
405  self.info_msginfo_msg('Setting initial pose')
406  self.setInitialPosesetInitialPose()
407  self.info_msginfo_msg('Waiting for amcl_pose to be received')
408  rclpy.spin_once(self, timeout_sec=1)
409 
410 
411 def run_all_tests(robot_tester: RouteTester) -> bool:
412  # set transforms to use_sim_time
413  robot_tester.wait_for_node_active('amcl')
414  robot_tester.wait_for_initial_pose()
415  robot_tester.wait_for_node_active('bt_navigator')
416  result_poses = robot_tester.runComputeRouteTest(use_poses=True)
417  result_node_ids = robot_tester.runComputeRouteTest(use_poses=False)
418  result_same = robot_tester.runComputeRouteSamePoseTest()
419  result = result_poses and result_node_ids and result_same and robot_tester.runTrackRouteTest()
420 
421  if result:
422  robot_tester.info_msg('Test PASSED')
423  else:
424  robot_tester.error_msg('Test FAILED')
425  return result
426 
427 
428 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
429  initial_pose = Pose()
430  initial_pose.position.x = x
431  initial_pose.position.y = y
432  initial_pose.position.z = z
433  initial_pose.orientation.x = 0.0
434  initial_pose.orientation.y = 0.0
435  initial_pose.orientation.z = 0.0
436  initial_pose.orientation.w = 1.0
437  return initial_pose
438 
439 
440 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
441  # The robot(s) positions from the input arguments
442  parser = argparse.ArgumentParser(description='Route server tester node')
443  group = parser.add_mutually_exclusive_group(required=True)
444  group.add_argument(
445  '-r',
446  '--robot',
447  action='append',
448  nargs=4,
449  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
450  help='The robot starting and final positions.',
451  )
452  args, unknown = parser.parse_known_args()
453 
454  rclpy.init()
455 
456  # Create test object
457  init_x, init_y, final_x, final_y = args.robot[0]
458  tester = RouteTester(
459  initial_pose=fwd_pose(float(init_x), float(init_y)),
460  goal_pose=fwd_pose(float(final_x), float(final_y)),
461  )
462  tester.info_msg(
463  'Starting tester, robot going from '
464  + init_x
465  + ', '
466  + init_y
467  + ' to '
468  + final_x
469  + ', '
470  + final_y
471  + ' via route server.'
472  )
473 
474  # wait a few seconds to make sure entire stacks are up
475  time.sleep(10)
476 
477  # run tests
478  passed = run_all_tests(tester)
479 
480  tester.shutdown()
481  tester.info_msg('Done Shutting Down.')
482 
483  if not passed:
484  tester.info_msg('Exiting failed')
485  exit(1)
486  else:
487  tester.info_msg('Exiting passed')
488  exit(0)
489 
490 
491 if __name__ == '__main__':
492  main()
None feedback_callback(self, ComputeAndTrackRoute.Feedback feedback_msg)
Definition: tester_node.py:309
None info_msg(self, str msg)
Definition: tester_node.py:319
None poseCallback(self, PoseWithCovarianceStamped msg)
Definition: tester_node.py:339
None error_msg(self, str msg)
Definition: tester_node.py:322
PoseStamped getStampedPoseMsg(self, Pose pose)
Definition: tester_node.py:333
float distanceFromGoal(self)
Definition: tester_node.py:312