Nav2 Navigation Stack - humble  humble
ROS 2 Navigation Stack
tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Samsung Research America
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import math
19 import sys
20 import time
21 from typing import Optional
22 
23 from action_msgs.msg import GoalStatus
24 from geometry_msgs.msg import Pose
25 from geometry_msgs.msg import PoseStamped
26 from geometry_msgs.msg import PoseWithCovarianceStamped
27 from lifecycle_msgs.srv import GetState
28 from nav2_msgs.action import NavigateToPose
29 from nav2_msgs.srv import ManageLifecycleNodes
30 
31 import rclpy
32 
33 from rclpy.action import ActionClient
34 from rclpy.node import Node
35 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
36 from rclpy.qos import QoSProfile
37 
38 
39 class NavTester(Node):
40 
41  def __init__(
42  self,
43  initial_pose: Pose,
44  goal_pose: Pose,
45  namespace: str = ''
46  ):
47  super().__init__(node_name='nav2_tester', namespace=namespace)
48  self.initial_pose_pub = self.create_publisher(PoseWithCovarianceStamped,
49  'initialpose', 10)
50  self.goal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
51 
52  pose_qos = QoSProfile(
53  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
54  reliability=QoSReliabilityPolicy.RELIABLE,
55  history=QoSHistoryPolicy.KEEP_LAST,
56  depth=1)
57 
58  self.model_pose_sub = self.create_subscription(PoseWithCovarianceStamped,
59  'amcl_pose', self.poseCallback, pose_qos)
60  self.initial_pose_received = False
61  self.initial_pose = initial_pose
62  self.goal_pose = goal_pose
63  self.action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
64 
65  def info_msg(self, msg: str):
66  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
67 
68  def warn_msg(self, msg: str):
69  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
70 
71  def error_msg(self, msg: str):
72  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
73 
74  def setInitialPose(self):
75  msg = PoseWithCovarianceStamped()
76  msg.pose.pose = self.initial_pose
77  msg.header.frame_id = 'map'
78  self.info_msg('Publishing Initial Pose')
79  self.initial_pose_pub.publish(msg)
80  self.currentPose = self.initial_pose
81 
82  def getStampedPoseMsg(self, pose: Pose):
83  msg = PoseStamped()
84  msg.header.frame_id = 'map'
85  msg.pose = pose
86  return msg
87 
88  def publishGoalPose(self, goal_pose: Optional[Pose] = None):
89  self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose
90  self.goal_pub.publish(self.getStampedPoseMsg(self.goal_pose))
91 
92  def runNavigateAction(self, goal_pose: Optional[Pose] = None):
93  # Sends a `NavToPose` action request and waits for completion
94  self.info_msg("Waiting for 'NavigateToPose' action server")
95  while not self.action_client.wait_for_server(timeout_sec=1.0):
96  self.info_msg("'NavigateToPose' action server not available, waiting...")
97 
98  self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose
99  goal_msg = NavigateToPose.Goal()
100  goal_msg.pose = self.getStampedPoseMsg(self.goal_pose)
101 
102  self.info_msg('Sending goal request...')
103  send_goal_future = self.action_client.send_goal_async(goal_msg)
104 
105  rclpy.spin_until_future_complete(self, send_goal_future)
106  goal_handle = send_goal_future.result()
107 
108  if not goal_handle.accepted:
109  self.error_msg('Goal rejected')
110  return False
111 
112  self.info_msg('Goal accepted')
113  get_result_future = goal_handle.get_result_async()
114 
115  self.info_msg("Waiting for 'NavigateToPose' action to complete")
116  rclpy.spin_until_future_complete(self, get_result_future)
117  status = get_result_future.result().status
118  if status != GoalStatus.STATUS_ABORTED:
119  self.info_msg(f'Goal failed with status code: {status}')
120  return False
121 
122  self.info_msg('Goal failed, as expected!')
123  return True
124 
125  def poseCallback(self, msg):
126  self.info_msg('Received amcl_pose')
127  self.current_pose = msg.pose.pose
128  self.initial_pose_received = True
129 
130  def reachesGoal(self, timeout, distance):
131  goalReached = False
132  start_time = time.time()
133 
134  while not goalReached:
135  rclpy.spin_once(self, timeout_sec=1)
136  if self.distanceFromGoal() < distance:
137  goalReached = True
138  self.info_msg('*** GOAL REACHED ***')
139  return True
140  elif timeout is not None:
141  if (time.time() - start_time) > timeout:
142  self.error_msg('Robot timed out reaching its goal!')
143  return False
144 
145  def distanceFromGoal(self):
146  d_x = self.current_pose.position.x - self.goal_pose.position.x
147  d_y = self.current_pose.position.y - self.goal_pose.position.y
148  distance = math.sqrt(d_x * d_x + d_y * d_y)
149  self.info_msg(f'Distance from goal is: {distance}')
150  return distance
151 
152  def wait_for_node_active(self, node_name: str):
153  # Waits for the node within the tester namespace to become active
154  self.info_msg(f'Waiting for {node_name} to become active')
155  node_service = f'{node_name}/get_state'
156  state_client = self.create_client(GetState, node_service)
157  while not state_client.wait_for_service(timeout_sec=1.0):
158  self.info_msg(f'{node_service} service not available, waiting...')
159  req = GetState.Request() # empty request
160  state = 'UNKNOWN'
161  while (state != 'active'):
162  self.info_msg(f'Getting {node_name} state...')
163  future = state_client.call_async(req)
164  rclpy.spin_until_future_complete(self, future)
165  if future.result() is not None:
166  state = future.result().current_state.label
167  self.info_msg(f'Result of get_state: {state}')
168  else:
169  self.error_msg(f'Exception while calling service: {future.exception()!r}')
170  time.sleep(5)
171 
172  def shutdown(self):
173  self.info_msg('Shutting down')
174  self.action_client.destroy()
175 
176  transition_service = 'lifecycle_manager_navigation/manage_nodes'
177  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
178  while not mgr_client.wait_for_service(timeout_sec=1.0):
179  self.info_msg(f'{transition_service} service not available, waiting...')
180 
181  req = ManageLifecycleNodes.Request()
182  req.command = ManageLifecycleNodes.Request().SHUTDOWN
183  future = mgr_client.call_async(req)
184  try:
185  self.info_msg('Shutting down navigation lifecycle manager...')
186  rclpy.spin_until_future_complete(self, future)
187  future.result()
188  self.info_msg('Shutting down navigation lifecycle manager complete.')
189  except Exception as e: # noqa: B902
190  self.error_msg(f'Service call failed {e!r}')
191  transition_service = 'lifecycle_manager_localization/manage_nodes'
192  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
193  while not mgr_client.wait_for_service(timeout_sec=1.0):
194  self.info_msg(f'{transition_service} service not available, waiting...')
195 
196  req = ManageLifecycleNodes.Request()
197  req.command = ManageLifecycleNodes.Request().SHUTDOWN
198  future = mgr_client.call_async(req)
199  try:
200  self.info_msg('Shutting down localization lifecycle manager...')
201  rclpy.spin_until_future_complete(self, future)
202  future.result()
203  self.info_msg('Shutting down localization lifecycle manager complete')
204  except Exception as e: # noqa: B902
205  self.error_msg(f'Service call failed {e!r}')
206 
207  def wait_for_initial_pose(self):
208  self.initial_pose_received = False
209  while not self.initial_pose_received:
210  self.info_msg('Setting initial pose')
211  self.setInitialPose()
212  self.info_msg('Waiting for amcl_pose to be received')
213  rclpy.spin_once(self, timeout_sec=1)
214 
215 
216 def run_all_tests(robot_tester):
217  # set transforms to use_sim_time
218  result = True
219  if (result):
220  robot_tester.wait_for_node_active('amcl')
221  robot_tester.wait_for_initial_pose()
222  robot_tester.wait_for_node_active('bt_navigator')
223  result = robot_tester.runNavigateAction()
224 
225  # Add more tests here if desired
226 
227  if (result):
228  robot_tester.info_msg('Test PASSED')
229  else:
230  robot_tester.error_msg('Test FAILED')
231 
232  return result
233 
234 
235 def fwd_pose(x=0.0, y=0.0, z=0.01):
236  initial_pose = Pose()
237  initial_pose.position.x = x
238  initial_pose.position.y = y
239  initial_pose.position.z = z
240  initial_pose.orientation.x = 0.0
241  initial_pose.orientation.y = 0.0
242  initial_pose.orientation.z = 0.0
243  initial_pose.orientation.w = 1.0
244  return initial_pose
245 
246 
247 def get_testers(args):
248  testers = []
249 
250  if args.robot:
251  # Requested tester for one robot
252  init_x, init_y, final_x, final_y = args.robot[0]
253  tester = NavTester(
254  initial_pose=fwd_pose(float(init_x), float(init_y)),
255  goal_pose=fwd_pose(float(final_x), float(final_y)))
256  tester.info_msg(
257  'Starting tester, robot going from ' + init_x + ', ' + init_y +
258  ' to ' + final_x + ', ' + final_y + '.')
259  testers.append(tester)
260  return testers
261 
262  return testers
263 
264 
265 def main(argv=sys.argv[1:]):
266  # The robot(s) positions from the input arguments
267  parser = argparse.ArgumentParser(description='System-level navigation tester node')
268  group = parser.add_mutually_exclusive_group(required=True)
269  group.add_argument('-r', '--robot', action='append', nargs=4,
270  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
271  help='The robot starting and final positions.')
272  group.add_argument('-rs', '--robots', action='append', nargs=5,
273  metavar=('name', 'init_x', 'init_y', 'final_x', 'final_y'),
274  help="The robot's namespace and starting and final positions. " +
275  'Repeating the argument for multiple robots is supported.')
276 
277  args, unknown = parser.parse_known_args()
278 
279  rclpy.init()
280 
281  # Create testers for each robot
282  testers = get_testers(args)
283 
284  # wait a few seconds to make sure entire stacks are up
285  time.sleep(10)
286 
287  for tester in testers:
288  passed = run_all_tests(tester)
289  if not passed:
290  break
291 
292  for tester in testers:
293  # stop and shutdown the nav stack to exit cleanly
294  tester.shutdown()
295 
296  testers[0].info_msg('Done Shutting Down.')
297 
298  if not passed:
299  testers[0].info_msg('Exiting failed')
300  exit(1)
301  else:
302  testers[0].info_msg('Exiting passed')
303  exit(0)
304 
305 
306 if __name__ == '__main__':
307  main()