Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Samsung Research America
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import math
19 import sys
20 import time
21 from typing import Optional
22 
23 from action_msgs.msg import GoalStatus
24 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
25 from lifecycle_msgs.srv import GetState
26 from nav2_msgs.action import NavigateToPose
27 from nav2_msgs.srv import ManageLifecycleNodes
28 import rclpy
29 from rclpy.action import ActionClient # type: ignore[attr-defined]
30 from rclpy.node import Node
31 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
32 
33 
34 class NavTester(Node):
35 
36  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
37  super().__init__(node_name='nav2_tester', namespace=namespace)
38  self.initial_pose_pub = self.create_publisher(
39  PoseWithCovarianceStamped, 'initialpose', 10
40  )
41  self.goal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
42 
43  pose_qos = QoSProfile(
44  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
45  reliability=QoSReliabilityPolicy.RELIABLE,
46  history=QoSHistoryPolicy.KEEP_LAST,
47  depth=1,
48  )
49 
50  self.model_pose_sub = self.create_subscription(
51  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallback, pose_qos
52  )
53  self.initial_pose_received = False
54  self.initial_pose = initial_pose
55  self.goal_pose = goal_pose
56  self.action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
57 
58  def info_msg(self, msg: str) -> None:
59  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
60 
61  def warn_msg(self, msg: str) -> None:
62  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
63 
64  def error_msg(self, msg: str) -> None:
65  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
66 
67  def setInitialPose(self) -> None:
68  msg = PoseWithCovarianceStamped()
69  msg.pose.pose = self.initial_pose
70  msg.header.frame_id = 'map'
71  self.info_msg('Publishing Initial Pose')
72  self.initial_pose_pub.publish(msg)
73  self.currentPose = self.initial_pose
74 
75  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
76  msg = PoseStamped()
77  msg.header.frame_id = 'map'
78  msg.pose = pose
79  return msg
80 
81  def publishGoalPose(self, goal_pose: Optional[Pose] = None) -> None:
82  self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose
83  self.goal_pub.publish(self.getStampedPoseMsg(self.goal_pose))
84 
85  def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool:
86  # Sends a `NavToPose` action request and waits for completion
87  self.info_msg("Waiting for 'NavigateToPose' action server")
88  while not self.action_client.wait_for_server(timeout_sec=1.0):
89  self.info_msg("'NavigateToPose' action server not available, waiting...")
90 
91  self.goal_pose = goal_pose if goal_pose is not None else self.goal_pose
92  goal_msg = NavigateToPose.Goal()
93  goal_msg.pose = self.getStampedPoseMsg(self.goal_pose)
94 
95  self.info_msg('Sending goal request...')
96  send_goal_future = self.action_client.send_goal_async(goal_msg)
97 
98  rclpy.spin_until_future_complete(self, send_goal_future)
99  goal_handle = send_goal_future.result()
100 
101  if not goal_handle or not goal_handle.accepted:
102  self.error_msg('Goal rejected')
103  return False
104 
105  self.info_msg('Goal accepted')
106  get_result_future = goal_handle.get_result_async()
107 
108  self.info_msg("Waiting for 'NavigateToPose' action to complete")
109  rclpy.spin_until_future_complete(self, get_result_future)
110  status = get_result_future.result().status # type: ignore[union-attr]
111  if status != GoalStatus.STATUS_ABORTED:
112  self.info_msg(f'Goal failed with status code: {status}')
113  return False
114 
115  self.info_msg('Goal failed, as expected!')
116  return True
117 
118  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
119  self.info_msg('Received amcl_pose')
120  self.current_pose = msg.pose.pose
121  self.initial_pose_received = True
122 
123  def reachesGoal(self, timeout: float, distance: float) -> bool:
124  goalReached = False
125  start_time = time.time()
126 
127  while not goalReached:
128  rclpy.spin_once(self, timeout_sec=1)
129  if self.distanceFromGoal() < distance:
130  goalReached = True
131  self.info_msg('*** GOAL REACHED ***')
132  return True
133  elif timeout is not None:
134  if (time.time() - start_time) > timeout:
135  self.error_msg('Robot timed out reaching its goal!')
136  return False
137 
138  self.info_msg('Robot reached its goal!')
139  return True
140 
141  def distanceFromGoal(self) -> float:
142  d_x = self.current_pose.position.x - self.goal_pose.position.x
143  d_y = self.current_pose.position.y - self.goal_pose.position.y
144  distance = math.sqrt(d_x * d_x + d_y * d_y)
145  self.info_msg(f'Distance from goal is: {distance}')
146  return distance
147 
148  def wait_for_node_active(self, node_name: str) -> None:
149  # Waits for the node within the tester namespace to become active
150  self.info_msg(f'Waiting for {node_name} to become active')
151  node_service = f'{node_name}/get_state'
152  state_client = self.create_client(GetState, node_service)
153  while not state_client.wait_for_service(timeout_sec=1.0):
154  self.info_msg(f'{node_service} service not available, waiting...')
155  req = GetState.Request() # empty request
156  state = 'UNKNOWN'
157  while state != 'active':
158  self.info_msg(f'Getting {node_name} state...')
159  future = state_client.call_async(req)
160  rclpy.spin_until_future_complete(self, future)
161  if future.result() is not None:
162  state = future.result().current_state.label # type: ignore[union-attr]
163  self.info_msg(f'Result of get_state: {state}')
164  else:
165  self.error_msg(
166  f'Exception while calling service: {future.exception()!r}'
167  )
168  time.sleep(5)
169 
170  def shutdown(self) -> None:
171  self.info_msg('Shutting down')
172  self.action_client.destroy()
173 
174  transition_service = 'lifecycle_manager_navigation/manage_nodes'
175  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
176  while not mgr_client.wait_for_service(timeout_sec=1.0):
177  self.info_msg(f'{transition_service} service not available, waiting...')
178 
179  req = ManageLifecycleNodes.Request()
180  req.command = ManageLifecycleNodes.Request().SHUTDOWN
181  future = mgr_client.call_async(req)
182  try:
183  self.info_msg('Shutting down navigation lifecycle manager...')
184  rclpy.spin_until_future_complete(self, future)
185  future.result()
186  self.info_msg('Shutting down navigation lifecycle manager complete.')
187  except Exception as e: # noqa: B902
188  self.error_msg(f'Service call failed {e!r}')
189  transition_service = 'lifecycle_manager_localization/manage_nodes'
190  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
191  while not mgr_client.wait_for_service(timeout_sec=1.0):
192  self.info_msg(f'{transition_service} service not available, waiting...')
193 
194  req = ManageLifecycleNodes.Request()
195  req.command = ManageLifecycleNodes.Request().SHUTDOWN
196  future = mgr_client.call_async(req)
197  try:
198  self.info_msg('Shutting down localization lifecycle manager...')
199  rclpy.spin_until_future_complete(self, future)
200  future.result()
201  self.info_msg('Shutting down localization lifecycle manager complete')
202  except Exception as e: # noqa: B902
203  self.error_msg(f'Service call failed {e!r}')
204 
205  def wait_for_initial_pose(self) -> None:
206  self.initial_pose_received = False
207  while not self.initial_pose_received:
208  self.info_msg('Setting initial pose')
209  self.setInitialPose()
210  self.info_msg('Waiting for amcl_pose to be received')
211  rclpy.spin_once(self, timeout_sec=1)
212 
213 
214 def run_all_tests(robot_tester: NavTester) -> bool:
215  # set transforms to use_sim_time
216  result = True
217  if result:
218  robot_tester.wait_for_node_active('amcl')
219  robot_tester.wait_for_initial_pose()
220  robot_tester.wait_for_node_active('bt_navigator')
221  result = robot_tester.runNavigateAction()
222 
223  # Add more tests here if desired
224 
225  if result:
226  robot_tester.info_msg('Test PASSED')
227  else:
228  robot_tester.error_msg('Test FAILED')
229 
230  return result
231 
232 
233 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
234  initial_pose = Pose()
235  initial_pose.position.x = x
236  initial_pose.position.y = y
237  initial_pose.position.z = z
238  initial_pose.orientation.x = 0.0
239  initial_pose.orientation.y = 0.0
240  initial_pose.orientation.z = 0.0
241  initial_pose.orientation.w = 1.0
242  return initial_pose
243 
244 
245 def get_testers(args: argparse.Namespace) -> list[NavTester]:
246  testers = []
247 
248  if args.robot:
249  # Requested tester for one robot
250  init_x, init_y, final_x, final_y = args.robot[0]
251  tester = NavTester(
252  initial_pose=fwd_pose(float(init_x), float(init_y)),
253  goal_pose=fwd_pose(float(final_x), float(final_y)),
254  )
255  tester.info_msg(
256  'Starting tester, robot going from '
257  + init_x
258  + ', '
259  + init_y
260  + ' to '
261  + final_x
262  + ', '
263  + final_y
264  + '.'
265  )
266  testers.append(tester)
267  return testers
268 
269  return testers
270 
271 
272 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
273  # The robot(s) positions from the input arguments
274  parser = argparse.ArgumentParser(description='System-level navigation tester node')
275  group = parser.add_mutually_exclusive_group(required=True)
276  group.add_argument(
277  '-r',
278  '--robot',
279  action='append',
280  nargs=4,
281  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
282  help='The robot starting and final positions.',
283  )
284  group.add_argument(
285  '-rs',
286  '--robots',
287  action='append',
288  nargs=5,
289  metavar=('name', 'init_x', 'init_y', 'final_x', 'final_y'),
290  help="The robot's namespace and starting and final positions. "
291  + 'Repeating the argument for multiple robots is supported.',
292  )
293 
294  args, unknown = parser.parse_known_args()
295 
296  rclpy.init()
297 
298  # Create testers for each robot
299  testers = get_testers(args)
300 
301  # wait a few seconds to make sure entire stacks are up
302  time.sleep(10)
303 
304  for tester in testers:
305  passed = run_all_tests(tester)
306  if not passed:
307  break
308 
309  for tester in testers:
310  # stop and shutdown the nav stack to exit cleanly
311  tester.shutdown()
312 
313  testers[0].info_msg('Done Shutting Down.')
314 
315  if not passed:
316  testers[0].info_msg('Exiting failed')
317  exit(1)
318  else:
319  testers[0].info_msg('Exiting passed')
320  exit(0)
321 
322 
323 if __name__ == '__main__':
324  main()