Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
nav_to_pose_tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import math
19 import sys
20 import time
21 
22 from typing import Optional
23 
24 from action_msgs.msg import GoalStatus
25 from geometry_msgs.msg import Pose
26 from geometry_msgs.msg import PoseStamped
27 from geometry_msgs.msg import PoseWithCovarianceStamped
28 from lifecycle_msgs.srv import GetState
29 from nav2_msgs.action import NavigateToPose
30 from nav2_msgs.srv import ManageLifecycleNodes
31 
32 import rclpy
33 
34 from rclpy.action import ActionClient
35 from rclpy.node import Node
36 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
37 from rclpy.qos import QoSProfile
38 
39 
40 class NavTester(Node):
41 
42  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
43  super().__init__(node_name='nav2_tester', namespace=namespace)
44  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
45  PoseWithCovarianceStamped, 'initialpose', 10
46  )
47  self.goal_pubgoal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
48 
49  pose_qos = QoSProfile(
50  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
51  reliability=QoSReliabilityPolicy.RELIABLE,
52  history=QoSHistoryPolicy.KEEP_LAST,
53  depth=1,
54  )
55 
56  self.model_pose_submodel_pose_sub = self.create_subscription(
57  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
58  )
59  self.initial_pose_receivedinitial_pose_received = False
60  self.initial_poseinitial_pose = initial_pose
61  self.goal_posegoal_pose = goal_pose
62  self.set_initial_pose_timeoutset_initial_pose_timeout = 15
63  self.action_clientaction_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
64 
65  def info_msg(self, msg: str):
66  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
67 
68  def warn_msg(self, msg: str):
69  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
70 
71  def error_msg(self, msg: str):
72  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
73 
74  def setInitialPose(self):
75  msg = PoseWithCovarianceStamped()
76  msg.pose.pose = self.initial_poseinitial_pose
77  msg.header.frame_id = 'map'
78  self.info_msginfo_msg('Publishing Initial Pose')
79  self.initial_pose_pubinitial_pose_pub.publish(msg)
80  self.currentPosecurrentPose = self.initial_poseinitial_pose
81 
82  def getStampedPoseMsg(self, pose: Pose):
83  msg = PoseStamped()
84  msg.header.frame_id = 'map'
85  msg.pose = pose
86  return msg
87 
88  def publishGoalPose(self, goal_pose: Optional[Pose] = None):
89  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
90  self.goal_pubgoal_pub.publish(self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose))
91 
92  def runNavigateAction(self, goal_pose: Optional[Pose] = None):
93  # Sends a `NavToPose` action request and waits for completion
94  self.info_msginfo_msg("Waiting for 'NavigateToPose' action server")
95  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
96  self.info_msginfo_msg("'NavigateToPose' action server not available, waiting...")
97 
98  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
99  goal_msg = NavigateToPose.Goal()
100  goal_msg.pose = self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
101 
102  self.info_msginfo_msg('Sending goal request...')
103  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
104 
105  rclpy.spin_until_future_complete(self, send_goal_future)
106  goal_handle = send_goal_future.result()
107 
108  if not goal_handle.accepted:
109  self.error_msgerror_msg('Goal rejected')
110  return False
111 
112  self.info_msginfo_msg('Goal accepted')
113  get_result_future = goal_handle.get_result_async()
114 
115  future_return = True
116 
117  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
118  rclpy.spin_until_future_complete(self, get_result_future)
119  status = get_result_future.result().status
120  if status != GoalStatus.STATUS_SUCCEEDED:
121  self.info_msginfo_msg(f'Goal failed with status code: {status}')
122  return False
123 
124  if not future_return:
125  return False
126 
127  self.info_msginfo_msg('Goal succeeded!')
128  return True
129 
130  def poseCallback(self, msg):
131  self.info_msginfo_msg('Received amcl_pose')
132  self.current_posecurrent_pose = msg.pose.pose
133  self.initial_pose_receivedinitial_pose_received = True
134 
135  def reachesGoal(self, timeout, distance):
136  goalReached = False
137  start_time = time.time()
138 
139  while not goalReached:
140  rclpy.spin_once(self, timeout_sec=1)
141  if self.distanceFromGoaldistanceFromGoal() < distance:
142  goalReached = True
143  self.info_msginfo_msg('*** GOAL REACHED ***')
144  return True
145  elif timeout is not None:
146  if (time.time() - start_time) > timeout:
147  self.error_msgerror_msg('Robot timed out reaching its goal!')
148  return False
149 
150  def distanceFromGoal(self):
151  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
152  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
153  distance = math.sqrt(d_x * d_x + d_y * d_y)
154  self.info_msginfo_msg(f'Distance from goal is: {distance}')
155  return distance
156 
157  def wait_for_node_active(self, node_name: str):
158  # Waits for the node within the tester namespace to become active
159  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
160  node_service = f'{node_name}/get_state'
161  state_client = self.create_client(GetState, node_service)
162  while not state_client.wait_for_service(timeout_sec=1.0):
163  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
164  req = GetState.Request() # empty request
165  state = 'UNKNOWN'
166  while state != 'active':
167  self.info_msginfo_msg(f'Getting {node_name} state...')
168  future = state_client.call_async(req)
169  rclpy.spin_until_future_complete(self, future)
170  if future.result() is not None:
171  state = future.result().current_state.label
172  self.info_msginfo_msg(f'Result of get_state: {state}')
173  else:
174  self.error_msgerror_msg(
175  f'Exception while calling service: {future.exception()!r}'
176  )
177  time.sleep(5)
178 
179  def shutdown(self):
180  self.info_msginfo_msg('Shutting down')
181  self.action_clientaction_client.destroy()
182 
183  transition_service = 'lifecycle_manager_navigation/manage_nodes'
184  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
185  while not mgr_client.wait_for_service(timeout_sec=1.0):
186  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
187 
188  req = ManageLifecycleNodes.Request()
189  req.command = ManageLifecycleNodes.Request().SHUTDOWN
190  future = mgr_client.call_async(req)
191  try:
192  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
193  rclpy.spin_until_future_complete(self, future)
194  future.result()
195  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
196  except Exception as e: # noqa: B902
197  self.error_msgerror_msg(f'Service call failed {e!r}')
198  transition_service = 'lifecycle_manager_localization/manage_nodes'
199  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
200  while not mgr_client.wait_for_service(timeout_sec=1.0):
201  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
202 
203  req = ManageLifecycleNodes.Request()
204  req.command = ManageLifecycleNodes.Request().SHUTDOWN
205  future = mgr_client.call_async(req)
206  try:
207  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
208  rclpy.spin_until_future_complete(self, future)
209  future.result()
210  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
211  except Exception as e: # noqa: B902
212  self.error_msgerror_msg(f'Service call failed {e!r}')
213 
214  def wait_for_initial_pose(self):
215  self.initial_pose_receivedinitial_pose_received = False
216  # If the initial pose is not received within 100 seconds, return False
217  # this is because when setting a wrong initial pose, amcl_pose is not received
218  # and the test will hang indefinitely
219  start_time = time.time()
220  duration = 0
221  while not self.initial_pose_receivedinitial_pose_received:
222  self.info_msginfo_msg('Setting initial pose')
223  self.setInitialPosesetInitialPose()
224  self.info_msginfo_msg('Waiting for amcl_pose to be received')
225  duration = time.time() - start_time
226  if duration > self.set_initial_pose_timeoutset_initial_pose_timeout:
227  self.error_msgerror_msg('Timeout waiting for initial pose to be set')
228  return False
229  rclpy.spin_once(self, timeout_sec=1)
230  return True
231 
232 
233 def test_RobotMovesToGoal(robot_tester):
234  robot_tester.info_msg('Setting goal pose')
235  robot_tester.publishGoalPose()
236  robot_tester.info_msg('Waiting 60 seconds for robot to reach goal')
237  return robot_tester.reachesGoal(timeout=60, distance=0.5)
238 
239 
240 def run_all_tests(robot_tester):
241  # set transforms to use_sim_time
242  result = True
243  if result:
244  robot_tester.wait_for_node_active('amcl')
245  result = robot_tester.wait_for_initial_pose()
246  if result:
247  robot_tester.wait_for_node_active('bt_navigator')
248  result = robot_tester.runNavigateAction()
249 
250  if result:
251  result = test_RobotMovesToGoal(robot_tester)
252 
253  # Add more tests here if desired
254 
255  if result:
256  robot_tester.info_msg('Test PASSED')
257  else:
258  robot_tester.error_msg('Test FAILED')
259 
260  return result
261 
262 
263 def fwd_pose(x=0.0, y=0.0, z=0.01):
264  initial_pose = Pose()
265  initial_pose.position.x = x
266  initial_pose.position.y = y
267  initial_pose.position.z = z
268  initial_pose.orientation.x = 0.0
269  initial_pose.orientation.y = 0.0
270  initial_pose.orientation.z = 0.0
271  initial_pose.orientation.w = 1.0
272  return initial_pose
273 
274 
275 def get_testers(args):
276  testers = []
277 
278  if args.robot:
279  # Requested tester for one robot
280  init_x, init_y, final_x, final_y = args.robot[0]
281  tester = NavTester(
282  initial_pose=fwd_pose(float(init_x), float(init_y)),
283  goal_pose=fwd_pose(float(final_x), float(final_y)),
284  )
285  tester.info_msg(
286  'Starting tester, robot going from '
287  + init_x
288  + ', '
289  + init_y
290  + ' to '
291  + final_x
292  + ', '
293  + final_y
294  + '.'
295  )
296  testers.append(tester)
297  return testers
298 
299  # Requested tester for multiple robots
300  for robot in args.robots:
301  namespace, init_x, init_y, final_x, final_y = robot
302  tester = NavTester(
303  namespace=namespace,
304  initial_pose=fwd_pose(float(init_x), float(init_y)),
305  goal_pose=fwd_pose(float(final_x), float(final_y)),
306  )
307  tester.info_msg(
308  'Starting tester for '
309  + namespace
310  + ' going from '
311  + init_x
312  + ', '
313  + init_y
314  + ' to '
315  + final_x
316  + ', '
317  + final_y
318  )
319  testers.append(tester)
320  return testers
321 
322 
323 def check_args(expect_failure: str):
324  # Check if --expect_failure is True or False
325  if expect_failure != 'True' and expect_failure != 'False':
326  print(
327  '\033[1;37;41m' + ' -e flag must be set to True or False only. ' + '\033[0m'
328  )
329  exit(1)
330  else:
331  return eval(expect_failure)
332 
333 
334 def main(argv=sys.argv[1:]):
335  # The robot(s) positions from the input arguments
336  parser = argparse.ArgumentParser(description='System-level navigation tester node')
337  parser.add_argument('-e', '--expect_failure')
338  group = parser.add_mutually_exclusive_group(required=True)
339  group.add_argument(
340  '-r',
341  '--robot',
342  action='append',
343  nargs=4,
344  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
345  help='The robot starting and final positions.',
346  )
347  group.add_argument(
348  '-rs',
349  '--robots',
350  action='append',
351  nargs=5,
352  metavar=('name', 'init_x', 'init_y', 'final_x', 'final_y'),
353  help="The robot's namespace and starting and final positions. "
354  + 'Repeating the argument for multiple robots is supported.',
355  )
356 
357  args, unknown = parser.parse_known_args()
358 
359  expect_failure = check_args(args.expect_failure)
360 
361  rclpy.init()
362 
363  # Create testers for each robot
364  testers = get_testers(args)
365 
366  # wait a few seconds to make sure entire stacks are up
367  time.sleep(10)
368 
369  for tester in testers:
370  passed = run_all_tests(tester)
371  if passed != expect_failure:
372  break
373 
374  for tester in testers:
375  # stop and shutdown the nav stack to exit cleanly
376  tester.shutdown()
377 
378  testers[0].info_msg('Done Shutting Down.')
379 
380  if passed != expect_failure:
381  testers[0].info_msg('Exiting failed')
382  exit(1)
383  else:
384  testers[0].info_msg('Exiting passed')
385  exit(0)
386 
387 
388 if __name__ == '__main__':
389  main()