Nav2 Navigation Stack - humble  humble
ROS 2 Navigation Stack
nav_through_poses_tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import sys
19 import time
20 
21 from typing import Optional
22 
23 from action_msgs.msg import GoalStatus
24 from geometry_msgs.msg import Pose
25 from geometry_msgs.msg import PoseStamped
26 from geometry_msgs.msg import PoseWithCovarianceStamped
27 from lifecycle_msgs.srv import GetState
28 from nav2_msgs.action import NavigateThroughPoses
29 from nav2_msgs.srv import ManageLifecycleNodes
30 
31 import rclpy
32 
33 from rclpy.action import ActionClient
34 from rclpy.node import Node
35 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
36 from rclpy.qos import QoSProfile
37 
38 
39 class NavTester(Node):
40 
41  def __init__(
42  self,
43  initial_pose: Pose,
44  goal_pose: Pose,
45  namespace: str = ''
46  ):
47  super().__init__(node_name='nav2_tester', namespace=namespace)
48  self.initial_pose_pubinitial_pose_pub = self.create_publisher(PoseWithCovarianceStamped,
49  'initialpose', 10)
50 
51  pose_qos = QoSProfile(
52  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
53  reliability=QoSReliabilityPolicy.RELIABLE,
54  history=QoSHistoryPolicy.KEEP_LAST,
55  depth=1)
56 
57  self.model_pose_submodel_pose_sub = self.create_subscription(PoseWithCovarianceStamped,
58  'amcl_pose', self.poseCallbackposeCallback, pose_qos)
59  self.initial_pose_receivedinitial_pose_received = False
60  self.initial_poseinitial_pose = initial_pose
61  self.goal_posegoal_pose = goal_pose
62  self.action_clientaction_client = ActionClient(self, NavigateThroughPoses, 'navigate_through_poses')
63 
64  def info_msg(self, msg: str):
65  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
66 
67  def warn_msg(self, msg: str):
68  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
69 
70  def error_msg(self, msg: str):
71  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
72 
73  def setInitialPose(self):
74  msg = PoseWithCovarianceStamped()
75  msg.pose.pose = self.initial_poseinitial_pose
76  msg.header.frame_id = 'map'
77  self.info_msginfo_msg('Publishing Initial Pose')
78  self.initial_pose_pubinitial_pose_pub.publish(msg)
79  self.currentPosecurrentPose = self.initial_poseinitial_pose
80 
81  def getStampedPoseMsg(self, pose: Pose):
82  msg = PoseStamped()
83  msg.header.frame_id = 'map'
84  msg.pose = pose
85  return msg
86 
87  def runNavigateAction(self, goal_pose: Optional[Pose] = None):
88  # Sends a `NavToPose` action request and waits for completion
89  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
90  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
91  self.info_msginfo_msg("'NavigateThroughPoses' action server not available, waiting...")
92 
93  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
94  goal_msg = NavigateThroughPoses.Goal()
95  goal_msg.poses = [self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
96  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)]
97 
98  self.info_msginfo_msg('Sending goal request...')
99  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
100 
101  rclpy.spin_until_future_complete(self, send_goal_future)
102  goal_handle = send_goal_future.result()
103 
104  if not goal_handle.accepted:
105  self.error_msgerror_msg('Goal rejected')
106  return False
107 
108  self.info_msginfo_msg('Goal accepted')
109  get_result_future = goal_handle.get_result_async()
110 
111  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
112  rclpy.spin_until_future_complete(self, get_result_future)
113  status = get_result_future.result().status
114  if status != GoalStatus.STATUS_SUCCEEDED:
115  self.info_msginfo_msg(f'Goal failed with status code: {status}')
116  return False
117 
118  self.info_msginfo_msg('Goal succeeded!')
119  return True
120 
121  def runFakeNavigateAction(self):
122  # Sends a `NavToPose` action request and waits for completion
123  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
124  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
125  self.info_msginfo_msg("'NavigateThroughPoses' action server not available, waiting...")
126 
127  goal_msg = NavigateThroughPoses.Goal()
128 
129  self.info_msginfo_msg('Sending goal request...')
130  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
131 
132  rclpy.spin_until_future_complete(self, send_goal_future)
133  goal_handle = send_goal_future.result()
134 
135  if not goal_handle.accepted:
136  self.error_msgerror_msg('Goal rejected')
137  return False
138 
139  self.info_msginfo_msg('Goal accepted')
140  get_result_future = goal_handle.get_result_async()
141 
142  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
143  rclpy.spin_until_future_complete(self, get_result_future)
144  status = get_result_future.result().status
145  if status != GoalStatus.STATUS_SUCCEEDED:
146  self.info_msginfo_msg(f'Goal failed with status code: {status}')
147  return False
148 
149  self.info_msginfo_msg('Goal succeeded!')
150  return True
151 
152  def runNavigatePreemptionAction(self, block):
153  # Sends a `NavToPose` action request and waits for completion
154  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
155  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
156  self.info_msginfo_msg("'NavigateThroughPoses' action server not available, waiting...")
157 
158  goal_msg = NavigateThroughPoses.Goal()
159  goal_msg.poses = [self.getStampedPoseMsggetStampedPoseMsg(self.initial_poseinitial_pose)]
160 
161  self.info_msginfo_msg('Sending goal request...')
162  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
163 
164  rclpy.spin_until_future_complete(self, send_goal_future)
165  goal_handle = send_goal_future.result()
166 
167  if not goal_handle.accepted:
168  self.error_msgerror_msg('Goal rejected')
169  return False
170 
171  if not block:
172  return True
173 
174  self.info_msginfo_msg('Goal accepted')
175  get_result_future = goal_handle.get_result_async()
176 
177  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
178  rclpy.spin_until_future_complete(self, get_result_future)
179  status = get_result_future.result().status
180  if status != GoalStatus.STATUS_SUCCEEDED:
181  self.info_msginfo_msg(f'Goal failed with status code: {status}')
182  return False
183 
184  self.info_msginfo_msg('Goal succeeded!')
185  return True
186 
187  def poseCallback(self, msg):
188  self.info_msginfo_msg('Received amcl_pose')
189  self.current_posecurrent_pose = msg.pose.pose
190  self.initial_pose_receivedinitial_pose_received = True
191 
192  def wait_for_node_active(self, node_name: str):
193  # Waits for the node within the tester namespace to become active
194  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
195  node_service = f'{node_name}/get_state'
196  state_client = self.create_client(GetState, node_service)
197  while not state_client.wait_for_service(timeout_sec=1.0):
198  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
199  req = GetState.Request() # empty request
200  state = 'UNKNOWN'
201  while (state != 'active'):
202  self.info_msginfo_msg(f'Getting {node_name} state...')
203  future = state_client.call_async(req)
204  rclpy.spin_until_future_complete(self, future)
205  if future.result() is not None:
206  state = future.result().current_state.label
207  self.info_msginfo_msg(f'Result of get_state: {state}')
208  else:
209  self.error_msgerror_msg(f'Exception while calling service: {future.exception()!r}')
210  time.sleep(5)
211 
212  def shutdown(self):
213  self.info_msginfo_msg('Shutting down')
214  self.action_clientaction_client.destroy()
215 
216  transition_service = 'lifecycle_manager_navigation/manage_nodes'
217  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
218  while not mgr_client.wait_for_service(timeout_sec=1.0):
219  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
220 
221  req = ManageLifecycleNodes.Request()
222  req.command = ManageLifecycleNodes.Request().SHUTDOWN
223  future = mgr_client.call_async(req)
224  try:
225  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
226  rclpy.spin_until_future_complete(self, future)
227  future.result()
228  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
229  except Exception as e: # noqa: B902
230  self.error_msgerror_msg(f'Service call failed {e!r}')
231  transition_service = 'lifecycle_manager_localization/manage_nodes'
232  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
233  while not mgr_client.wait_for_service(timeout_sec=1.0):
234  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
235 
236  req = ManageLifecycleNodes.Request()
237  req.command = ManageLifecycleNodes.Request().SHUTDOWN
238  future = mgr_client.call_async(req)
239  try:
240  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
241  rclpy.spin_until_future_complete(self, future)
242  future.result()
243  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
244  except Exception as e: # noqa: B902
245  self.error_msgerror_msg(f'Service call failed {e!r}')
246 
247  def wait_for_initial_pose(self):
248  self.initial_pose_receivedinitial_pose_received = False
249  while not self.initial_pose_receivedinitial_pose_received:
250  self.info_msginfo_msg('Setting initial pose')
251  self.setInitialPosesetInitialPose()
252  self.info_msginfo_msg('Waiting for amcl_pose to be received')
253  rclpy.spin_once(self, timeout_sec=1)
254 
255 
256 def run_all_tests(robot_tester):
257  # set transforms to use_sim_time
258  result = True
259  if (result):
260  robot_tester.wait_for_node_active('amcl')
261  robot_tester.wait_for_initial_pose()
262  robot_tester.wait_for_node_active('bt_navigator')
263  result = robot_tester.runNavigateAction()
264  # Test empty navigation request
265  result = result and not robot_tester.runFakeNavigateAction()
266  # Test preempting NHP with the same goal
267  result = result and robot_tester.runNavigatePreemptionAction(False)
268  result = result and robot_tester.runNavigatePreemptionAction(True)
269 
270  # Add more tests here if desired
271 
272  if (result):
273  robot_tester.info_msg('Test PASSED')
274  else:
275  robot_tester.error_msg('Test FAILED')
276 
277  return result
278 
279 
280 def fwd_pose(x=0.0, y=0.0, z=0.01):
281  initial_pose = Pose()
282  initial_pose.position.x = x
283  initial_pose.position.y = y
284  initial_pose.position.z = z
285  initial_pose.orientation.x = 0.0
286  initial_pose.orientation.y = 0.0
287  initial_pose.orientation.z = 0.0
288  initial_pose.orientation.w = 1.0
289  return initial_pose
290 
291 
292 def get_testers(args):
293  testers = []
294 
295  init_x, init_y, final_x, final_y = args.robot[0]
296  tester = NavTester(
297  initial_pose=fwd_pose(float(init_x), float(init_y)),
298  goal_pose=fwd_pose(float(final_x), float(final_y)))
299  tester.info_msg(
300  'Starting tester, robot going from ' + init_x + ', ' + init_y +
301  ' to ' + final_x + ', ' + final_y + '.')
302  testers.append(tester)
303  return testers
304 
305 
306 def main(argv=sys.argv[1:]):
307  # The robot(s) positions from the input arguments
308  parser = argparse.ArgumentParser(description='System-level navigation tester node')
309  group = parser.add_mutually_exclusive_group(required=True)
310  group.add_argument('-r', '--robot', action='append', nargs=4,
311  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
312  help='The robot starting and final positions.')
313 
314  args, unknown = parser.parse_known_args()
315 
316  rclpy.init()
317 
318  # Create testers for each robot
319  testers = get_testers(args)
320 
321  # wait a few seconds to make sure entire stacks are up
322  time.sleep(10)
323 
324  for tester in testers:
325  passed = run_all_tests(tester)
326  if not passed:
327  break
328 
329  for tester in testers:
330  # stop and shutdown the nav stack to exit cleanly
331  tester.shutdown()
332 
333  testers[0].info_msg('Done Shutting Down.')
334 
335  if not passed:
336  testers[0].info_msg('Exiting failed')
337  exit(1)
338  else:
339  testers[0].info_msg('Exiting passed')
340  exit(0)
341 
342 
343 if __name__ == '__main__':
344  main()