Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
nav_through_poses_tester_error_msg_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import sys
19 import time
20 from typing import Optional
21 
22 from action_msgs.msg import GoalStatus
23 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
24 from lifecycle_msgs.srv import GetState
25 from nav2_msgs.action import NavigateThroughPoses
26 from nav2_msgs.srv import ManageLifecycleNodes
27 from rcl_interfaces.srv import SetParameters
28 import rclpy
29 from rclpy.action.client import ActionClient
30 from rclpy.node import Node
31 from rclpy.parameter import Parameter
32 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
33 from std_msgs.msg import String
34 
35 
36 class NavTester(Node):
37 
38  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
39  super().__init__(node_name='nav2_tester', namespace=namespace)
40  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
41  PoseWithCovarianceStamped, 'initialpose', 10
42  )
43 
44  checker_qos = QoSProfile(
45  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
46  reliability=QoSReliabilityPolicy.RELIABLE,
47  history=QoSHistoryPolicy.KEEP_LAST,
48  depth=1,
49  )
50 
51  self.goal_checker_selector_pubgoal_checker_selector_pub = self.create_publisher(
52  String, 'goal_checker_selector', checker_qos)
53 
54  self.progress_checker_selector_pubprogress_checker_selector_pub = self.create_publisher(
55  String, 'progress_checker_selector', checker_qos)
56 
57  pose_qos = QoSProfile(
58  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
59  reliability=QoSReliabilityPolicy.RELIABLE,
60  history=QoSHistoryPolicy.KEEP_LAST,
61  depth=1,
62  )
63 
64  self.model_pose_submodel_pose_sub = self.create_subscription(
65  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
66  )
67  self.initial_pose_receivedinitial_pose_received = False
68  self.initial_poseinitial_pose = initial_pose
69  self.goal_posegoal_pose = goal_pose
70  self.action_clientaction_client = ActionClient(
71  self, NavigateThroughPoses, 'navigate_through_poses'
72  )
73 
74  self.controller_param_clicontroller_param_cli = self.create_client(
75  SetParameters, '/controller_server/set_parameters'
76  )
77 
78  def info_msg(self, msg: str) -> None:
79  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
80 
81  def warn_msg(self, msg: str) -> None:
82  self.get_logger().warning('\033[1;37;43m' + msg + '\033[0m')
83 
84  def error_msg(self, msg: str) -> None:
85  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
86 
87  def setInitialPose(self) -> None:
88  msg = PoseWithCovarianceStamped()
89  msg.pose.pose = self.initial_poseinitial_pose
90  msg.header.frame_id = 'map'
91  self.info_msginfo_msg('Publishing Initial Pose')
92  self.initial_pose_pubinitial_pose_pub.publish(msg)
93  self.currentPosecurrentPose = self.initial_poseinitial_pose
94 
95  def setGoalChecker(self, name: str) -> None:
96  msg = String()
97  msg.data = name
98  self.goal_checker_selector_pubgoal_checker_selector_pub.publish(msg)
99 
100  def setProgressChecker(self, name: str) -> None:
101  msg = String()
102  msg.data = name
103  self.progress_checker_selector_pubprogress_checker_selector_pub.publish(msg)
104 
105  def setControllerParam(self, name: str, parameter_type: Parameter.Type, value: float) -> None:
106  req = SetParameters.Request()
107  req.parameters = [
108  Parameter(name, parameter_type, value).to_parameter_msg()
109  ]
110  future = self.controller_param_clicontroller_param_cli.call_async(req)
111  rclpy.spin_until_future_complete(self, future)
112 
113  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
114  msg = PoseStamped()
115  msg.header.frame_id = 'map'
116  msg.pose = pose
117  return msg
118 
119  def runNavigateAction(self,
120  goal_pose: Optional[Pose] = None,
121  behavior_tree: Optional[str] = None,
122  expected_error_code: Optional[int] = None,
123  expected_error_msg: Optional[str] = None) -> bool:
124  # Sends a `NavToPose` action request and waits for completion
125  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
126  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
127  self.info_msginfo_msg(
128  "'NavigateThroughPoses' action server not available, waiting..."
129  )
130 
131  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
132  goal_msg = NavigateThroughPoses.Goal()
133  goal_msg.poses.header.frame_id = 'map'
134  goal_msg.poses.header.stamp = self.get_clock().now().to_msg()
135  goal_msg.poses.goals = [
136  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
137  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
138  ]
139  goal_msg.behavior_tree = behavior_tree
140 
141  self.info_msginfo_msg('Sending goal request...')
142  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
143 
144  rclpy.spin_until_future_complete(self, send_goal_future)
145  goal_handle = send_goal_future.result()
146 
147  if goal_handle is None or not goal_handle.accepted:
148  self.error_msgerror_msg('Goal rejected')
149  return False
150 
151  self.info_msginfo_msg('Goal accepted')
152  get_result_future = goal_handle.get_result_async()
153 
154  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
155  rclpy.spin_until_future_complete(self, get_result_future)
156  status = get_result_future.result().status # type: ignore[union-attr]
157  if status != GoalStatus.STATUS_SUCCEEDED:
158  result = get_result_future.result().result # type: ignore[union-attr]
159  if (result.error_code == expected_error_code and
160  result.error_msg == expected_error_msg):
161  self.info_msginfo_msg(f'Goal failed as expected with status code: {status}'
162  f' error code:{result.error_code}'
163  f' error msg:{result.error_msg}')
164  return True
165  else:
166  self.error_msgerror_msg(f'Goal failed unexpectedly with status code: {status}'
167  f' Expected error_code:{expected_error_code},'
168  f' Got error_code:{result.error_code},'
169  f' Expected error_msg:{expected_error_msg},'
170  f' Got error_msg:{result.error_msg}')
171  return False
172 
173  self.info_msginfo_msg('Goal succeeded!')
174  return True
175 
176  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
177  self.info_msginfo_msg('Received amcl_pose')
178  self.current_posecurrent_pose = msg.pose.pose
179  self.initial_pose_receivedinitial_pose_received = True
180 
181  def wait_for_node_active(self, node_name: str) -> None:
182  # Waits for the node within the tester namespace to become active
183  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
184  node_service = f'{node_name}/get_state'
185  state_client = self.create_client(GetState, node_service)
186  while not state_client.wait_for_service(timeout_sec=1.0):
187  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
188  req = GetState.Request() # empty request
189  state = 'UNKNOWN'
190  while state != 'active':
191  self.info_msginfo_msg(f'Getting {node_name} state...')
192  future = state_client.call_async(req)
193  rclpy.spin_until_future_complete(self, future)
194  if future.result() is not None:
195  state = future.result().current_state.label # type: ignore[union-attr]
196  self.info_msginfo_msg(f'Result of get_state: {state}')
197  else:
198  self.error_msgerror_msg(
199  f'Exception while calling service: {future.exception()!r}'
200  )
201  time.sleep(5)
202 
203  def shutdown(self) -> None:
204  self.info_msginfo_msg('Shutting down')
205  self.action_clientaction_client.destroy()
206 
207  transition_service = 'lifecycle_manager_navigation/manage_nodes'
208  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
209  while not mgr_client.wait_for_service(timeout_sec=1.0):
210  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
211 
212  req = ManageLifecycleNodes.Request()
213  req.command = ManageLifecycleNodes.Request.SHUTDOWN
214  future = mgr_client.call_async(req)
215  try:
216  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
217  rclpy.spin_until_future_complete(self, future)
218  future.result()
219  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
220  except Exception as e: # noqa: B902
221  self.error_msgerror_msg(f'Service call failed {e!r}')
222  transition_service = 'lifecycle_manager_localization/manage_nodes'
223  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
224  while not mgr_client.wait_for_service(timeout_sec=1.0):
225  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
226 
227  req = ManageLifecycleNodes.Request()
228  req.command = ManageLifecycleNodes.Request.SHUTDOWN
229  future = mgr_client.call_async(req)
230  try:
231  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
232  rclpy.spin_until_future_complete(self, future)
233  future.result()
234  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
235  except Exception as e: # noqa: B902
236  self.error_msgerror_msg(f'Service call failed {e!r}')
237 
238  def wait_for_initial_pose(self) -> None:
239  self.initial_pose_receivedinitial_pose_received = False
240  while not self.initial_pose_receivedinitial_pose_received:
241  self.info_msginfo_msg('Setting initial pose')
242  self.setInitialPosesetInitialPose()
243  self.info_msginfo_msg('Waiting for amcl_pose to be received')
244  rclpy.spin_once(self, timeout_sec=1)
245 
246 
247 def run_all_tests(robot_tester: NavTester) -> bool:
248  pose_out_of_bounds = Pose()
249  pose_out_of_bounds.position.x = 2000.0
250  pose_out_of_bounds.position.y = 4000.0
251  pose_out_of_bounds.position.z = 0.0
252  pose_out_of_bounds.orientation.x = 0.0
253  pose_out_of_bounds.orientation.y = 0.0
254  pose_out_of_bounds.orientation.z = 0.0
255  pose_out_of_bounds.orientation.w = 1.0
256 
257  reasonable_pose = Pose()
258  reasonable_pose.position.x = -1.0
259  reasonable_pose.position.y = 0.0
260  reasonable_pose.position.z = 0.0
261  reasonable_pose.orientation.x = 0.0
262  reasonable_pose.orientation.y = 0.0
263  reasonable_pose.orientation.z = 0.0
264  reasonable_pose.orientation.w = 1.0
265 
266  robot_tester.wait_for_node_active('amcl')
267  robot_tester.wait_for_initial_pose()
268  robot_tester.wait_for_node_active('bt_navigator')
269  robot_tester.setGoalChecker('general_goal_checker')
270  robot_tester.setProgressChecker('progress_checker')
271 
272  result = True
273  if result:
274  robot_tester.info_msg('Test non existing behavior_tree xml file')
275  result = robot_tester.runNavigateAction(
276  goal_pose=pose_out_of_bounds,
277  behavior_tree='behavior_tree_that_does_not_exist.xml',
278  expected_error_code=NavigateThroughPoses.Result.FAILED_TO_LOAD_BEHAVIOR_TREE,
279  expected_error_msg=('Error loading XML file: behavior_tree_that_does_not_exist.xml. '
280  'Navigation canceled.'))
281 
282  if result:
283  robot_tester.info_msg('Test goal out of bounds')
284  result = robot_tester.runNavigateAction(
285  goal_pose=pose_out_of_bounds,
286  behavior_tree='',
287  expected_error_code=304,
288  expected_error_msg=('GridBasedplugin failed to plan from '
289  '(-2.00, -0.50) to (2000.00, 4000.00): '
290  '"Goal Coordinates of(2000.000000, 4000.000000) '
291  'was outside bounds"'))
292 
293  if result:
294  robot_tester.info_msg('Test for unknown goal checker')
295  robot_tester.setGoalChecker('junk_goal_checker')
296  result = robot_tester.runNavigateAction(
297  goal_pose=reasonable_pose,
298  behavior_tree='',
299  expected_error_code=100,
300  expected_error_msg=('Failed to find goal checker name: junk_goal_checker'))
301  robot_tester.setGoalChecker('general_goal_checker')
302 
303  if result:
304  robot_tester.info_msg('Test for unknown progress checker')
305  robot_tester.setProgressChecker('junk_progress_checker')
306  result = robot_tester.runNavigateAction(
307  goal_pose=reasonable_pose,
308  behavior_tree='',
309  expected_error_code=100,
310  expected_error_msg=('Failed to find progress checker name: junk_progress_checker'))
311  robot_tester.setProgressChecker('progress_checker')
312 
313  if result:
314  robot_tester.info_msg('Test for impossible to achieve progress parameters')
315  robot_tester.setControllerParam(
316  'progress_checker.movement_time_allowance',
317  Parameter.Type.DOUBLE,
318  0.1)
319  robot_tester.setControllerParam(
320  'progress_checker.required_movement_radius',
321  Parameter.Type.DOUBLE,
322  10.0)
323  # Limit controller to generate very slow velocities
324  # Note assumes nav2_dwb_controller dwb_core::DWBLocalPlanner
325  robot_tester.setControllerParam(
326  'FollowPath.max_vel_x',
327  Parameter.Type.DOUBLE,
328  0.0001)
329  result = robot_tester.runNavigateAction(
330  goal_pose=reasonable_pose,
331  behavior_tree='',
332  expected_error_code=105,
333  expected_error_msg=('Failed to make progress'))
334 
335  # Add more tests here if desired
336  if result:
337  robot_tester.info_msg('Test PASSED')
338  else:
339  robot_tester.error_msg('Test FAILED')
340 
341  return result
342 
343 
344 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
345  initial_pose = Pose()
346  initial_pose.position.x = x
347  initial_pose.position.y = y
348  initial_pose.position.z = z
349  initial_pose.orientation.x = 0.0
350  initial_pose.orientation.y = 0.0
351  initial_pose.orientation.z = 0.0
352  initial_pose.orientation.w = 1.0
353  return initial_pose
354 
355 
356 def get_testers(args: argparse.Namespace) -> list[NavTester]:
357  testers = []
358 
359  init_x, init_y, final_x, final_y = args.robot[0]
360  tester = NavTester(
361  initial_pose=fwd_pose(float(init_x), float(init_y)),
362  goal_pose=fwd_pose(float(final_x), float(final_y)),
363  )
364  tester.info_msg(
365  'Starting tester, robot going from '
366  + init_x
367  + ', '
368  + init_y
369  + ' to '
370  + final_x
371  + ', '
372  + final_y
373  + '.'
374  )
375  testers.append(tester)
376  return testers
377 
378 
379 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
380  # The robot(s) positions from the input arguments
381  parser = argparse.ArgumentParser(description='System-level navigation tester node')
382  group = parser.add_mutually_exclusive_group(required=True)
383  group.add_argument(
384  '-r',
385  '--robot',
386  action='append',
387  nargs=4,
388  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
389  help='The robot starting and final positions.',
390  )
391 
392  args, _ = parser.parse_known_args()
393 
394  rclpy.init()
395 
396  # Create testers for each robot
397  testers = get_testers(args)
398 
399  # wait a few seconds to make sure entire stacks are up
400  time.sleep(10)
401 
402  passed = False
403  for tester in testers:
404  passed = run_all_tests(tester)
405  if not passed:
406  break
407 
408  for tester in testers:
409  # stop and shutdown the nav stack to exit cleanly
410  tester.shutdown()
411 
412  testers[0].info_msg('Done Shutting Down.')
413 
414  if not passed:
415  testers[0].info_msg('Exiting failed')
416  exit(1)
417  else:
418  testers[0].info_msg('Exiting passed')
419  exit(0)
420 
421 
422 if __name__ == '__main__':
423  main()