Nav2 Navigation Stack - humble  humble
ROS 2 Navigation Stack
nav_to_pose_tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import math
19 import sys
20 import time
21 
22 from typing import Optional
23 
24 from action_msgs.msg import GoalStatus
25 from geometry_msgs.msg import Pose
26 from geometry_msgs.msg import PoseStamped
27 from geometry_msgs.msg import PoseWithCovarianceStamped
28 from lifecycle_msgs.srv import GetState
29 from nav2_msgs.action import NavigateToPose
30 from nav2_msgs.srv import ManageLifecycleNodes
31 
32 import rclpy
33 
34 from rclpy.action import ActionClient
35 from rclpy.node import Node
36 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
37 from rclpy.qos import QoSProfile
38 
39 
40 class NavTester(Node):
41 
42  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
43  super().__init__(node_name='nav2_tester', namespace=namespace)
44  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
45  PoseWithCovarianceStamped, 'initialpose', 10
46  )
47  self.goal_pubgoal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
48 
49  pose_qos = QoSProfile(
50  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
51  reliability=QoSReliabilityPolicy.RELIABLE,
52  history=QoSHistoryPolicy.KEEP_LAST,
53  depth=1,
54  )
55 
56  self.model_pose_submodel_pose_sub = self.create_subscription(
57  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
58  )
59  self.initial_pose_receivedinitial_pose_received = False
60  self.initial_poseinitial_pose = initial_pose
61  self.goal_posegoal_pose = goal_pose
62  self.action_clientaction_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
63 
64  def info_msg(self, msg: str):
65  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
66 
67  def warn_msg(self, msg: str):
68  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
69 
70  def error_msg(self, msg: str):
71  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
72 
73  def setInitialPose(self):
74  msg = PoseWithCovarianceStamped()
75  msg.pose.pose = self.initial_poseinitial_pose
76  msg.header.frame_id = 'map'
77  self.info_msginfo_msg('Publishing Initial Pose')
78  self.initial_pose_pubinitial_pose_pub.publish(msg)
79  self.currentPosecurrentPose = self.initial_poseinitial_pose
80 
81  def getStampedPoseMsg(self, pose: Pose):
82  msg = PoseStamped()
83  msg.header.frame_id = 'map'
84  msg.pose = pose
85  return msg
86 
87  def publishGoalPose(self, goal_pose: Optional[Pose] = None):
88  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
89  self.goal_pubgoal_pub.publish(self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose))
90 
91  def runNavigateAction(self, goal_pose: Optional[Pose] = None):
92  # Sends a `NavToPose` action request and waits for completion
93  self.info_msginfo_msg("Waiting for 'NavigateToPose' action server")
94  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
95  self.info_msginfo_msg("'NavigateToPose' action server not available, waiting...")
96 
97  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
98  goal_msg = NavigateToPose.Goal()
99  goal_msg.pose = self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
100 
101  self.info_msginfo_msg('Sending goal request...')
102  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
103 
104  rclpy.spin_until_future_complete(self, send_goal_future)
105  goal_handle = send_goal_future.result()
106 
107  if not goal_handle.accepted:
108  self.error_msgerror_msg('Goal rejected')
109  return False
110 
111  self.info_msginfo_msg('Goal accepted')
112  get_result_future = goal_handle.get_result_async()
113 
114  future_return = True
115 
116  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
117  rclpy.spin_until_future_complete(self, get_result_future)
118  status = get_result_future.result().status
119  if status != GoalStatus.STATUS_SUCCEEDED:
120  self.info_msginfo_msg(f'Goal failed with status code: {status}')
121  return False
122 
123  if not future_return:
124  return False
125 
126  self.info_msginfo_msg('Goal succeeded!')
127  return True
128 
129  def poseCallback(self, msg):
130  self.info_msginfo_msg('Received amcl_pose')
131  self.current_posecurrent_pose = msg.pose.pose
132  self.initial_pose_receivedinitial_pose_received = True
133 
134  def reachesGoal(self, timeout, distance):
135  goalReached = False
136  start_time = time.time()
137 
138  while not goalReached:
139  rclpy.spin_once(self, timeout_sec=1)
140  if self.distanceFromGoaldistanceFromGoal() < distance:
141  goalReached = True
142  self.info_msginfo_msg('*** GOAL REACHED ***')
143  return True
144  elif timeout is not None:
145  if (time.time() - start_time) > timeout:
146  self.error_msgerror_msg('Robot timed out reaching its goal!')
147  return False
148 
149  def distanceFromGoal(self):
150  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
151  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
152  distance = math.sqrt(d_x * d_x + d_y * d_y)
153  self.info_msginfo_msg(f'Distance from goal is: {distance}')
154  return distance
155 
156  def wait_for_node_active(self, node_name: str):
157  # Waits for the node within the tester namespace to become active
158  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
159  node_service = f'{node_name}/get_state'
160  state_client = self.create_client(GetState, node_service)
161  while not state_client.wait_for_service(timeout_sec=1.0):
162  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
163  req = GetState.Request() # empty request
164  state = 'UNKNOWN'
165  while state != 'active':
166  self.info_msginfo_msg(f'Getting {node_name} state...')
167  future = state_client.call_async(req)
168  rclpy.spin_until_future_complete(self, future)
169  if future.result() is not None:
170  state = future.result().current_state.label
171  self.info_msginfo_msg(f'Result of get_state: {state}')
172  else:
173  self.error_msgerror_msg(
174  f'Exception while calling service: {future.exception()!r}'
175  )
176  time.sleep(5)
177 
178  def shutdown(self):
179  self.info_msginfo_msg('Shutting down')
180  self.action_clientaction_client.destroy()
181 
182  transition_service = 'lifecycle_manager_navigation/manage_nodes'
183  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
184  while not mgr_client.wait_for_service(timeout_sec=1.0):
185  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
186 
187  req = ManageLifecycleNodes.Request()
188  req.command = ManageLifecycleNodes.Request().SHUTDOWN
189  future = mgr_client.call_async(req)
190  try:
191  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
192  rclpy.spin_until_future_complete(self, future)
193  future.result()
194  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
195  except Exception as e: # noqa: B902
196  self.error_msgerror_msg(f'Service call failed {e!r}')
197  transition_service = 'lifecycle_manager_localization/manage_nodes'
198  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
199  while not mgr_client.wait_for_service(timeout_sec=1.0):
200  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
201 
202  req = ManageLifecycleNodes.Request()
203  req.command = ManageLifecycleNodes.Request().SHUTDOWN
204  future = mgr_client.call_async(req)
205  try:
206  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
207  rclpy.spin_until_future_complete(self, future)
208  future.result()
209  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
210  except Exception as e: # noqa: B902
211  self.error_msgerror_msg(f'Service call failed {e!r}')
212 
213  def wait_for_initial_pose(self):
214  self.initial_pose_receivedinitial_pose_received = False
215  while not self.initial_pose_receivedinitial_pose_received:
216  self.info_msginfo_msg('Setting initial pose')
217  self.setInitialPosesetInitialPose()
218  self.info_msginfo_msg('Waiting for amcl_pose to be received')
219  rclpy.spin_once(self, timeout_sec=1)
220 
221 
222 def test_RobotMovesToGoal(robot_tester):
223  robot_tester.info_msg('Setting goal pose')
224  robot_tester.publishGoalPose()
225  robot_tester.info_msg('Waiting 60 seconds for robot to reach goal')
226  return robot_tester.reachesGoal(timeout=60, distance=0.5)
227 
228 
229 def run_all_tests(robot_tester):
230  # set transforms to use_sim_time
231  result = True
232  if result:
233  robot_tester.wait_for_node_active('amcl')
234  robot_tester.wait_for_initial_pose()
235  robot_tester.wait_for_node_active('bt_navigator')
236  result = robot_tester.runNavigateAction()
237 
238  if result:
239  result = test_RobotMovesToGoal(robot_tester)
240 
241  # Add more tests here if desired
242 
243  if result:
244  robot_tester.info_msg('Test PASSED')
245  else:
246  robot_tester.error_msg('Test FAILED')
247 
248  return result
249 
250 
251 def fwd_pose(x=0.0, y=0.0, z=0.01):
252  initial_pose = Pose()
253  initial_pose.position.x = x
254  initial_pose.position.y = y
255  initial_pose.position.z = z
256  initial_pose.orientation.x = 0.0
257  initial_pose.orientation.y = 0.0
258  initial_pose.orientation.z = 0.0
259  initial_pose.orientation.w = 1.0
260  return initial_pose
261 
262 
263 def get_testers(args):
264  testers = []
265 
266  if args.robot:
267  # Requested tester for one robot
268  init_x, init_y, final_x, final_y = args.robot[0]
269  tester = NavTester(
270  initial_pose=fwd_pose(float(init_x), float(init_y)),
271  goal_pose=fwd_pose(float(final_x), float(final_y)),
272  )
273  tester.info_msg(
274  'Starting tester, robot going from '
275  + init_x
276  + ', '
277  + init_y
278  + ' to '
279  + final_x
280  + ', '
281  + final_y
282  + '.'
283  )
284  testers.append(tester)
285  return testers
286 
287  # Requested tester for multiple robots
288  for robot in args.robots:
289  namespace, init_x, init_y, final_x, final_y = robot
290  tester = NavTester(
291  namespace=namespace,
292  initial_pose=fwd_pose(float(init_x), float(init_y)),
293  goal_pose=fwd_pose(float(final_x), float(final_y)),
294  )
295  tester.info_msg(
296  'Starting tester for '
297  + namespace
298  + ' going from '
299  + init_x
300  + ', '
301  + init_y
302  + ' to '
303  + final_x
304  + ', '
305  + final_y
306  )
307  testers.append(tester)
308  return testers
309 
310 
311 def check_args(expect_failure: str):
312  # Check if --expect_failure is True or False
313  if expect_failure != 'True' and expect_failure != 'False':
314  print(
315  '\033[1;37;41m' + ' -e flag must be set to True or False only. ' + '\033[0m'
316  )
317  exit(1)
318  else:
319  return eval(expect_failure)
320 
321 
322 def main(argv=sys.argv[1:]):
323  # The robot(s) positions from the input arguments
324  parser = argparse.ArgumentParser(description='System-level navigation tester node')
325  parser.add_argument('-e', '--expect_failure')
326  group = parser.add_mutually_exclusive_group(required=True)
327  group.add_argument(
328  '-r',
329  '--robot',
330  action='append',
331  nargs=4,
332  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
333  help='The robot starting and final positions.',
334  )
335  group.add_argument(
336  '-rs',
337  '--robots',
338  action='append',
339  nargs=5,
340  metavar=('name', 'init_x', 'init_y', 'final_x', 'final_y'),
341  help="The robot's namespace and starting and final positions. "
342  + 'Repeating the argument for multiple robots is supported.',
343  )
344 
345  args, unknown = parser.parse_known_args()
346 
347  expect_failure = check_args(args.expect_failure)
348 
349  rclpy.init()
350 
351  # Create testers for each robot
352  testers = get_testers(args)
353 
354  # wait a few seconds to make sure entire stacks are up
355  time.sleep(10)
356 
357  for tester in testers:
358  passed = run_all_tests(tester)
359  if passed != expect_failure:
360  break
361 
362  for tester in testers:
363  # stop and shutdown the nav stack to exit cleanly
364  tester.shutdown()
365 
366  testers[0].info_msg('Done Shutting Down.')
367 
368  if passed != expect_failure:
369  testers[0].info_msg('Exiting failed')
370  exit(1)
371  else:
372  testers[0].info_msg('Exiting passed')
373  exit(0)
374 
375 
376 if __name__ == '__main__':
377  main()