Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Samsung Research America
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import math
19 import sys
20 import time
21 from typing import Optional
22 
23 from action_msgs.msg import GoalStatus
24 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
25 from lifecycle_msgs.srv import GetState
26 from nav2_msgs.action import NavigateToPose
27 from nav2_msgs.srv import ManageLifecycleNodes
28 import rclpy
29 from rclpy.action import ActionClient
30 from rclpy.client import Client
31 from rclpy.node import Node
32 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
33 
34 
35 class NavTester(Node):
36 
37  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
38  super().__init__(node_name='nav2_tester', namespace=namespace)
39  self.initial_pose_pub = self.create_publisher(
40  PoseWithCovarianceStamped, 'initialpose', 10
41  )
42  self.goal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
43 
44  pose_qos = QoSProfile(
45  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
46  reliability=QoSReliabilityPolicy.RELIABLE,
47  history=QoSHistoryPolicy.KEEP_LAST,
48  depth=1,
49  )
50 
51  self.model_pose_sub = self.create_subscription(
52  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallback, pose_qos
53  )
54  self.initial_pose_received = False
55  self.initial_pose = initial_pose
56  self.goal_pose = goal_pose
57  self.action_client: ActionClient[
58  NavigateToPose.Goal,
59  NavigateToPose.Result,
60  NavigateToPose.Feedback
61  ] = ActionClient(self, NavigateToPose, 'navigate_to_pose')
62 
63  def info_msg(self, msg: str) -> None:
64  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
65 
66  def warn_msg(self, msg: str) -> None:
67  self.get_logger().warning('\033[1;37;43m' + msg + '\033[0m')
68 
69  def error_msg(self, msg: str) -> None:
70  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
71 
72  def setInitialPose(self) -> None:
73  msg = PoseWithCovarianceStamped()
74  msg.pose.pose = self.initial_pose
75  msg.header.frame_id = 'map'
76  self.info_msg('Publishing Initial Pose')
77  self.initial_pose_pub.publish(msg)
78  self.currentPose = self.initial_pose
79 
80  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
81  msg = PoseStamped()
82  msg.header.frame_id = 'map'
83  msg.pose = pose
84  return msg
85 
86  def publishGoalPose(self, goal_pose: Optional[Pose] = None) -> None:
87  self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose
88  self.goal_pub.publish(self.getStampedPoseMsg(self.goal_pose))
89 
90  def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool:
91  # Sends a `NavToPose` action request and waits for completion
92  self.info_msg("Waiting for 'NavigateToPose' action server")
93  while not self.action_client.wait_for_server(timeout_sec=1.0):
94  self.info_msg("'NavigateToPose' action server not available, waiting...")
95 
96  self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose
97  goal_msg = NavigateToPose.Goal()
98  goal_msg.pose = self.getStampedPoseMsg(self.goal_pose)
99 
100  self.info_msg('Sending goal request...')
101  send_goal_future = self.action_client.send_goal_async(goal_msg)
102 
103  rclpy.spin_until_future_complete(self, send_goal_future)
104  goal_handle = send_goal_future.result()
105 
106  if not goal_handle or not goal_handle.accepted:
107  self.error_msg('Goal rejected')
108  return False
109 
110  self.info_msg('Goal accepted')
111  get_result_future = goal_handle.get_result_async()
112 
113  self.info_msg("Waiting for 'NavigateToPose' action to complete")
114  rclpy.spin_until_future_complete(self, get_result_future)
115  status = get_result_future.result().status # type: ignore[union-attr]
116  if status != GoalStatus.STATUS_ABORTED:
117  self.info_msg(f'Goal failed with status code: {status}')
118  return False
119 
120  self.info_msg('Goal failed, as expected!')
121  return True
122 
123  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
124  self.info_msg('Received amcl_pose')
125  self.current_pose = msg.pose.pose
126  self.initial_pose_received = True
127 
128  def reachesGoal(self, timeout: float, distance: float) -> bool:
129  goalReached = False
130  start_time = time.time()
131 
132  while not goalReached:
133  rclpy.spin_once(self, timeout_sec=1)
134  if self.distanceFromGoal() < distance:
135  goalReached = True
136  self.info_msg('*** GOAL REACHED ***')
137  return True
138  elif timeout is not None:
139  if (time.time() - start_time) > timeout:
140  self.error_msg('Robot timed out reaching its goal!')
141  return False
142 
143  self.info_msg('Robot reached its goal!')
144  return True
145 
146  def distanceFromGoal(self) -> float:
147  d_x = self.current_pose.position.x - self.goal_pose.position.x
148  d_y = self.current_pose.position.y - self.goal_pose.position.y
149  distance = math.sqrt(d_x * d_x + d_y * d_y)
150  self.info_msg(f'Distance from goal is: {distance}')
151  return distance
152 
153  def wait_for_node_active(self, node_name: str) -> None:
154  # Waits for the node within the tester namespace to become active
155  self.info_msg(f'Waiting for {node_name} to become active')
156  node_service = f'{node_name}/get_state'
157  state_client: Client[GetState.Request, GetState.Response] = \
158  self.create_client(GetState, node_service)
159  while not state_client.wait_for_service(timeout_sec=1.0):
160  self.info_msg(f'{node_service} service not available, waiting...')
161  req = GetState.Request() # empty request
162  state = 'UNKNOWN'
163  while state != 'active':
164  self.info_msg(f'Getting {node_name} state...')
165  future = state_client.call_async(req)
166  rclpy.spin_until_future_complete(self, future)
167  if future.result() is not None:
168  state = future.result().current_state.label # type: ignore[union-attr]
169  self.info_msg(f'Result of get_state: {state}')
170  else:
171  self.error_msg(
172  f'Exception while calling service: {future.exception()!r}'
173  )
174  time.sleep(5)
175 
176  def shutdown(self) -> None:
177  self.info_msg('Shutting down')
178  self.action_client.destroy()
179 
180  transition_service = 'lifecycle_manager_navigation/manage_nodes'
181  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
182  self.create_client(ManageLifecycleNodes, transition_service)
183  while not mgr_client.wait_for_service(timeout_sec=1.0):
184  self.info_msg(f'{transition_service} service not available, waiting...')
185 
186  req = ManageLifecycleNodes.Request()
187  req.command = ManageLifecycleNodes.Request().SHUTDOWN
188  future = mgr_client.call_async(req)
189  try:
190  self.info_msg('Shutting down navigation lifecycle manager...')
191  rclpy.spin_until_future_complete(self, future)
192  future.result()
193  self.info_msg('Shutting down navigation lifecycle manager complete.')
194  except Exception as e: # noqa: B902
195  self.error_msg(f'Service call failed {e!r}')
196  transition_service = 'lifecycle_manager_localization/manage_nodes'
197  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
198  while not mgr_client.wait_for_service(timeout_sec=1.0):
199  self.info_msg(f'{transition_service} service not available, waiting...')
200 
201  req = ManageLifecycleNodes.Request()
202  req.command = ManageLifecycleNodes.Request().SHUTDOWN
203  future = mgr_client.call_async(req)
204  try:
205  self.info_msg('Shutting down localization lifecycle manager...')
206  rclpy.spin_until_future_complete(self, future)
207  future.result()
208  self.info_msg('Shutting down localization lifecycle manager complete')
209  except Exception as e: # noqa: B902
210  self.error_msg(f'Service call failed {e!r}')
211 
212  def wait_for_initial_pose(self) -> None:
213  self.initial_pose_received = False
214  while not self.initial_pose_received:
215  self.info_msg('Setting initial pose')
216  self.setInitialPose()
217  self.info_msg('Waiting for amcl_pose to be received')
218  rclpy.spin_once(self, timeout_sec=1)
219 
220 
221 def run_all_tests(robot_tester: NavTester) -> bool:
222  # set transforms to use_sim_time
223  result = True
224  if result:
225  robot_tester.wait_for_node_active('amcl')
226  robot_tester.wait_for_initial_pose()
227  robot_tester.wait_for_node_active('bt_navigator')
228  result = robot_tester.runNavigateAction()
229 
230  # Add more tests here if desired
231 
232  if result:
233  robot_tester.info_msg('Test PASSED')
234  else:
235  robot_tester.error_msg('Test FAILED')
236 
237  return result
238 
239 
240 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
241  initial_pose = Pose()
242  initial_pose.position.x = x
243  initial_pose.position.y = y
244  initial_pose.position.z = z
245  initial_pose.orientation.x = 0.0
246  initial_pose.orientation.y = 0.0
247  initial_pose.orientation.z = 0.0
248  initial_pose.orientation.w = 1.0
249  return initial_pose
250 
251 
252 def get_testers(args: argparse.Namespace) -> list[NavTester]:
253  testers = []
254 
255  if args.robot:
256  # Requested tester for one robot
257  init_x, init_y, final_x, final_y = args.robot[0]
258  tester = NavTester(
259  initial_pose=fwd_pose(float(init_x), float(init_y)),
260  goal_pose=fwd_pose(float(final_x), float(final_y)),
261  )
262  tester.info_msg(
263  'Starting tester, robot going from '
264  + init_x
265  + ', '
266  + init_y
267  + ' to '
268  + final_x
269  + ', '
270  + final_y
271  + '.'
272  )
273  testers.append(tester)
274  return testers
275 
276  return testers
277 
278 
279 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
280  # The robot(s) positions from the input arguments
281  parser = argparse.ArgumentParser(description='System-level navigation tester node')
282  group = parser.add_mutually_exclusive_group(required=True)
283  group.add_argument(
284  '-r',
285  '--robot',
286  action='append',
287  nargs=4,
288  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
289  help='The robot starting and final positions.',
290  )
291  group.add_argument(
292  '-rs',
293  '--robots',
294  action='append',
295  nargs=5,
296  metavar=('name', 'init_x', 'init_y', 'final_x', 'final_y'),
297  help="The robot's namespace and starting and final positions. "
298  + 'Repeating the argument for multiple robots is supported.',
299  )
300 
301  args, unknown = parser.parse_known_args()
302 
303  rclpy.init()
304 
305  # Create testers for each robot
306  testers = get_testers(args)
307 
308  # wait a few seconds to make sure entire stacks are up
309  time.sleep(10)
310 
311  for tester in testers:
312  passed = run_all_tests(tester)
313  if not passed:
314  break
315 
316  for tester in testers:
317  # stop and shutdown the nav stack to exit cleanly
318  tester.shutdown()
319 
320  testers[0].info_msg('Done Shutting Down.')
321 
322  if not passed:
323  testers[0].info_msg('Exiting failed')
324  exit(1)
325  else:
326  testers[0].info_msg('Exiting passed')
327  exit(0)
328 
329 
330 if __name__ == '__main__':
331  main()