Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
nav_through_poses_tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import sys
19 import time
20 from typing import Optional
21 
22 from action_msgs.msg import GoalStatus
23 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
24 from lifecycle_msgs.srv import GetState
25 from nav2_msgs.action import NavigateThroughPoses
26 from nav2_msgs.srv import ManageLifecycleNodes
27 import rclpy
28 from rclpy.action import ActionClient # type: ignore[attr-defined]
29 from rclpy.node import Node
30 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
31 
32 
33 class NavTester(Node):
34 
35  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
36  super().__init__(node_name='nav2_tester', namespace=namespace)
37  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
38  PoseWithCovarianceStamped, 'initialpose', 10
39  )
40 
41  pose_qos = QoSProfile(
42  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
43  reliability=QoSReliabilityPolicy.RELIABLE,
44  history=QoSHistoryPolicy.KEEP_LAST,
45  depth=1,
46  )
47 
48  self.model_pose_submodel_pose_sub = self.create_subscription(
49  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
50  )
51  self.initial_pose_receivedinitial_pose_received = False
52  self.initial_poseinitial_pose = initial_pose
53  self.goal_posegoal_pose = goal_pose
54  self.action_clientaction_client = ActionClient(
55  self, NavigateThroughPoses, 'navigate_through_poses'
56  )
57 
58  def info_msg(self, msg: str) -> None:
59  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
60 
61  def warn_msg(self, msg: str) -> None:
62  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
63 
64  def error_msg(self, msg: str) -> None:
65  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
66 
67  def setInitialPose(self) -> None:
68  msg = PoseWithCovarianceStamped()
69  msg.pose.pose = self.initial_poseinitial_pose
70  msg.header.frame_id = 'map'
71  self.info_msginfo_msg('Publishing Initial Pose')
72  self.initial_pose_pubinitial_pose_pub.publish(msg)
73  self.currentPosecurrentPose = self.initial_poseinitial_pose
74 
75  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
76  msg = PoseStamped()
77  msg.header.frame_id = 'map'
78  msg.pose = pose
79  return msg
80 
81  def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool:
82  # Sends a `NavToPose` action request and waits for completion
83  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
84  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
85  self.info_msginfo_msg(
86  "'NavigateThroughPoses' action server not available, waiting..."
87  )
88 
89  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
90  goal_msg = NavigateThroughPoses.Goal()
91  goal_msg.poses.header.frame_id = 'map'
92  goal_msg.poses.header.stamp = self.get_clock().now().to_msg()
93  goal_msg.poses.goals = [
94  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
95  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
96  ]
97 
98  self.info_msginfo_msg('Sending goal request...')
99  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
100 
101  rclpy.spin_until_future_complete(self, send_goal_future)
102  goal_handle = send_goal_future.result()
103 
104  if not goal_handle or not goal_handle.accepted:
105  self.error_msgerror_msg('Goal rejected')
106  return False
107 
108  self.info_msginfo_msg('Goal accepted')
109  get_result_future = goal_handle.get_result_async()
110 
111  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
112  rclpy.spin_until_future_complete(self, get_result_future)
113  status = get_result_future.result().status # type: ignore[union-attr]
114  if status != GoalStatus.STATUS_SUCCEEDED:
115  result = get_result_future.result().result # type: ignore[union-attr]
116  self.info_msginfo_msg(f'Goal failed with status code: {status}'
117  f' error code:{result.error_code}'
118  f' error msg:{result.error_msg}')
119  return False
120 
121  self.info_msginfo_msg('Goal succeeded!')
122  return True
123 
124  def runFakeNavigateAction(self) -> bool:
125  # Sends a `NavToPose` action request and waits for completion
126  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
127  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
128  self.info_msginfo_msg(
129  "'NavigateThroughPoses' action server not available, waiting..."
130  )
131 
132  goal_msg = NavigateThroughPoses.Goal()
133 
134  self.info_msginfo_msg('Sending goal request...')
135  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
136 
137  rclpy.spin_until_future_complete(self, send_goal_future)
138  goal_handle = send_goal_future.result()
139 
140  if not goal_handle or not goal_handle.accepted:
141  self.error_msgerror_msg('Goal rejected')
142  return False
143 
144  self.info_msginfo_msg('Goal accepted')
145  get_result_future = goal_handle.get_result_async()
146 
147  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
148  rclpy.spin_until_future_complete(self, get_result_future)
149  status = get_result_future.result().status # type: ignore[union-attr]
150  if status != GoalStatus.STATUS_SUCCEEDED:
151  result = get_result_future.result().result # type: ignore[union-attr]
152  self.info_msginfo_msg(f'Goal failed with status code: {status}'
153  f' error code:{result.error_code}'
154  f' error msg:{result.error_msg}')
155  return False
156 
157  self.info_msginfo_msg('Goal succeeded!')
158  return True
159 
160  def runNavigatePreemptionAction(self, block: bool) -> bool:
161  # Sends a `NavToPose` action request and waits for completion
162  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
163  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
164  self.info_msginfo_msg(
165  "'NavigateThroughPoses' action server not available, waiting..."
166  )
167 
168  goal_msg = NavigateThroughPoses.Goal()
169  goal_msg.poses.goals = [self.getStampedPoseMsggetStampedPoseMsg(self.initial_poseinitial_pose)]
170 
171  self.info_msginfo_msg('Sending goal request...')
172  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
173 
174  rclpy.spin_until_future_complete(self, send_goal_future)
175  goal_handle = send_goal_future.result()
176 
177  if not goal_handle or not goal_handle.accepted:
178  self.error_msgerror_msg('Goal rejected')
179  return False
180 
181  if not block:
182  return True
183 
184  self.info_msginfo_msg('Goal accepted')
185  get_result_future = goal_handle.get_result_async()
186 
187  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
188  rclpy.spin_until_future_complete(self, get_result_future)
189  status = get_result_future.result().status # type: ignore[union-attr]
190  if status != GoalStatus.STATUS_SUCCEEDED:
191  result = get_result_future.result().result # type: ignore[union-attr]
192  self.info_msginfo_msg(f'Goal failed with status code: {status}'
193  f' error code:{result.error_code}'
194  f' error msg:{result.error_msg}')
195  return False
196 
197  self.info_msginfo_msg('Goal succeeded!')
198  return True
199 
200  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
201  self.info_msginfo_msg('Received amcl_pose')
202  self.current_posecurrent_pose = msg.pose.pose
203  self.initial_pose_receivedinitial_pose_received = True
204 
205  def wait_for_node_active(self, node_name: str) -> None:
206  # Waits for the node within the tester namespace to become active
207  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
208  node_service = f'{node_name}/get_state'
209  state_client = self.create_client(GetState, node_service)
210  while not state_client.wait_for_service(timeout_sec=1.0):
211  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
212  req = GetState.Request() # empty request
213  state = 'UNKNOWN'
214  while state != 'active':
215  self.info_msginfo_msg(f'Getting {node_name} state...')
216  future = state_client.call_async(req)
217  rclpy.spin_until_future_complete(self, future)
218  if future.result() is not None:
219  state = future.result().current_state.label # type: ignore[union-attr]
220  self.info_msginfo_msg(f'Result of get_state: {state}')
221  else:
222  self.error_msgerror_msg(
223  f'Exception while calling service: {future.exception()!r}'
224  )
225  time.sleep(5)
226 
227  def shutdown(self) -> None:
228  self.info_msginfo_msg('Shutting down')
229  self.action_clientaction_client.destroy()
230 
231  transition_service = 'lifecycle_manager_navigation/manage_nodes'
232  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
233  while not mgr_client.wait_for_service(timeout_sec=1.0):
234  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
235 
236  req = ManageLifecycleNodes.Request()
237  req.command = ManageLifecycleNodes.Request().SHUTDOWN
238  future = mgr_client.call_async(req)
239  try:
240  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
241  rclpy.spin_until_future_complete(self, future)
242  future.result()
243  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
244  except Exception as e: # noqa: B902
245  self.error_msgerror_msg(f'Service call failed {e!r}')
246  transition_service = 'lifecycle_manager_localization/manage_nodes'
247  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
248  while not mgr_client.wait_for_service(timeout_sec=1.0):
249  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
250 
251  req = ManageLifecycleNodes.Request()
252  req.command = ManageLifecycleNodes.Request().SHUTDOWN
253  future = mgr_client.call_async(req)
254  try:
255  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
256  rclpy.spin_until_future_complete(self, future)
257  future.result()
258  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
259  except Exception as e: # noqa: B902
260  self.error_msgerror_msg(f'Service call failed {e!r}')
261 
262  def wait_for_initial_pose(self) -> None:
263  self.initial_pose_receivedinitial_pose_received = False
264  while not self.initial_pose_receivedinitial_pose_received:
265  self.info_msginfo_msg('Setting initial pose')
266  self.setInitialPosesetInitialPose()
267  self.info_msginfo_msg('Waiting for amcl_pose to be received')
268  rclpy.spin_once(self, timeout_sec=1)
269 
270 
271 def run_all_tests(robot_tester: NavTester) -> bool:
272  # set transforms to use_sim_time
273  result = True
274  if result:
275  robot_tester.wait_for_node_active('amcl')
276  robot_tester.wait_for_initial_pose()
277  robot_tester.wait_for_node_active('bt_navigator')
278  result = robot_tester.runNavigateAction()
279  # Test empty navigation request
280  result = result and not robot_tester.runFakeNavigateAction()
281  # Test preempting NHP with the same goal
282  result = result and robot_tester.runNavigatePreemptionAction(False)
283  result = result and robot_tester.runNavigatePreemptionAction(True)
284 
285  # Add more tests here if desired
286 
287  if result:
288  robot_tester.info_msg('Test PASSED')
289  else:
290  robot_tester.error_msg('Test FAILED')
291 
292  return result
293 
294 
295 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
296  initial_pose = Pose()
297  initial_pose.position.x = x
298  initial_pose.position.y = y
299  initial_pose.position.z = z
300  initial_pose.orientation.x = 0.0
301  initial_pose.orientation.y = 0.0
302  initial_pose.orientation.z = 0.0
303  initial_pose.orientation.w = 1.0
304  return initial_pose
305 
306 
307 def get_testers(args: argparse.Namespace) -> list[NavTester]:
308  testers = []
309 
310  init_x, init_y, final_x, final_y = args.robot[0]
311  tester = NavTester(
312  initial_pose=fwd_pose(float(init_x), float(init_y)),
313  goal_pose=fwd_pose(float(final_x), float(final_y)),
314  )
315  tester.info_msg(
316  'Starting tester, robot going from '
317  + init_x
318  + ', '
319  + init_y
320  + ' to '
321  + final_x
322  + ', '
323  + final_y
324  + '.'
325  )
326  testers.append(tester)
327  return testers
328 
329 
330 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
331  # The robot(s) positions from the input arguments
332  parser = argparse.ArgumentParser(description='System-level navigation tester node')
333  group = parser.add_mutually_exclusive_group(required=True)
334  group.add_argument(
335  '-r',
336  '--robot',
337  action='append',
338  nargs=4,
339  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
340  help='The robot starting and final positions.',
341  )
342 
343  args, unknown = parser.parse_known_args()
344 
345  rclpy.init()
346 
347  # Create testers for each robot
348  testers = get_testers(args)
349 
350  # wait a few seconds to make sure entire stacks are up
351  time.sleep(10)
352 
353  for tester in testers:
354  passed = run_all_tests(tester)
355  if not passed:
356  break
357 
358  for tester in testers:
359  # stop and shutdown the nav stack to exit cleanly
360  tester.shutdown()
361 
362  testers[0].info_msg('Done Shutting Down.')
363 
364  if not passed:
365  testers[0].info_msg('Exiting failed')
366  exit(1)
367  else:
368  testers[0].info_msg('Exiting passed')
369  exit(0)
370 
371 
372 if __name__ == '__main__':
373  main()
None poseCallback(self, PoseWithCovarianceStamped msg)