Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
nav_through_poses_tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import sys
19 import time
20 from typing import Optional
21 
22 from action_msgs.msg import GoalStatus
23 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
24 from lifecycle_msgs.srv import GetState
25 from nav2_msgs.action import NavigateThroughPoses
26 from nav2_msgs.srv import ManageLifecycleNodes
27 import rclpy
28 from rclpy.action import ActionClient
29 from rclpy.client import Client
30 from rclpy.node import Node
31 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
32 
33 
34 class NavTester(Node):
35 
36  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
37  super().__init__(node_name='nav2_tester', namespace=namespace)
38  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
39  PoseWithCovarianceStamped, 'initialpose', 10
40  )
41 
42  pose_qos = QoSProfile(
43  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
44  reliability=QoSReliabilityPolicy.RELIABLE,
45  history=QoSHistoryPolicy.KEEP_LAST,
46  depth=1,
47  )
48 
49  self.model_pose_submodel_pose_sub = self.create_subscription(
50  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
51  )
52  self.initial_pose_receivedinitial_pose_received = False
53  self.initial_poseinitial_pose = initial_pose
54  self.goal_posegoal_pose = goal_pose
55  self.action_client: ActionClient[
56  NavigateThroughPoses.Goal,
57  NavigateThroughPoses.Result,
58  NavigateThroughPoses.Feedback
59  ] = ActionClient(self, NavigateThroughPoses, 'navigate_through_poses')
60 
61  def info_msg(self, msg: str) -> None:
62  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
63 
64  def warn_msg(self, msg: str) -> None:
65  self.get_logger().warning('\033[1;37;43m' + msg + '\033[0m')
66 
67  def error_msg(self, msg: str) -> None:
68  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
69 
70  def setInitialPose(self) -> None:
71  msg = PoseWithCovarianceStamped()
72  msg.pose.pose = self.initial_poseinitial_pose
73  msg.header.frame_id = 'map'
74  self.info_msginfo_msg('Publishing Initial Pose')
75  self.initial_pose_pubinitial_pose_pub.publish(msg)
76  self.currentPosecurrentPose = self.initial_poseinitial_pose
77 
78  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
79  msg = PoseStamped()
80  msg.header.frame_id = 'map'
81  msg.pose = pose
82  return msg
83 
84  def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool:
85  # Sends a `NavToPose` action request and waits for completion
86  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
87  while not self.action_client.wait_for_server(timeout_sec=1.0):
88  self.info_msginfo_msg(
89  "'NavigateThroughPoses' action server not available, waiting..."
90  )
91 
92  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
93  goal_msg = NavigateThroughPoses.Goal()
94  goal_msg.poses.header.frame_id = 'map'
95  goal_msg.poses.header.stamp = self.get_clock().now().to_msg()
96  goal_msg.poses.goals = [
97  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
98  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
99  ]
100 
101  self.info_msginfo_msg('Sending goal request...')
102  send_goal_future = self.action_client.send_goal_async(goal_msg)
103 
104  rclpy.spin_until_future_complete(self, send_goal_future)
105  goal_handle = send_goal_future.result()
106 
107  if not goal_handle or not goal_handle.accepted:
108  self.error_msgerror_msg('Goal rejected')
109  return False
110 
111  self.info_msginfo_msg('Goal accepted')
112  get_result_future = goal_handle.get_result_async()
113 
114  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
115  rclpy.spin_until_future_complete(self, get_result_future)
116  status = get_result_future.result().status # type: ignore[union-attr]
117  if status != GoalStatus.STATUS_SUCCEEDED:
118  result = get_result_future.result().result # type: ignore[union-attr]
119  self.info_msginfo_msg(f'Goal failed with status code: {status}'
120  f' error code:{result.error_code}'
121  f' error msg:{result.error_msg}')
122  return False
123 
124  self.info_msginfo_msg('Goal succeeded!')
125  return True
126 
127  def runFakeNavigateAction(self) -> bool:
128  # Sends a `NavToPose` action request and waits for completion
129  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
130  while not self.action_client.wait_for_server(timeout_sec=1.0):
131  self.info_msginfo_msg(
132  "'NavigateThroughPoses' action server not available, waiting..."
133  )
134 
135  goal_msg = NavigateThroughPoses.Goal()
136 
137  self.info_msginfo_msg('Sending goal request...')
138  send_goal_future = self.action_client.send_goal_async(goal_msg)
139 
140  rclpy.spin_until_future_complete(self, send_goal_future)
141  goal_handle = send_goal_future.result()
142 
143  if not goal_handle or not goal_handle.accepted:
144  self.error_msgerror_msg('Goal rejected')
145  return False
146 
147  self.info_msginfo_msg('Goal accepted')
148  get_result_future = goal_handle.get_result_async()
149 
150  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
151  rclpy.spin_until_future_complete(self, get_result_future)
152  status = get_result_future.result().status # type: ignore[union-attr]
153  if status != GoalStatus.STATUS_SUCCEEDED:
154  result = get_result_future.result().result # type: ignore[union-attr]
155  self.info_msginfo_msg(f'Goal failed with status code: {status}'
156  f' error code:{result.error_code}'
157  f' error msg:{result.error_msg}')
158  return False
159 
160  self.info_msginfo_msg('Goal succeeded!')
161  return True
162 
163  def runNavigatePreemptionAction(self, block: bool) -> bool:
164  # Sends a `NavToPose` action request and waits for completion
165  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
166  while not self.action_client.wait_for_server(timeout_sec=1.0):
167  self.info_msginfo_msg(
168  "'NavigateThroughPoses' action server not available, waiting..."
169  )
170 
171  goal_msg = NavigateThroughPoses.Goal()
172  goal_msg.poses.goals = [self.getStampedPoseMsggetStampedPoseMsg(self.initial_poseinitial_pose)]
173 
174  self.info_msginfo_msg('Sending goal request...')
175  send_goal_future = self.action_client.send_goal_async(goal_msg)
176 
177  rclpy.spin_until_future_complete(self, send_goal_future)
178  goal_handle = send_goal_future.result()
179 
180  if not goal_handle or not goal_handle.accepted:
181  self.error_msgerror_msg('Goal rejected')
182  return False
183 
184  if not block:
185  return True
186 
187  self.info_msginfo_msg('Goal accepted')
188  get_result_future = goal_handle.get_result_async()
189 
190  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
191  rclpy.spin_until_future_complete(self, get_result_future)
192  status = get_result_future.result().status # type: ignore[union-attr]
193  if status != GoalStatus.STATUS_SUCCEEDED:
194  result = get_result_future.result().result # type: ignore[union-attr]
195  self.info_msginfo_msg(f'Goal failed with status code: {status}'
196  f' error code:{result.error_code}'
197  f' error msg:{result.error_msg}')
198  return False
199 
200  self.info_msginfo_msg('Goal succeeded!')
201  return True
202 
203  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
204  self.info_msginfo_msg('Received amcl_pose')
205  self.current_posecurrent_pose = msg.pose.pose
206  self.initial_pose_receivedinitial_pose_received = True
207 
208  def wait_for_node_active(self, node_name: str) -> None:
209  # Waits for the node within the tester namespace to become active
210  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
211  node_service = f'{node_name}/get_state'
212  state_client: Client[GetState.Request, GetState.Response] = \
213  self.create_client(GetState, node_service)
214  while not state_client.wait_for_service(timeout_sec=1.0):
215  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
216  req = GetState.Request() # empty request
217  state = 'UNKNOWN'
218  while state != 'active':
219  self.info_msginfo_msg(f'Getting {node_name} state...')
220  future = state_client.call_async(req)
221  rclpy.spin_until_future_complete(self, future)
222  if future.result() is not None:
223  state = future.result().current_state.label # type: ignore[union-attr]
224  self.info_msginfo_msg(f'Result of get_state: {state}')
225  else:
226  self.error_msgerror_msg(
227  f'Exception while calling service: {future.exception()!r}'
228  )
229  time.sleep(5)
230 
231  def shutdown(self) -> None:
232  self.info_msginfo_msg('Shutting down')
233  self.action_client.destroy()
234 
235  transition_service = 'lifecycle_manager_navigation/manage_nodes'
236  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
237  self.create_client(ManageLifecycleNodes, transition_service)
238  while not mgr_client.wait_for_service(timeout_sec=1.0):
239  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
240 
241  req = ManageLifecycleNodes.Request()
242  req.command = ManageLifecycleNodes.Request().SHUTDOWN
243  future = mgr_client.call_async(req)
244  try:
245  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
246  rclpy.spin_until_future_complete(self, future)
247  future.result()
248  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
249  except Exception as e: # noqa: B902
250  self.error_msgerror_msg(f'Service call failed {e!r}')
251  transition_service = 'lifecycle_manager_localization/manage_nodes'
252  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
253  while not mgr_client.wait_for_service(timeout_sec=1.0):
254  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
255 
256  req = ManageLifecycleNodes.Request()
257  req.command = ManageLifecycleNodes.Request().SHUTDOWN
258  future = mgr_client.call_async(req)
259  try:
260  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
261  rclpy.spin_until_future_complete(self, future)
262  future.result()
263  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
264  except Exception as e: # noqa: B902
265  self.error_msgerror_msg(f'Service call failed {e!r}')
266 
267  def wait_for_initial_pose(self) -> None:
268  self.initial_pose_receivedinitial_pose_received = False
269  while not self.initial_pose_receivedinitial_pose_received:
270  self.info_msginfo_msg('Setting initial pose')
271  self.setInitialPosesetInitialPose()
272  self.info_msginfo_msg('Waiting for amcl_pose to be received')
273  rclpy.spin_once(self, timeout_sec=1)
274 
275 
276 def run_all_tests(robot_tester: NavTester) -> bool:
277  # set transforms to use_sim_time
278  result = True
279  if result:
280  robot_tester.wait_for_node_active('amcl')
281  robot_tester.wait_for_initial_pose()
282  robot_tester.wait_for_node_active('bt_navigator')
283  result = robot_tester.runNavigateAction()
284  # Test empty navigation request
285  result = result and not robot_tester.runFakeNavigateAction()
286  # Test preempting NHP with the same goal
287  result = result and robot_tester.runNavigatePreemptionAction(False)
288  result = result and robot_tester.runNavigatePreemptionAction(True)
289 
290  # Add more tests here if desired
291 
292  if result:
293  robot_tester.info_msg('Test PASSED')
294  else:
295  robot_tester.error_msg('Test FAILED')
296 
297  return result
298 
299 
300 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
301  initial_pose = Pose()
302  initial_pose.position.x = x
303  initial_pose.position.y = y
304  initial_pose.position.z = z
305  initial_pose.orientation.x = 0.0
306  initial_pose.orientation.y = 0.0
307  initial_pose.orientation.z = 0.0
308  initial_pose.orientation.w = 1.0
309  return initial_pose
310 
311 
312 def get_testers(args: argparse.Namespace) -> list[NavTester]:
313  testers = []
314 
315  init_x, init_y, final_x, final_y = args.robot[0]
316  tester = NavTester(
317  initial_pose=fwd_pose(float(init_x), float(init_y)),
318  goal_pose=fwd_pose(float(final_x), float(final_y)),
319  )
320  tester.info_msg(
321  'Starting tester, robot going from '
322  + init_x
323  + ', '
324  + init_y
325  + ' to '
326  + final_x
327  + ', '
328  + final_y
329  + '.'
330  )
331  testers.append(tester)
332  return testers
333 
334 
335 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
336  # The robot(s) positions from the input arguments
337  parser = argparse.ArgumentParser(description='System-level navigation tester node')
338  group = parser.add_mutually_exclusive_group(required=True)
339  group.add_argument(
340  '-r',
341  '--robot',
342  action='append',
343  nargs=4,
344  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
345  help='The robot starting and final positions.',
346  )
347 
348  args, unknown = parser.parse_known_args()
349 
350  rclpy.init()
351 
352  # Create testers for each robot
353  testers = get_testers(args)
354 
355  # wait a few seconds to make sure entire stacks are up
356  time.sleep(10)
357 
358  for tester in testers:
359  passed = run_all_tests(tester)
360  if not passed:
361  break
362 
363  for tester in testers:
364  # stop and shutdown the nav stack to exit cleanly
365  tester.shutdown()
366 
367  testers[0].info_msg('Done Shutting Down.')
368 
369  if not passed:
370  testers[0].info_msg('Exiting failed')
371  exit(1)
372  else:
373  testers[0].info_msg('Exiting passed')
374  exit(0)
375 
376 
377 if __name__ == '__main__':
378  main()
None poseCallback(self, PoseWithCovarianceStamped msg)