Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
nav_through_poses_tester_error_msg_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import sys
19 import time
20 from typing import Optional
21 
22 from action_msgs.msg import GoalStatus
23 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
24 from lifecycle_msgs.srv import GetState
25 from nav2_msgs.action import NavigateThroughPoses
26 from nav2_msgs.srv import ManageLifecycleNodes
27 from rcl_interfaces.srv import SetParameters
28 import rclpy
29 from rclpy.action.client import ActionClient
30 from rclpy.client import Client
31 from rclpy.node import Node
32 from rclpy.parameter import Parameter
33 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
34 from std_msgs.msg import String
35 
36 
37 class NavTester(Node):
38 
39  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
40  super().__init__(node_name='nav2_tester', namespace=namespace)
41  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
42  PoseWithCovarianceStamped, 'initialpose', 10
43  )
44 
45  checker_qos = QoSProfile(
46  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
47  reliability=QoSReliabilityPolicy.RELIABLE,
48  history=QoSHistoryPolicy.KEEP_LAST,
49  depth=1,
50  )
51 
52  self.goal_checker_selector_pubgoal_checker_selector_pub = self.create_publisher(
53  String, 'goal_checker_selector', checker_qos)
54 
55  self.progress_checker_selector_pubprogress_checker_selector_pub = self.create_publisher(
56  String, 'progress_checker_selector', checker_qos)
57 
58  pose_qos = QoSProfile(
59  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
60  reliability=QoSReliabilityPolicy.RELIABLE,
61  history=QoSHistoryPolicy.KEEP_LAST,
62  depth=1,
63  )
64 
65  self.model_pose_submodel_pose_sub = self.create_subscription(
66  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
67  )
68  self.initial_pose_receivedinitial_pose_received = False
69  self.initial_poseinitial_pose = initial_pose
70  self.goal_posegoal_pose = goal_pose
71 
72  self.action_client: ActionClient[
73  NavigateThroughPoses.Goal,
74  NavigateThroughPoses.Result,
75  NavigateThroughPoses.Feedback
76  ] = ActionClient(self, NavigateThroughPoses, 'navigate_through_poses')
77 
78  self.controller_param_cli: Client[SetParameters.Request, SetParameters.Response] = \
79  self.create_client(
80  SetParameters, '/controller_server/set_parameters'
81  )
82 
83  def info_msg(self, msg: str) -> None:
84  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
85 
86  def warn_msg(self, msg: str) -> None:
87  self.get_logger().warning('\033[1;37;43m' + msg + '\033[0m')
88 
89  def error_msg(self, msg: str) -> None:
90  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
91 
92  def setInitialPose(self) -> None:
93  msg = PoseWithCovarianceStamped()
94  msg.pose.pose = self.initial_poseinitial_pose
95  msg.header.frame_id = 'map'
96  self.info_msginfo_msg('Publishing Initial Pose')
97  self.initial_pose_pubinitial_pose_pub.publish(msg)
98  self.currentPosecurrentPose = self.initial_poseinitial_pose
99 
100  def setGoalChecker(self, name: str) -> None:
101  msg = String()
102  msg.data = name
103  self.goal_checker_selector_pubgoal_checker_selector_pub.publish(msg)
104 
105  def setProgressChecker(self, name: str) -> None:
106  msg = String()
107  msg.data = name
108  self.progress_checker_selector_pubprogress_checker_selector_pub.publish(msg)
109 
110  def setControllerParam(self, name: str, parameter_type: Parameter.Type, value: float) -> None:
111  req = SetParameters.Request()
112  req.parameters = [
113  Parameter(name, parameter_type, value).to_parameter_msg()
114  ]
115  future = self.controller_param_cli.call_async(req)
116  rclpy.spin_until_future_complete(self, future)
117 
118  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
119  msg = PoseStamped()
120  msg.header.frame_id = 'map'
121  msg.pose = pose
122  return msg
123 
124  def runNavigateAction(self,
125  goal_pose: Optional[Pose] = None,
126  behavior_tree: Optional[str] = None,
127  expected_error_code: Optional[int] = None,
128  expected_error_msg: Optional[str] = None) -> bool:
129  # Sends a `NavToPose` action request and waits for completion
130  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
131  while not self.action_client.wait_for_server(timeout_sec=1.0):
132  self.info_msginfo_msg(
133  "'NavigateThroughPoses' action server not available, waiting..."
134  )
135 
136  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
137  goal_msg = NavigateThroughPoses.Goal()
138  goal_msg.poses.header.frame_id = 'map'
139  goal_msg.poses.header.stamp = self.get_clock().now().to_msg()
140  goal_msg.poses.goals = [
141  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
142  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
143  ]
144  goal_msg.behavior_tree = behavior_tree
145 
146  self.info_msginfo_msg('Sending goal request...')
147  send_goal_future = self.action_client.send_goal_async(goal_msg)
148 
149  rclpy.spin_until_future_complete(self, send_goal_future)
150  goal_handle = send_goal_future.result()
151 
152  if goal_handle is None or not goal_handle.accepted:
153  self.error_msgerror_msg('Goal rejected')
154  return False
155 
156  self.info_msginfo_msg('Goal accepted')
157  get_result_future = goal_handle.get_result_async()
158 
159  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
160  rclpy.spin_until_future_complete(self, get_result_future)
161  status = get_result_future.result().status # type: ignore[union-attr]
162  if status != GoalStatus.STATUS_SUCCEEDED:
163  result = get_result_future.result().result # type: ignore[union-attr]
164  if (result.error_code == expected_error_code and
165  result.error_msg == expected_error_msg):
166  self.info_msginfo_msg(f'Goal failed as expected with status code: {status}'
167  f' error code:{result.error_code}'
168  f' error msg:{result.error_msg}')
169  return True
170  else:
171  self.error_msgerror_msg(f'Goal failed unexpectedly with status code: {status}'
172  f' Expected error_code:{expected_error_code},'
173  f' Got error_code:{result.error_code},'
174  f' Expected error_msg:{expected_error_msg},'
175  f' Got error_msg:{result.error_msg}')
176  return False
177 
178  self.info_msginfo_msg('Goal succeeded!')
179  return True
180 
181  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
182  self.info_msginfo_msg('Received amcl_pose')
183  self.current_posecurrent_pose = msg.pose.pose
184  self.initial_pose_receivedinitial_pose_received = True
185 
186  def wait_for_node_active(self, node_name: str) -> None:
187  # Waits for the node within the tester namespace to become active
188  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
189  node_service = f'{node_name}/get_state'
190  state_client: Client[GetState.Request, GetState.Response] = \
191  self.create_client(GetState, node_service)
192  while not state_client.wait_for_service(timeout_sec=1.0):
193  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
194  req = GetState.Request() # empty request
195  state = 'UNKNOWN'
196  while state != 'active':
197  self.info_msginfo_msg(f'Getting {node_name} state...')
198  future = state_client.call_async(req)
199  rclpy.spin_until_future_complete(self, future)
200  if future.result() is not None:
201  state = future.result().current_state.label # type: ignore[union-attr]
202  self.info_msginfo_msg(f'Result of get_state: {state}')
203  else:
204  self.error_msgerror_msg(
205  f'Exception while calling service: {future.exception()!r}'
206  )
207  time.sleep(5)
208 
209  def shutdown(self) -> None:
210  self.info_msginfo_msg('Shutting down')
211  self.action_client.destroy()
212 
213  transition_service = 'lifecycle_manager_navigation/manage_nodes'
214  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
215  self.create_client(ManageLifecycleNodes, transition_service)
216  while not mgr_client.wait_for_service(timeout_sec=1.0):
217  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
218 
219  req = ManageLifecycleNodes.Request()
220  req.command = ManageLifecycleNodes.Request.SHUTDOWN
221  future = mgr_client.call_async(req)
222  try:
223  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
224  rclpy.spin_until_future_complete(self, future)
225  future.result()
226  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
227  except Exception as e: # noqa: B902
228  self.error_msgerror_msg(f'Service call failed {e!r}')
229  transition_service = 'lifecycle_manager_localization/manage_nodes'
230  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
231  while not mgr_client.wait_for_service(timeout_sec=1.0):
232  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
233 
234  req = ManageLifecycleNodes.Request()
235  req.command = ManageLifecycleNodes.Request.SHUTDOWN
236  future = mgr_client.call_async(req)
237  try:
238  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
239  rclpy.spin_until_future_complete(self, future)
240  future.result()
241  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
242  except Exception as e: # noqa: B902
243  self.error_msgerror_msg(f'Service call failed {e!r}')
244 
245  def wait_for_initial_pose(self) -> None:
246  self.initial_pose_receivedinitial_pose_received = False
247  while not self.initial_pose_receivedinitial_pose_received:
248  self.info_msginfo_msg('Setting initial pose')
249  self.setInitialPosesetInitialPose()
250  self.info_msginfo_msg('Waiting for amcl_pose to be received')
251  rclpy.spin_once(self, timeout_sec=1)
252 
253 
254 def run_all_tests(robot_tester: NavTester) -> bool:
255  pose_out_of_bounds = Pose()
256  pose_out_of_bounds.position.x = 2000.0
257  pose_out_of_bounds.position.y = 4000.0
258  pose_out_of_bounds.position.z = 0.0
259  pose_out_of_bounds.orientation.x = 0.0
260  pose_out_of_bounds.orientation.y = 0.0
261  pose_out_of_bounds.orientation.z = 0.0
262  pose_out_of_bounds.orientation.w = 1.0
263 
264  reasonable_pose = Pose()
265  reasonable_pose.position.x = -1.0
266  reasonable_pose.position.y = 0.0
267  reasonable_pose.position.z = 0.0
268  reasonable_pose.orientation.x = 0.0
269  reasonable_pose.orientation.y = 0.0
270  reasonable_pose.orientation.z = 0.0
271  reasonable_pose.orientation.w = 1.0
272 
273  robot_tester.wait_for_node_active('amcl')
274  robot_tester.wait_for_initial_pose()
275  robot_tester.wait_for_node_active('bt_navigator')
276  robot_tester.setGoalChecker('general_goal_checker')
277  robot_tester.setProgressChecker('progress_checker')
278 
279  result = True
280  if result:
281  robot_tester.info_msg('Test non existing behavior_tree xml file')
282  result = robot_tester.runNavigateAction(
283  goal_pose=pose_out_of_bounds,
284  behavior_tree='behavior_tree_that_does_not_exist.xml',
285  expected_error_code=NavigateThroughPoses.Result.FAILED_TO_LOAD_BEHAVIOR_TREE,
286  expected_error_msg=('Error loading XML file: behavior_tree_that_does_not_exist.xml. '
287  'Navigation canceled.'))
288 
289  if result:
290  robot_tester.info_msg('Test goal out of bounds')
291  result = robot_tester.runNavigateAction(
292  goal_pose=pose_out_of_bounds,
293  behavior_tree='',
294  expected_error_code=304,
295  expected_error_msg=('GridBasedplugin failed to plan from '
296  '(-2.00, -0.50) to (2000.00, 4000.00): '
297  '"Goal Coordinates of(2000.000000, 4000.000000) '
298  'was outside bounds"'))
299 
300  if result:
301  robot_tester.info_msg('Test for unknown goal checker')
302  robot_tester.setGoalChecker('junk_goal_checker')
303  result = robot_tester.runNavigateAction(
304  goal_pose=reasonable_pose,
305  behavior_tree='',
306  expected_error_code=100,
307  expected_error_msg=('Failed to find goal checker name: junk_goal_checker'))
308  robot_tester.setGoalChecker('general_goal_checker')
309 
310  if result:
311  robot_tester.info_msg('Test for unknown progress checker')
312  robot_tester.setProgressChecker('junk_progress_checker')
313  result = robot_tester.runNavigateAction(
314  goal_pose=reasonable_pose,
315  behavior_tree='',
316  expected_error_code=100,
317  expected_error_msg=('Failed to find progress checker name: junk_progress_checker'))
318  robot_tester.setProgressChecker('progress_checker')
319 
320  if result:
321  robot_tester.info_msg('Test for impossible to achieve progress parameters')
322  robot_tester.setControllerParam(
323  'progress_checker.movement_time_allowance',
324  Parameter.Type.DOUBLE,
325  0.1)
326  robot_tester.setControllerParam(
327  'progress_checker.required_movement_radius',
328  Parameter.Type.DOUBLE,
329  10.0)
330  # Limit controller to generate very slow velocities
331  # Note assumes nav2_dwb_controller dwb_core::DWBLocalPlanner
332  robot_tester.setControllerParam(
333  'FollowPath.max_vel_x',
334  Parameter.Type.DOUBLE,
335  0.0001)
336  result = robot_tester.runNavigateAction(
337  goal_pose=reasonable_pose,
338  behavior_tree='',
339  expected_error_code=105,
340  expected_error_msg=('Failed to make progress'))
341 
342  # Add more tests here if desired
343  if result:
344  robot_tester.info_msg('Test PASSED')
345  else:
346  robot_tester.error_msg('Test FAILED')
347 
348  return result
349 
350 
351 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
352  initial_pose = Pose()
353  initial_pose.position.x = x
354  initial_pose.position.y = y
355  initial_pose.position.z = z
356  initial_pose.orientation.x = 0.0
357  initial_pose.orientation.y = 0.0
358  initial_pose.orientation.z = 0.0
359  initial_pose.orientation.w = 1.0
360  return initial_pose
361 
362 
363 def get_testers(args: argparse.Namespace) -> list[NavTester]:
364  testers = []
365 
366  init_x, init_y, final_x, final_y = args.robot[0]
367  tester = NavTester(
368  initial_pose=fwd_pose(float(init_x), float(init_y)),
369  goal_pose=fwd_pose(float(final_x), float(final_y)),
370  )
371  tester.info_msg(
372  'Starting tester, robot going from '
373  + init_x
374  + ', '
375  + init_y
376  + ' to '
377  + final_x
378  + ', '
379  + final_y
380  + '.'
381  )
382  testers.append(tester)
383  return testers
384 
385 
386 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
387  # The robot(s) positions from the input arguments
388  parser = argparse.ArgumentParser(description='System-level navigation tester node')
389  group = parser.add_mutually_exclusive_group(required=True)
390  group.add_argument(
391  '-r',
392  '--robot',
393  action='append',
394  nargs=4,
395  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
396  help='The robot starting and final positions.',
397  )
398 
399  args, _ = parser.parse_known_args()
400 
401  rclpy.init()
402 
403  # Create testers for each robot
404  testers = get_testers(args)
405 
406  # wait a few seconds to make sure entire stacks are up
407  time.sleep(10)
408 
409  passed = False
410  for tester in testers:
411  passed = run_all_tests(tester)
412  if not passed:
413  break
414 
415  for tester in testers:
416  # stop and shutdown the nav stack to exit cleanly
417  tester.shutdown()
418 
419  testers[0].info_msg('Done Shutting Down.')
420 
421  if not passed:
422  testers[0].info_msg('Exiting failed')
423  exit(1)
424  else:
425  testers[0].info_msg('Exiting passed')
426  exit(0)
427 
428 
429 if __name__ == '__main__':
430  main()