Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
nav_through_poses_tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import sys
19 import time
20 
21 from typing import Optional
22 
23 from action_msgs.msg import GoalStatus
24 from geometry_msgs.msg import Pose
25 from geometry_msgs.msg import PoseStamped
26 from geometry_msgs.msg import PoseWithCovarianceStamped
27 from lifecycle_msgs.srv import GetState
28 from nav2_msgs.action import NavigateThroughPoses
29 from nav2_msgs.srv import ManageLifecycleNodes
30 
31 import rclpy
32 
33 from rclpy.action import ActionClient
34 from rclpy.node import Node
35 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
36 from rclpy.qos import QoSProfile
37 
38 
39 class NavTester(Node):
40 
41  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
42  super().__init__(node_name='nav2_tester', namespace=namespace)
43  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
44  PoseWithCovarianceStamped, 'initialpose', 10
45  )
46 
47  pose_qos = QoSProfile(
48  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
49  reliability=QoSReliabilityPolicy.RELIABLE,
50  history=QoSHistoryPolicy.KEEP_LAST,
51  depth=1,
52  )
53 
54  self.model_pose_submodel_pose_sub = self.create_subscription(
55  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
56  )
57  self.initial_pose_receivedinitial_pose_received = False
58  self.initial_poseinitial_pose = initial_pose
59  self.goal_posegoal_pose = goal_pose
60  self.action_clientaction_client = ActionClient(
61  self, NavigateThroughPoses, 'navigate_through_poses'
62  )
63 
64  def info_msg(self, msg: str):
65  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
66 
67  def warn_msg(self, msg: str):
68  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
69 
70  def error_msg(self, msg: str):
71  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
72 
73  def setInitialPose(self):
74  msg = PoseWithCovarianceStamped()
75  msg.pose.pose = self.initial_poseinitial_pose
76  msg.header.frame_id = 'map'
77  self.info_msginfo_msg('Publishing Initial Pose')
78  self.initial_pose_pubinitial_pose_pub.publish(msg)
79  self.currentPosecurrentPose = self.initial_poseinitial_pose
80 
81  def getStampedPoseMsg(self, pose: Pose):
82  msg = PoseStamped()
83  msg.header.frame_id = 'map'
84  msg.pose = pose
85  return msg
86 
87  def runNavigateAction(self, goal_pose: Optional[Pose] = None):
88  # Sends a `NavToPose` action request and waits for completion
89  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
90  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
91  self.info_msginfo_msg(
92  "'NavigateThroughPoses' action server not available, waiting..."
93  )
94 
95  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
96  goal_msg = NavigateThroughPoses.Goal()
97  goal_msg.poses = [
98  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
99  self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose),
100  ]
101 
102  self.info_msginfo_msg('Sending goal request...')
103  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
104 
105  rclpy.spin_until_future_complete(self, send_goal_future)
106  goal_handle = send_goal_future.result()
107 
108  if not goal_handle.accepted:
109  self.error_msgerror_msg('Goal rejected')
110  return False
111 
112  self.info_msginfo_msg('Goal accepted')
113  get_result_future = goal_handle.get_result_async()
114 
115  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
116  rclpy.spin_until_future_complete(self, get_result_future)
117  status = get_result_future.result().status
118  if status != GoalStatus.STATUS_SUCCEEDED:
119  self.info_msginfo_msg(f'Goal failed with status code: {status}')
120  return False
121 
122  self.info_msginfo_msg('Goal succeeded!')
123  return True
124 
125  def runFakeNavigateAction(self):
126  # Sends a `NavToPose` action request and waits for completion
127  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
128  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
129  self.info_msginfo_msg(
130  "'NavigateThroughPoses' action server not available, waiting..."
131  )
132 
133  goal_msg = NavigateThroughPoses.Goal()
134 
135  self.info_msginfo_msg('Sending goal request...')
136  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
137 
138  rclpy.spin_until_future_complete(self, send_goal_future)
139  goal_handle = send_goal_future.result()
140 
141  if not goal_handle.accepted:
142  self.error_msgerror_msg('Goal rejected')
143  return False
144 
145  self.info_msginfo_msg('Goal accepted')
146  get_result_future = goal_handle.get_result_async()
147 
148  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
149  rclpy.spin_until_future_complete(self, get_result_future)
150  status = get_result_future.result().status
151  if status != GoalStatus.STATUS_SUCCEEDED:
152  self.info_msginfo_msg(f'Goal failed with status code: {status}')
153  return False
154 
155  self.info_msginfo_msg('Goal succeeded!')
156  return True
157 
158  def runNavigatePreemptionAction(self, block):
159  # Sends a `NavToPose` action request and waits for completion
160  self.info_msginfo_msg("Waiting for 'NavigateThroughPoses' action server")
161  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
162  self.info_msginfo_msg(
163  "'NavigateThroughPoses' action server not available, waiting..."
164  )
165 
166  goal_msg = NavigateThroughPoses.Goal()
167  goal_msg.poses = [self.getStampedPoseMsggetStampedPoseMsg(self.initial_poseinitial_pose)]
168 
169  self.info_msginfo_msg('Sending goal request...')
170  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
171 
172  rclpy.spin_until_future_complete(self, send_goal_future)
173  goal_handle = send_goal_future.result()
174 
175  if not goal_handle.accepted:
176  self.error_msgerror_msg('Goal rejected')
177  return False
178 
179  if not block:
180  return True
181 
182  self.info_msginfo_msg('Goal accepted')
183  get_result_future = goal_handle.get_result_async()
184 
185  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
186  rclpy.spin_until_future_complete(self, get_result_future)
187  status = get_result_future.result().status
188  if status != GoalStatus.STATUS_SUCCEEDED:
189  self.info_msginfo_msg(f'Goal failed with status code: {status}')
190  return False
191 
192  self.info_msginfo_msg('Goal succeeded!')
193  return True
194 
195  def poseCallback(self, msg):
196  self.info_msginfo_msg('Received amcl_pose')
197  self.current_posecurrent_pose = msg.pose.pose
198  self.initial_pose_receivedinitial_pose_received = True
199 
200  def wait_for_node_active(self, node_name: str):
201  # Waits for the node within the tester namespace to become active
202  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
203  node_service = f'{node_name}/get_state'
204  state_client = self.create_client(GetState, node_service)
205  while not state_client.wait_for_service(timeout_sec=1.0):
206  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
207  req = GetState.Request() # empty request
208  state = 'UNKNOWN'
209  while state != 'active':
210  self.info_msginfo_msg(f'Getting {node_name} state...')
211  future = state_client.call_async(req)
212  rclpy.spin_until_future_complete(self, future)
213  if future.result() is not None:
214  state = future.result().current_state.label
215  self.info_msginfo_msg(f'Result of get_state: {state}')
216  else:
217  self.error_msgerror_msg(
218  f'Exception while calling service: {future.exception()!r}'
219  )
220  time.sleep(5)
221 
222  def shutdown(self):
223  self.info_msginfo_msg('Shutting down')
224  self.action_clientaction_client.destroy()
225 
226  transition_service = 'lifecycle_manager_navigation/manage_nodes'
227  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
228  while not mgr_client.wait_for_service(timeout_sec=1.0):
229  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
230 
231  req = ManageLifecycleNodes.Request()
232  req.command = ManageLifecycleNodes.Request().SHUTDOWN
233  future = mgr_client.call_async(req)
234  try:
235  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
236  rclpy.spin_until_future_complete(self, future)
237  future.result()
238  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
239  except Exception as e: # noqa: B902
240  self.error_msgerror_msg(f'Service call failed {e!r}')
241  transition_service = 'lifecycle_manager_localization/manage_nodes'
242  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
243  while not mgr_client.wait_for_service(timeout_sec=1.0):
244  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
245 
246  req = ManageLifecycleNodes.Request()
247  req.command = ManageLifecycleNodes.Request().SHUTDOWN
248  future = mgr_client.call_async(req)
249  try:
250  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
251  rclpy.spin_until_future_complete(self, future)
252  future.result()
253  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
254  except Exception as e: # noqa: B902
255  self.error_msgerror_msg(f'Service call failed {e!r}')
256 
257  def wait_for_initial_pose(self):
258  self.initial_pose_receivedinitial_pose_received = False
259  while not self.initial_pose_receivedinitial_pose_received:
260  self.info_msginfo_msg('Setting initial pose')
261  self.setInitialPosesetInitialPose()
262  self.info_msginfo_msg('Waiting for amcl_pose to be received')
263  rclpy.spin_once(self, timeout_sec=1)
264 
265 
266 def run_all_tests(robot_tester):
267  # set transforms to use_sim_time
268  result = True
269  if result:
270  robot_tester.wait_for_node_active('amcl')
271  robot_tester.wait_for_initial_pose()
272  robot_tester.wait_for_node_active('bt_navigator')
273  result = robot_tester.runNavigateAction()
274  # Test empty navigation request
275  result = result and not robot_tester.runFakeNavigateAction()
276  # Test preempting NHP with the same goal
277  result = result and robot_tester.runNavigatePreemptionAction(False)
278  result = result and robot_tester.runNavigatePreemptionAction(True)
279 
280  # Add more tests here if desired
281 
282  if result:
283  robot_tester.info_msg('Test PASSED')
284  else:
285  robot_tester.error_msg('Test FAILED')
286 
287  return result
288 
289 
290 def fwd_pose(x=0.0, y=0.0, z=0.01):
291  initial_pose = Pose()
292  initial_pose.position.x = x
293  initial_pose.position.y = y
294  initial_pose.position.z = z
295  initial_pose.orientation.x = 0.0
296  initial_pose.orientation.y = 0.0
297  initial_pose.orientation.z = 0.0
298  initial_pose.orientation.w = 1.0
299  return initial_pose
300 
301 
302 def get_testers(args):
303  testers = []
304 
305  init_x, init_y, final_x, final_y = args.robot[0]
306  tester = NavTester(
307  initial_pose=fwd_pose(float(init_x), float(init_y)),
308  goal_pose=fwd_pose(float(final_x), float(final_y)),
309  )
310  tester.info_msg(
311  'Starting tester, robot going from '
312  + init_x
313  + ', '
314  + init_y
315  + ' to '
316  + final_x
317  + ', '
318  + final_y
319  + '.'
320  )
321  testers.append(tester)
322  return testers
323 
324 
325 def main(argv=sys.argv[1:]):
326  # The robot(s) positions from the input arguments
327  parser = argparse.ArgumentParser(description='System-level navigation tester node')
328  group = parser.add_mutually_exclusive_group(required=True)
329  group.add_argument(
330  '-r',
331  '--robot',
332  action='append',
333  nargs=4,
334  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
335  help='The robot starting and final positions.',
336  )
337 
338  args, unknown = parser.parse_known_args()
339 
340  rclpy.init()
341 
342  # Create testers for each robot
343  testers = get_testers(args)
344 
345  # wait a few seconds to make sure entire stacks are up
346  time.sleep(10)
347 
348  for tester in testers:
349  passed = run_all_tests(tester)
350  if not passed:
351  break
352 
353  for tester in testers:
354  # stop and shutdown the nav stack to exit cleanly
355  tester.shutdown()
356 
357  testers[0].info_msg('Done Shutting Down.')
358 
359  if not passed:
360  testers[0].info_msg('Exiting failed')
361  exit(1)
362  else:
363  testers[0].info_msg('Exiting passed')
364  exit(0)
365 
366 
367 if __name__ == '__main__':
368  main()