Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Samsung Research America
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import math
19 import sys
20 import time
21 from typing import Optional
22 
23 from action_msgs.msg import GoalStatus
24 from geometry_msgs.msg import Pose
25 from geometry_msgs.msg import PoseStamped
26 from geometry_msgs.msg import PoseWithCovarianceStamped
27 from lifecycle_msgs.srv import GetState
28 from nav2_msgs.action import NavigateToPose
29 from nav2_msgs.srv import ManageLifecycleNodes
30 
31 import rclpy
32 
33 from rclpy.action import ActionClient
34 from rclpy.node import Node
35 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
36 from rclpy.qos import QoSProfile
37 
38 
39 class NavTester(Node):
40 
41  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
42  super().__init__(node_name='nav2_tester', namespace=namespace)
43  self.initial_pose_pub = self.create_publisher(
44  PoseWithCovarianceStamped, 'initialpose', 10
45  )
46  self.goal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
47 
48  pose_qos = QoSProfile(
49  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
50  reliability=QoSReliabilityPolicy.RELIABLE,
51  history=QoSHistoryPolicy.KEEP_LAST,
52  depth=1,
53  )
54 
55  self.model_pose_sub = self.create_subscription(
56  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallback, pose_qos
57  )
58  self.initial_pose_received = False
59  self.initial_pose = initial_pose
60  self.goal_pose = goal_pose
61  self.action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
62 
63  def info_msg(self, msg: str):
64  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
65 
66  def warn_msg(self, msg: str):
67  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
68 
69  def error_msg(self, msg: str):
70  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
71 
72  def setInitialPose(self):
73  msg = PoseWithCovarianceStamped()
74  msg.pose.pose = self.initial_pose
75  msg.header.frame_id = 'map'
76  self.info_msg('Publishing Initial Pose')
77  self.initial_pose_pub.publish(msg)
78  self.currentPose = self.initial_pose
79 
80  def getStampedPoseMsg(self, pose: Pose):
81  msg = PoseStamped()
82  msg.header.frame_id = 'map'
83  msg.pose = pose
84  return msg
85 
86  def publishGoalPose(self, goal_pose: Optional[Pose] = None):
87  self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose
88  self.goal_pub.publish(self.getStampedPoseMsg(self.goal_pose))
89 
90  def runNavigateAction(self, goal_pose: Optional[Pose] = None):
91  # Sends a `NavToPose` action request and waits for completion
92  self.info_msg("Waiting for 'NavigateToPose' action server")
93  while not self.action_client.wait_for_server(timeout_sec=1.0):
94  self.info_msg("'NavigateToPose' action server not available, waiting...")
95 
96  self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose
97  goal_msg = NavigateToPose.Goal()
98  goal_msg.pose = self.getStampedPoseMsg(self.goal_pose)
99 
100  self.info_msg('Sending goal request...')
101  send_goal_future = self.action_client.send_goal_async(goal_msg)
102 
103  rclpy.spin_until_future_complete(self, send_goal_future)
104  goal_handle = send_goal_future.result()
105 
106  if not goal_handle.accepted:
107  self.error_msg('Goal rejected')
108  return False
109 
110  self.info_msg('Goal accepted')
111  get_result_future = goal_handle.get_result_async()
112 
113  self.info_msg("Waiting for 'NavigateToPose' action to complete")
114  rclpy.spin_until_future_complete(self, get_result_future)
115  status = get_result_future.result().status
116  if status != GoalStatus.STATUS_ABORTED:
117  self.info_msg(f'Goal failed with status code: {status}')
118  return False
119 
120  self.info_msg('Goal failed, as expected!')
121  return True
122 
123  def poseCallback(self, msg):
124  self.info_msg('Received amcl_pose')
125  self.current_pose = msg.pose.pose
126  self.initial_pose_received = True
127 
128  def reachesGoal(self, timeout, distance):
129  goalReached = False
130  start_time = time.time()
131 
132  while not goalReached:
133  rclpy.spin_once(self, timeout_sec=1)
134  if self.distanceFromGoal() < distance:
135  goalReached = True
136  self.info_msg('*** GOAL REACHED ***')
137  return True
138  elif timeout is not None:
139  if (time.time() - start_time) > timeout:
140  self.error_msg('Robot timed out reaching its goal!')
141  return False
142 
143  def distanceFromGoal(self):
144  d_x = self.current_pose.position.x - self.goal_pose.position.x
145  d_y = self.current_pose.position.y - self.goal_pose.position.y
146  distance = math.sqrt(d_x * d_x + d_y * d_y)
147  self.info_msg(f'Distance from goal is: {distance}')
148  return distance
149 
150  def wait_for_node_active(self, node_name: str):
151  # Waits for the node within the tester namespace to become active
152  self.info_msg(f'Waiting for {node_name} to become active')
153  node_service = f'{node_name}/get_state'
154  state_client = self.create_client(GetState, node_service)
155  while not state_client.wait_for_service(timeout_sec=1.0):
156  self.info_msg(f'{node_service} service not available, waiting...')
157  req = GetState.Request() # empty request
158  state = 'UNKNOWN'
159  while state != 'active':
160  self.info_msg(f'Getting {node_name} state...')
161  future = state_client.call_async(req)
162  rclpy.spin_until_future_complete(self, future)
163  if future.result() is not None:
164  state = future.result().current_state.label
165  self.info_msg(f'Result of get_state: {state}')
166  else:
167  self.error_msg(
168  f'Exception while calling service: {future.exception()!r}'
169  )
170  time.sleep(5)
171 
172  def shutdown(self):
173  self.info_msg('Shutting down')
174  self.action_client.destroy()
175 
176  transition_service = 'lifecycle_manager_navigation/manage_nodes'
177  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
178  while not mgr_client.wait_for_service(timeout_sec=1.0):
179  self.info_msg(f'{transition_service} service not available, waiting...')
180 
181  req = ManageLifecycleNodes.Request()
182  req.command = ManageLifecycleNodes.Request().SHUTDOWN
183  future = mgr_client.call_async(req)
184  try:
185  self.info_msg('Shutting down navigation lifecycle manager...')
186  rclpy.spin_until_future_complete(self, future)
187  future.result()
188  self.info_msg('Shutting down navigation lifecycle manager complete.')
189  except Exception as e: # noqa: B902
190  self.error_msg(f'Service call failed {e!r}')
191  transition_service = 'lifecycle_manager_localization/manage_nodes'
192  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
193  while not mgr_client.wait_for_service(timeout_sec=1.0):
194  self.info_msg(f'{transition_service} service not available, waiting...')
195 
196  req = ManageLifecycleNodes.Request()
197  req.command = ManageLifecycleNodes.Request().SHUTDOWN
198  future = mgr_client.call_async(req)
199  try:
200  self.info_msg('Shutting down localization lifecycle manager...')
201  rclpy.spin_until_future_complete(self, future)
202  future.result()
203  self.info_msg('Shutting down localization lifecycle manager complete')
204  except Exception as e: # noqa: B902
205  self.error_msg(f'Service call failed {e!r}')
206 
207  def wait_for_initial_pose(self):
208  self.initial_pose_received = False
209  while not self.initial_pose_received:
210  self.info_msg('Setting initial pose')
211  self.setInitialPose()
212  self.info_msg('Waiting for amcl_pose to be received')
213  rclpy.spin_once(self, timeout_sec=1)
214 
215 
216 def run_all_tests(robot_tester):
217  # set transforms to use_sim_time
218  result = True
219  if result:
220  robot_tester.wait_for_node_active('amcl')
221  robot_tester.wait_for_initial_pose()
222  robot_tester.wait_for_node_active('bt_navigator')
223  result = robot_tester.runNavigateAction()
224 
225  # Add more tests here if desired
226 
227  if result:
228  robot_tester.info_msg('Test PASSED')
229  else:
230  robot_tester.error_msg('Test FAILED')
231 
232  return result
233 
234 
235 def fwd_pose(x=0.0, y=0.0, z=0.01):
236  initial_pose = Pose()
237  initial_pose.position.x = x
238  initial_pose.position.y = y
239  initial_pose.position.z = z
240  initial_pose.orientation.x = 0.0
241  initial_pose.orientation.y = 0.0
242  initial_pose.orientation.z = 0.0
243  initial_pose.orientation.w = 1.0
244  return initial_pose
245 
246 
247 def get_testers(args):
248  testers = []
249 
250  if args.robot:
251  # Requested tester for one robot
252  init_x, init_y, final_x, final_y = args.robot[0]
253  tester = NavTester(
254  initial_pose=fwd_pose(float(init_x), float(init_y)),
255  goal_pose=fwd_pose(float(final_x), float(final_y)),
256  )
257  tester.info_msg(
258  'Starting tester, robot going from '
259  + init_x
260  + ', '
261  + init_y
262  + ' to '
263  + final_x
264  + ', '
265  + final_y
266  + '.'
267  )
268  testers.append(tester)
269  return testers
270 
271  return testers
272 
273 
274 def main(argv=sys.argv[1:]):
275  # The robot(s) positions from the input arguments
276  parser = argparse.ArgumentParser(description='System-level navigation tester node')
277  group = parser.add_mutually_exclusive_group(required=True)
278  group.add_argument(
279  '-r',
280  '--robot',
281  action='append',
282  nargs=4,
283  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
284  help='The robot starting and final positions.',
285  )
286  group.add_argument(
287  '-rs',
288  '--robots',
289  action='append',
290  nargs=5,
291  metavar=('name', 'init_x', 'init_y', 'final_x', 'final_y'),
292  help="The robot's namespace and starting and final positions. "
293  + 'Repeating the argument for multiple robots is supported.',
294  )
295 
296  args, unknown = parser.parse_known_args()
297 
298  rclpy.init()
299 
300  # Create testers for each robot
301  testers = get_testers(args)
302 
303  # wait a few seconds to make sure entire stacks are up
304  time.sleep(10)
305 
306  for tester in testers:
307  passed = run_all_tests(tester)
308  if not passed:
309  break
310 
311  for tester in testers:
312  # stop and shutdown the nav stack to exit cleanly
313  tester.shutdown()
314 
315  testers[0].info_msg('Done Shutting Down.')
316 
317  if not passed:
318  testers[0].info_msg('Exiting failed')
319  exit(1)
320  else:
321  testers[0].info_msg('Exiting passed')
322  exit(0)
323 
324 
325 if __name__ == '__main__':
326  main()