Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
nav_through_poses_tester_error_msg_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import sys
19 import time
20 from typing import Optional
21 
22 from action_msgs.msg import GoalStatus
23 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
24 from lifecycle_msgs.srv import GetState
25 from nav2_msgs.action import NavigateThroughPoses
26 from nav2_msgs.srv import ManageLifecycleNodes
27 from rcl_interfaces.srv import SetParameters
28 import rclpy
29 from rclpy.action.client import ActionClient
30 from rclpy.client import Client
31 from rclpy.node import Node
32 from rclpy.parameter import Parameter
33 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
34 from std_msgs.msg import String
35 
36 
37 class NavTester(Node):
38 
39  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
40  super().__init__(node_name='nav2_tester', namespace=namespace)
41  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
42  PoseWithCovarianceStamped, 'initialpose', 10
43  )
44 
45  checker_qos = QoSProfile(
46  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
47  reliability=QoSReliabilityPolicy.RELIABLE,
48  history=QoSHistoryPolicy.KEEP_LAST,
49  depth=1,
50  )
51 
52  self.goal_checker_selector_pubgoal_checker_selector_pub = self.create_publisher(
53  String, 'goal_checker_selector', checker_qos)
54 
55  self.progress_checker_selector_pubprogress_checker_selector_pub = self.create_publisher(
56  String, 'progress_checker_selector', checker_qos)
57 
58  pose_qos = QoSProfile(
59  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
60  reliability=QoSReliabilityPolicy.RELIABLE,
61  history=QoSHistoryPolicy.KEEP_LAST,
62  depth=1,
63  )
64 
65  self.model_pose_submodel_pose_sub = self.create_subscription(
66  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
67  )
68  self.initial_pose_receivedinitial_pose_received = False
69  self.initial_poseinitial_pose = initial_pose
70  self.goal_posegoal_pose = goal_pose
71 
72  self.action_client: ActionClient[
73  NavigateThroughPoses.Goal,
74  NavigateThroughPoses.Result,
75  NavigateThroughPoses.Feedback
76  ] = ActionClient(self, NavigateThroughPoses, 'navigate_through_poses')
77 
78  self.controller_param_cli: Client[SetParameters.Request, SetParameters.Response] = \
79  self.create_client(
80  SetParameters, '/controller_server/set_parameters'
81  )
82 
83  def info_msg(self, msg: str) -> None:
84  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
85 
86  def warn_msg(self, msg: str) -> None:
87  self.get_logger().warning('\033[1;37;43m' + msg + '\033[0m')
88 
89  def error_msg(self, msg: str) -> None:
90  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
91 
92  def setInitialPose(self) -> None:
93  msg = PoseWithCovarianceStamped()
94  msg.pose.pose = self.initial_poseinitial_pose
95  msg.header.frame_id = 'map'
96  self.info_msginfo_msg('Publishing Initial Pose')
97  self.initial_pose_pubinitial_pose_pub.publish(msg)
98  self.currentPosecurrentPose = self.initial_poseinitial_pose
99 
100  def setGoalChecker(self, name: str) -> None:
101  msg = String()
102  msg.data = name
103  self.goal_checker_selector_pubgoal_checker_selector_pub.publish(msg)
104 
105  def setProgressChecker(self, name: str) -> None:
106  msg = String()
107  msg.data = name
108  self.progress_checker_selector_pubprogress_checker_selector_pub.publish(msg)
109 
110  def setControllerParam(self, name: str, parameter_type: Parameter.Type, value: float) -> None:
111  req = SetParameters.Request()
112  req.parameters = [
113  Parameter(name, parameter_type, value).to_parameter_msg()
114  ]
115  future = self.controller_param_cli.call_async(req)
116  rclpy.spin_until_future_complete(self, future)
117 
118  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
119  msg = PoseStamped()
120  msg.header.frame_id = 'map'
121  msg.pose = pose
122  return msg
123 
124  def runNavigateAction(self,
125  goal_pose: Optional[Pose] = None,
126  behavior_tree: Optional[str] = None,
127  expected_error_code: Optional[int] = None,
128  expected_error_msg: Optional[str] = None) -> bool:
129  # Sends a `NavToPose` action request and waits for completion
130  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
131  while not self.action_client.wait_for_server(timeout_sec=1.0):
132  self.info_msginfo_msg(
133  "'NavigateThroughPoses' action server not available, waiting..."
134  )
135 
136  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
137  goal_msg = NavigateThroughPoses.Goal()
138  goal_msg.poses.header.frame_id = 'map'
139  goal_msg.poses.header.stamp = self.get_clock().now().to_msg()
140  goal_msg.poses.goals = [
141  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
142  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
143  ]
144  goal_msg.behavior_tree = behavior_tree
145 
146  self.info_msginfo_msg('Sending goal request...')
147  send_goal_future = self.action_client.send_goal_async(goal_msg)
148 
149  rclpy.spin_until_future_complete(self, send_goal_future)
150  goal_handle = send_goal_future.result()
151 
152  if goal_handle is None or not goal_handle.accepted:
153  self.error_msgerror_msg('Goal rejected')
154  return False
155 
156  self.info_msginfo_msg('Goal accepted')
157  get_result_future = goal_handle.get_result_async()
158 
159  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
160  rclpy.spin_until_future_complete(self, get_result_future)
161  status = get_result_future.result().status # type: ignore[union-attr]
162  if status != GoalStatus.STATUS_SUCCEEDED:
163  result = get_result_future.result().result # type: ignore[union-attr]
164  if (result.error_code == expected_error_code and
165  result.error_msg == expected_error_msg):
166  self.info_msginfo_msg(f'Goal failed as expected with status code: {status}'
167  f' error code:{result.error_code}'
168  f' error msg:{result.error_msg}')
169  return True
170  else:
171  self.error_msgerror_msg(f'Goal failed unexpectedly with status code: {status}'
172  f' Expected error_code:{expected_error_code},'
173  f' Got error_code:{result.error_code},'
174  f' Expected error_msg:{expected_error_msg},'
175  f' Got error_msg:{result.error_msg}')
176  return False
177 
178  self.info_msginfo_msg('Goal succeeded!')
179  return True
180 
181  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
182  self.info_msginfo_msg('Received amcl_pose')
183  self.current_posecurrent_pose = msg.pose.pose
184  self.initial_pose_receivedinitial_pose_received = True
185 
186  def wait_for_node_active(self, node_name: str) -> None:
187  # Waits for the node within the tester namespace to become active
188  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
189  node_service = f'{node_name}/get_state'
190  state_client: Client[GetState.Request, GetState.Response] = \
191  self.create_client(GetState, node_service)
192  while not state_client.wait_for_service(timeout_sec=1.0):
193  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
194  req = GetState.Request() # empty request
195  state = 'UNKNOWN'
196  while state != 'active':
197  self.info_msginfo_msg(f'Getting {node_name} state...')
198  future = state_client.call_async(req)
199  rclpy.spin_until_future_complete(self, future)
200  if future.result() is not None:
201  state = future.result().current_state.label # type: ignore[union-attr]
202  self.info_msginfo_msg(f'Result of get_state: {state}')
203  else:
204  self.error_msgerror_msg(
205  f'Exception while calling service: {future.exception()!r}'
206  )
207  time.sleep(5)
208 
209  def shutdown(self) -> None:
210  self.info_msginfo_msg('Shutting down')
211  self.action_client.destroy()
212 
213  transition_service = 'lifecycle_manager_navigation/manage_nodes'
214  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
215  self.create_client(ManageLifecycleNodes, transition_service)
216  while not mgr_client.wait_for_service(timeout_sec=1.0):
217  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
218 
219  req = ManageLifecycleNodes.Request()
220  req.command = ManageLifecycleNodes.Request.SHUTDOWN
221  future = mgr_client.call_async(req)
222  try:
223  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
224  rclpy.spin_until_future_complete(self, future)
225  future.result()
226  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
227  except Exception as e: # noqa: B902
228  self.error_msgerror_msg(f'Service call failed {e!r}')
229  transition_service = 'lifecycle_manager_localization/manage_nodes'
230  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
231  while not mgr_client.wait_for_service(timeout_sec=1.0):
232  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
233 
234  req = ManageLifecycleNodes.Request()
235  req.command = ManageLifecycleNodes.Request.SHUTDOWN
236  future = mgr_client.call_async(req)
237  try:
238  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
239  rclpy.spin_until_future_complete(self, future)
240  future.result()
241  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
242  except Exception as e: # noqa: B902
243  self.error_msgerror_msg(f'Service call failed {e!r}')
244 
245  def wait_for_initial_pose(self) -> None:
246  self.initial_pose_receivedinitial_pose_received = False
247  while not self.initial_pose_receivedinitial_pose_received:
248  self.info_msginfo_msg('Setting initial pose')
249  self.setInitialPosesetInitialPose()
250  self.info_msginfo_msg('Waiting for amcl_pose to be received')
251  rclpy.spin_once(self, timeout_sec=1)
252 
253 
254 def run_all_tests(robot_tester: NavTester) -> bool:
255  pose_out_of_bounds = Pose()
256  pose_out_of_bounds.position.x = 2000.0
257  pose_out_of_bounds.position.y = 4000.0
258  pose_out_of_bounds.position.z = 0.0
259  pose_out_of_bounds.orientation.x = 0.0
260  pose_out_of_bounds.orientation.y = 0.0
261  pose_out_of_bounds.orientation.z = 0.0
262  pose_out_of_bounds.orientation.w = 1.0
263 
264  reasonable_pose = Pose()
265  reasonable_pose.position.x = -1.0
266  reasonable_pose.position.y = 0.0
267  reasonable_pose.position.z = 0.0
268  reasonable_pose.orientation.x = 0.0
269  reasonable_pose.orientation.y = 0.0
270  reasonable_pose.orientation.z = 0.0
271  reasonable_pose.orientation.w = 1.0
272 
273  robot_tester.wait_for_node_active('amcl')
274  robot_tester.wait_for_initial_pose()
275  robot_tester.wait_for_node_active('bt_navigator')
276  robot_tester.setGoalChecker('general_goal_checker')
277  robot_tester.setProgressChecker('progress_checker')
278 
279  result = True
280  if result:
281  robot_tester.info_msg('Test non existing behavior_tree xml file')
282  result = robot_tester.runNavigateAction(
283  goal_pose=pose_out_of_bounds,
284  behavior_tree='behavior_tree_that_does_not_exist.xml',
285  expected_error_code=NavigateThroughPoses.Result.FAILED_TO_LOAD_BEHAVIOR_TREE,
286  expected_error_msg=(
287  'Error loading BT: behavior_tree_that_does_not_exist.xml. '
288  'Navigation canceled.'
289  ),
290  )
291 
292  if result:
293  robot_tester.info_msg('Test goal out of bounds')
294  result = robot_tester.runNavigateAction(
295  goal_pose=pose_out_of_bounds,
296  behavior_tree='',
297  expected_error_code=304,
298  expected_error_msg=('GridBasedplugin failed to plan from '
299  '(-2.00, -0.50) to (2000.00, 4000.00): '
300  '"Goal Coordinates of(2000.000000, 4000.000000) '
301  'was outside bounds"'))
302 
303  if result:
304  robot_tester.info_msg('Test for unknown goal checker')
305  robot_tester.setGoalChecker('junk_goal_checker')
306  result = robot_tester.runNavigateAction(
307  goal_pose=reasonable_pose,
308  behavior_tree='',
309  expected_error_code=100,
310  expected_error_msg=('Failed to find goal checker name: junk_goal_checker'))
311  robot_tester.setGoalChecker('general_goal_checker')
312 
313  if result:
314  robot_tester.info_msg('Test for unknown progress checker')
315  robot_tester.setProgressChecker('junk_progress_checker')
316  result = robot_tester.runNavigateAction(
317  goal_pose=reasonable_pose,
318  behavior_tree='',
319  expected_error_code=100,
320  expected_error_msg=('Failed to find progress checker name: junk_progress_checker'))
321  robot_tester.setProgressChecker('progress_checker')
322 
323  if result:
324  robot_tester.info_msg('Test for impossible to achieve progress parameters')
325  robot_tester.setControllerParam(
326  'progress_checker.movement_time_allowance',
327  Parameter.Type.DOUBLE,
328  0.1)
329  robot_tester.setControllerParam(
330  'progress_checker.required_movement_radius',
331  Parameter.Type.DOUBLE,
332  10.0)
333  # Limit controller to generate very slow velocities
334  # Note assumes nav2_dwb_controller dwb_core::DWBLocalPlanner
335  robot_tester.setControllerParam(
336  'FollowPath.max_vel_x',
337  Parameter.Type.DOUBLE,
338  0.0001)
339  result = robot_tester.runNavigateAction(
340  goal_pose=reasonable_pose,
341  behavior_tree='',
342  expected_error_code=105,
343  expected_error_msg=('Failed to make progress'))
344 
345  # Add more tests here if desired
346  if result:
347  robot_tester.info_msg('Test PASSED')
348  else:
349  robot_tester.error_msg('Test FAILED')
350 
351  return result
352 
353 
354 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
355  initial_pose = Pose()
356  initial_pose.position.x = x
357  initial_pose.position.y = y
358  initial_pose.position.z = z
359  initial_pose.orientation.x = 0.0
360  initial_pose.orientation.y = 0.0
361  initial_pose.orientation.z = 0.0
362  initial_pose.orientation.w = 1.0
363  return initial_pose
364 
365 
366 def get_testers(args: argparse.Namespace) -> list[NavTester]:
367  testers = []
368 
369  init_x, init_y, final_x, final_y = args.robot[0]
370  tester = NavTester(
371  initial_pose=fwd_pose(float(init_x), float(init_y)),
372  goal_pose=fwd_pose(float(final_x), float(final_y)),
373  )
374  tester.info_msg(
375  'Starting tester, robot going from '
376  + init_x
377  + ', '
378  + init_y
379  + ' to '
380  + final_x
381  + ', '
382  + final_y
383  + '.'
384  )
385  testers.append(tester)
386  return testers
387 
388 
389 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
390  # The robot(s) positions from the input arguments
391  parser = argparse.ArgumentParser(description='System-level navigation tester node')
392  group = parser.add_mutually_exclusive_group(required=True)
393  group.add_argument(
394  '-r',
395  '--robot',
396  action='append',
397  nargs=4,
398  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
399  help='The robot starting and final positions.',
400  )
401 
402  args, _ = parser.parse_known_args()
403 
404  rclpy.init()
405 
406  # Create testers for each robot
407  testers = get_testers(args)
408 
409  # wait a few seconds to make sure entire stacks are up
410  time.sleep(10)
411 
412  passed = False
413  for tester in testers:
414  passed = run_all_tests(tester)
415  if not passed:
416  break
417 
418  for tester in testers:
419  # stop and shutdown the nav stack to exit cleanly
420  tester.shutdown()
421 
422  testers[0].info_msg('Done Shutting Down.')
423 
424  if not passed:
425  testers[0].info_msg('Exiting failed')
426  exit(1)
427  else:
428  testers[0].info_msg('Exiting passed')
429  exit(0)
430 
431 
432 if __name__ == '__main__':
433  main()