Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
tester_node.py
1 #!/usr/bin/env python3
2 
3 # Copyright (c) 2018 Intel Corporation.
4 # Copyright (c) 2020 Samsung Research Russia
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 
18 import argparse
19 from enum import Enum
20 import math
21 import sys
22 import time
23 from typing import Optional
24 
25 from action_msgs.msg import GoalStatus
26 from geometry_msgs.msg import Pose
27 from geometry_msgs.msg import PoseStamped
28 from geometry_msgs.msg import PoseWithCovarianceStamped
29 from lifecycle_msgs.srv import GetState
30 from nav2_msgs.action import NavigateToPose
31 from nav2_msgs.msg import SpeedLimit
32 from nav2_msgs.srv import ManageLifecycleNodes
33 from nav_msgs.msg import OccupancyGrid
34 from nav_msgs.msg import Path
35 
36 import rclpy
37 from rclpy.action import ActionClient
38 from rclpy.node import Node
39 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
40 from rclpy.qos import QoSProfile
41 from sensor_msgs.msg import PointCloud2
42 
43 
44 class TestType(Enum):
45  KEEPOUT = 0
46  SPEED = 1
47 
48 
49 class FilterMask:
50 
51  def __init__(self, filter_mask: OccupancyGrid):
52  self.filter_maskfilter_mask = filter_mask
53 
54  # Converts world coordinates into filter mask map coordinate.
55  # Returns filter mask map coordinates or (-1, -1) in case
56  # if world coordinates are out of mask bounds.
57  def worldToMap(self, wx: float, wy: float):
58  origin_x = self.filter_maskfilter_mask.info.origin.position.x
59  origin_y = self.filter_maskfilter_mask.info.origin.position.y
60  size_x = self.filter_maskfilter_mask.info.width
61  size_y = self.filter_maskfilter_mask.info.height
62  resolution = self.filter_maskfilter_mask.info.resolution
63 
64  if wx < origin_x or wy < origin_y:
65  return -1, -1
66 
67  mx = int((wx - origin_x) / resolution)
68  my = int((wy - origin_y) / resolution)
69 
70  if mx < size_x and my < size_y:
71  return mx, my
72 
73  return -1, -1
74 
75  # Gets filter_mask[mx, my] value
76  def getValue(self, mx, my):
77  size_x = self.filter_maskfilter_mask.info.width
78  return self.filter_maskfilter_mask.data[mx + my * size_x]
79 
80 
81 class NavTester(Node):
82  def __init__(
83  self,
84  test_type: TestType,
85  initial_pose: Pose,
86  goal_pose: Pose,
87  namespace: str = '',
88  ):
89  super().__init__(node_name='nav2_tester', namespace=namespace)
90  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
91  PoseWithCovarianceStamped, 'initialpose', 10
92  )
93  self.goal_pubgoal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
94 
95  transient_local_qos = QoSProfile(
96  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
97  reliability=QoSReliabilityPolicy.RELIABLE,
98  history=QoSHistoryPolicy.KEEP_LAST,
99  depth=1,
100  )
101 
102  volatile_qos = QoSProfile(
103  durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE,
104  reliability=QoSReliabilityPolicy.RELIABLE,
105  history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
106  depth=1,
107  )
108 
109  self.model_pose_submodel_pose_sub = self.create_subscription(
110  PoseWithCovarianceStamped,
111  'amcl_pose',
112  self.poseCallbackposeCallbackposeCallback,
113  transient_local_qos,
114  )
115  self.clearing_ep_subclearing_ep_sub = self.create_subscription(
116  PointCloud2,
117  'local_costmap/clearing_endpoints',
118  self.clearingEndpointsCallbackclearingEndpointsCallback,
119  transient_local_qos,
120  )
121  self.test_typetest_type = test_type
122  self.filter_test_resultfilter_test_result = True
123  self.clearing_endpoints_receivedclearing_endpoints_received = False
124  self.voxel_marked_receivedvoxel_marked_received = False
125  self.voxel_unknown_receivedvoxel_unknown_received = False
126  self.cost_cloud_receivedcost_cloud_received = False
127 
128  if self.test_typetest_type == TestType.KEEPOUT:
129  self.plan_subplan_sub = self.create_subscription(
130  Path, 'plan', self.planCallbackplanCallback, volatile_qos
131  )
132  self.voxel_marked_subvoxel_marked_sub = self.create_subscription(
133  PointCloud2, 'voxel_marked_cloud', self.voxelMarkedCallbackvoxelMarkedCallback, 1
134  )
135  self.voxel_unknown_subvoxel_unknown_sub = self.create_subscription(
136  PointCloud2, 'voxel_unknown_cloud', self.voxelUnknownCallbackvoxelUnknownCallback, 1
137  )
138  self.cost_cloud_subcost_cloud_sub = self.create_subscription(
139  PointCloud2, 'cost_cloud', self.dwbCostCloudCallbackdwbCostCloudCallback, 1
140  )
141  elif self.test_typetest_type == TestType.SPEED:
142  self.speed_itspeed_it = 0
143  # Expected chain of speed limits
144  self.limitslimits = [50.0, 0.0]
145  # Permissive array: all received speed limits must match to 'limits' from above
146  self.limit_passedlimit_passed = [False, False]
147  self.plan_subplan_sub = self.create_subscription(
148  SpeedLimit, 'speed_limit', self.speedLimitCallbackspeedLimitCallback, volatile_qos
149  )
150 
151  self.mask_receivedmask_received = False
152  self.mask_submask_sub = self.create_subscription(
153  OccupancyGrid, 'filter_mask', self.maskCallbackmaskCallback, transient_local_qos
154  )
155 
156  self.initial_pose_receivedinitial_pose_received = False
157  self.initial_poseinitial_pose = initial_pose
158  self.goal_posegoal_pose = goal_pose
159  self.action_clientaction_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
160 
161  def info_msg(self, msg: str):
162  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
163 
164  def warn_msg(self, msg: str):
165  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
166 
167  def error_msg(self, msg: str):
168  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
169 
170  def setInitialPose(self):
171  msg = PoseWithCovarianceStamped()
172  msg.pose.pose = self.initial_poseinitial_pose
173  msg.header.frame_id = 'map'
174  self.info_msginfo_msginfo_msg('Publishing Initial Pose')
175  self.initial_pose_pubinitial_pose_pub.publish(msg)
176  self.currentPosecurrentPose = self.initial_poseinitial_pose
177 
178  def getStampedPoseMsg(self, pose: Pose):
179  msg = PoseStamped()
180  msg.header.frame_id = 'map'
181  msg.pose = pose
182  return msg
183 
184  def publishGoalPose(self, goal_pose: Optional[Pose] = None):
185  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
186  self.goal_pubgoal_pub.publish(self.getStampedPoseMsggetStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose))
187 
188  def runNavigateAction(self, goal_pose: Optional[Pose] = None):
189  # Sends a `NavToPose` action request and waits for completion
190  self.info_msginfo_msginfo_msg("Waiting for 'NavigateToPose' action server")
191  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
192  self.info_msginfo_msginfo_msg("'NavigateToPose' action server not available, waiting...")
193 
194  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
195  goal_msg = NavigateToPose.Goal()
196  goal_msg.pose = self.getStampedPoseMsggetStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
197 
198  self.info_msginfo_msginfo_msg('Sending goal request...')
199  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
200 
201  rclpy.spin_until_future_complete(self, send_goal_future)
202  goal_handle = send_goal_future.result()
203 
204  if not goal_handle.accepted:
205  self.error_msgerror_msgerror_msg('Goal rejected')
206  return False
207 
208  self.info_msginfo_msginfo_msg('Goal accepted')
209  get_result_future = goal_handle.get_result_async()
210 
211  self.info_msginfo_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
212  rclpy.spin_until_future_complete(self, get_result_future)
213  status = get_result_future.result().status
214  if status != GoalStatus.STATUS_SUCCEEDED:
215  self.info_msginfo_msginfo_msg(f'Goal failed with status code: {status}')
216  return False
217 
218  self.info_msginfo_msginfo_msg('Goal succeeded!')
219  return True
220 
221  def isInKeepout(self, x, y):
222  mx, my = self.filter_maskfilter_mask.worldToMap(x, y)
223  if mx == -1 and my == -1: # Out of mask's area
224  return False
225  if self.filter_maskfilter_mask.getValue(mx, my) == 100: # Occupied
226  return True
227  return False
228 
229  # Checks that (x, y) position does not belong to a keepout zone.
230  def checkKeepout(self, x, y):
231  if not self.mask_receivedmask_received:
232  self.warn_msgwarn_msgwarn_msg('Filter mask was not received')
233  elif self.isInKeepoutisInKeepout(x, y):
234  self.filter_test_resultfilter_test_result = False
235  self.error_msgerror_msgerror_msg(f'Pose ({x}, {y}) belongs to keepout zone')
236  return False
237  return True
238 
239  # Checks that currently received speed_limit is equal to the it-th item
240  # of expected speed 'limits' array.
241  # If so, sets it-th item of permissive array 'limit_passed' to be true.
242  # Otherwise it will be remained to be false.
243  # Also verifies that speed limit messages received no more than N-times
244  # (where N - is the length of 'limits' array),
245  # otherwise sets overall 'filter_test_result' to be false.
246  def checkSpeed(self, it, speed_limit):
247  if it >= len(self.limitslimits):
248  self.error_msgerror_msgerror_msg('Got excess speed limit')
249  self.filter_test_resultfilter_test_result = False
250  return
251  if speed_limit == self.limitslimits[it]:
252  self.limit_passedlimit_passed[it] = True
253  else:
254  self.error_msgerror_msgerror_msg(
255  'Incorrect speed limit received: '
256  + str(speed_limit)
257  + ', but should be: '
258  + str(self.limitslimits[it])
259  )
260 
261  def poseCallback(self, msg):
262  self.info_msginfo_msginfo_msg('Received amcl_pose')
263  self.current_posecurrent_pose = msg.pose.pose
264  self.initial_pose_receivedinitial_pose_received = True
265  if self.test_typetest_type == TestType.KEEPOUT:
266  if not self.checkKeepoutcheckKeepout(
267  msg.pose.pose.position.x, msg.pose.pose.position.y
268  ):
269  self.error_msgerror_msgerror_msg('Robot goes into keepout zone')
270 
271  def planCallback(self, msg):
272  self.info_msginfo_msginfo_msg('Received plan')
273  for pose in msg.poses:
274  if not self.checkKeepoutcheckKeepout(pose.pose.position.x, pose.pose.position.y):
275  self.error_msgerror_msgerror_msg('Path plan intersects with keepout zone')
276  return
277 
278  def clearingEndpointsCallback(self, msg):
279  if len(msg.data) > 0:
280  self.clearing_endpoints_receivedclearing_endpoints_received = True
281 
282  def voxelMarkedCallback(self, msg):
283  if len(msg.data) > 0:
284  self.voxel_marked_receivedvoxel_marked_received = True
285 
286  def voxelUnknownCallback(self, msg):
287  if len(msg.data) > 0:
288  self.voxel_unknown_receivedvoxel_unknown_received = True
289 
290  def dwbCostCloudCallback(self, msg):
291  self.info_msginfo_msginfo_msg('Received cost_cloud points')
292  if len(msg.data) > 0:
293  self.cost_cloud_receivedcost_cloud_received = True
294 
295  def speedLimitCallback(self, msg):
296  self.info_msginfo_msginfo_msg(f'Received speed limit: {msg.speed_limit}')
297  self.checkSpeedcheckSpeed(self.speed_itspeed_it, msg.speed_limit)
298  self.speed_itspeed_it += 1
299 
300  def maskCallback(self, msg):
301  self.info_msginfo_msginfo_msg('Received filter mask')
302  self.filter_maskfilter_mask = FilterMask(msg)
303  self.mask_receivedmask_received = True
304 
305  def wait_for_filter_mask(self, timeout):
306  start_time = time.time()
307 
308  while not self.mask_receivedmask_received:
309  self.info_msginfo_msginfo_msg('Waiting for filter mask to be received ...')
310  rclpy.spin_once(self, timeout_sec=1)
311  if (time.time() - start_time) > timeout:
312  self.error_msgerror_msgerror_msg('Time out to waiting filter mask')
313  return False
314  return True
315 
316  def wait_for_pointcloud_subscribers(self, timeout):
317  start_time = time.time()
318  while (
319  not self.voxel_unknown_receivedvoxel_unknown_received
320  or not self.voxel_marked_receivedvoxel_marked_received
321  or not self.clearing_endpoints_receivedclearing_endpoints_received
322  ):
323  self.info_msginfo_msginfo_msg(
324  'Waiting for voxel_marked_cloud/voxel_unknown_cloud/\
325  clearing_endpoints msg to be received ...'
326  )
327  rclpy.spin_once(self, timeout_sec=1)
328  if (time.time() - start_time) > timeout:
329  self.error_msgerror_msgerror_msg(
330  'Time out to waiting for voxel_marked_cloud/voxel_unknown_cloud/\
331  clearing_endpoints msgs'
332  )
333  return False
334  return True
335 
336  def reachesGoal(self, timeout, distance):
337  goalReached = False
338  start_time = time.time()
339 
340  while not goalReached:
341  rclpy.spin_once(self, timeout_sec=1)
342  if self.distanceFromGoaldistanceFromGoaldistanceFromGoal() < distance:
343  goalReached = True
344  self.info_msginfo_msginfo_msg('*** GOAL REACHED ***')
345  return True
346  elif timeout is not None:
347  if (time.time() - start_time) > timeout:
348  self.error_msgerror_msgerror_msg('Robot timed out reaching its goal!')
349  return False
350 
351  def distanceFromGoal(self):
352  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
353  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
354  distance = math.sqrt(d_x * d_x + d_y * d_y)
355  self.info_msginfo_msginfo_msg(f'Distance from goal is: {distance}')
356  return distance
357 
358  def wait_for_node_active(self, node_name: str):
359  # Waits for the node within the tester namespace to become active
360  self.info_msginfo_msginfo_msg(f'Waiting for {node_name} to become active')
361  node_service = f'{node_name}/get_state'
362  state_client = self.create_client(GetState, node_service)
363  while not state_client.wait_for_service(timeout_sec=1.0):
364  self.info_msginfo_msginfo_msg(f'{node_service} service not available, waiting...')
365  req = GetState.Request() # empty request
366  state = 'UNKNOWN'
367  while state != 'active':
368  self.info_msginfo_msginfo_msg(f'Getting {node_name} state...')
369  future = state_client.call_async(req)
370  rclpy.spin_until_future_complete(self, future)
371  if future.result() is not None:
372  state = future.result().current_state.label
373  self.info_msginfo_msginfo_msg(f'Result of get_state: {state}')
374  else:
375  self.error_msgerror_msgerror_msg(
376  'Exception while calling service: %r' % future.exception()
377  )
378  time.sleep(5)
379 
380  def shutdown(self):
381  self.info_msginfo_msginfo_msg('Shutting down')
382  self.action_clientaction_client.destroy()
383 
384  transition_service = 'lifecycle_manager_navigation/manage_nodes'
385  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
386  while not mgr_client.wait_for_service(timeout_sec=1.0):
387  self.info_msginfo_msginfo_msg(f'{transition_service} service not available, waiting...')
388 
389  req = ManageLifecycleNodes.Request()
390  req.command = ManageLifecycleNodes.Request().SHUTDOWN
391  future = mgr_client.call_async(req)
392  try:
393  self.info_msginfo_msginfo_msg('Shutting down navigation lifecycle manager...')
394  rclpy.spin_until_future_complete(self, future)
395  future.result()
396  self.info_msginfo_msginfo_msg('Shutting down navigation lifecycle manager complete.')
397  except Exception as e: # noqa: B902
398  self.error_msgerror_msgerror_msg(f'Service call failed {e!r}')
399  transition_service = 'lifecycle_manager_localization/manage_nodes'
400  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
401  while not mgr_client.wait_for_service(timeout_sec=1.0):
402  self.info_msginfo_msginfo_msg(f'{transition_service} service not available, waiting...')
403 
404  req = ManageLifecycleNodes.Request()
405  req.command = ManageLifecycleNodes.Request().SHUTDOWN
406  future = mgr_client.call_async(req)
407  try:
408  self.info_msginfo_msginfo_msg('Shutting down localization lifecycle manager...')
409  rclpy.spin_until_future_complete(self, future)
410  future.result()
411  self.info_msginfo_msginfo_msg('Shutting down localization lifecycle manager complete')
412  except Exception as e: # noqa: B902
413  self.error_msgerror_msgerror_msg(f'Service call failed {e!r}')
414 
415  def wait_for_initial_pose(self):
416  self.initial_pose_receivedinitial_pose_received = False
417  while not self.initial_pose_receivedinitial_pose_received:
418  self.info_msginfo_msginfo_msg('Setting initial pose')
419  self.setInitialPosesetInitialPosesetInitialPose()
420  self.info_msginfo_msginfo_msg('Waiting for amcl_pose to be received')
421  rclpy.spin_once(self, timeout_sec=1)
422 
423 
424 def test_RobotMovesToGoal(robot_tester):
425  robot_tester.info_msg('Setting goal pose')
426  robot_tester.publishGoalPose()
427  robot_tester.info_msg('Waiting 60 seconds for robot to reach goal')
428  return robot_tester.reachesGoal(timeout=60, distance=0.5)
429 
430 
431 # Tests that all received speed limits are correct:
432 # If overall 'filter_test_result' is true
433 # checks that all items in 'limit_passed' permissive array are also true.
434 # In other words, it verifies that all speed limits are received
435 # exactly (by count and values) as expected by 'limits' array.
436 def test_SpeedLimitsAllCorrect(robot_tester):
437  if not robot_tester.filter_test_result:
438  return False
439  for passed in robot_tester.limit_passed:
440  if not passed:
441  robot_tester.error_msg('Did not meet one of the speed limit')
442  return False
443  return True
444 
445 
446 def run_all_tests(robot_tester):
447  # set transforms to use_sim_time
448  result = True
449  if result:
450  robot_tester.wait_for_node_active('amcl')
451  robot_tester.wait_for_initial_pose()
452  robot_tester.wait_for_node_active('bt_navigator')
453  result = robot_tester.wait_for_filter_mask(10)
454  if result:
455  result = robot_tester.runNavigateAction()
456 
457  if robot_tester.test_type == TestType.KEEPOUT:
458  result = result and robot_tester.wait_for_pointcloud_subscribers(10)
459 
460  if result:
461  result = test_RobotMovesToGoal(robot_tester)
462 
463  if result:
464  if robot_tester.test_type == TestType.KEEPOUT:
465  result = robot_tester.filter_test_result
466  result = result and robot_tester.cost_cloud_received
467  elif robot_tester.test_type == TestType.SPEED:
468  result = test_SpeedLimitsAllCorrect(robot_tester)
469 
470  # Add more tests here if desired
471 
472  if result:
473  robot_tester.info_msg('Test PASSED')
474  else:
475  robot_tester.error_msg('Test FAILED')
476 
477  return result
478 
479 
480 def fwd_pose(x=0.0, y=0.0, z=0.01):
481  initial_pose = Pose()
482  initial_pose.position.x = x
483  initial_pose.position.y = y
484  initial_pose.position.z = z
485  initial_pose.orientation.x = 0.0
486  initial_pose.orientation.y = 0.0
487  initial_pose.orientation.z = 0.0
488  initial_pose.orientation.w = 1.0
489  return initial_pose
490 
491 
492 def get_tester(args):
493 
494  # Requested tester for one robot
495  type_str = args.type
496  init_x, init_y, final_x, final_y = args.robot[0]
497  test_type = TestType.KEEPOUT # Default value
498  if type_str == 'speed':
499  test_type = TestType.SPEED
500 
501  tester = NavTester(
502  test_type,
503  initial_pose=fwd_pose(float(init_x), float(init_y)),
504  goal_pose=fwd_pose(float(final_x), float(final_y)),
505  )
506  tester.info_msg(
507  'Starting tester, robot going from '
508  + init_x
509  + ', '
510  + init_y
511  + ' to '
512  + final_x
513  + ', '
514  + final_y
515  + '.'
516  )
517  return tester
518 
519 
520 def main(argv=sys.argv[1:]):
521  # The robot(s) positions from the input arguments
522  parser = argparse.ArgumentParser(
523  description='System-level costmap filters tester node'
524  )
525  parser.add_argument(
526  '-t',
527  '--type',
528  type=str,
529  action='store',
530  dest='type',
531  help='Type of costmap filter being tested.',
532  )
533  group = parser.add_mutually_exclusive_group(required=True)
534  group.add_argument(
535  '-r',
536  '--robot',
537  action='append',
538  nargs=4,
539  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
540  help='The robot starting and final positions.',
541  )
542 
543  args, unknown = parser.parse_known_args()
544 
545  rclpy.init()
546 
547  # Create tester for the robot
548  tester = get_tester(args)
549 
550  # wait a few seconds to make sure entire stacks are up
551  time.sleep(10)
552 
553  passed = run_all_tests(tester)
554 
555  # stop and shutdown the nav stack to exit cleanly
556  tester.shutdown()
557  tester.info_msg('Done Shutting Down.')
558 
559  if not passed:
560  tester.info_msg('Exiting failed')
561  exit(1)
562  else:
563  tester.info_msg('Exiting passed')
564  exit(0)
565 
566 
567 if __name__ == '__main__':
568  main()
def dwbCostCloudCallback(self, msg)
Definition: tester_node.py:290
def info_msg(self, str msg)
Definition: tester_node.py:161
def getStampedPoseMsg(self, Pose pose)
Definition: tester_node.py:178
def planCallback(self, msg)
Definition: tester_node.py:271
def voxelMarkedCallback(self, msg)
Definition: tester_node.py:282
def setInitialPose(self)
Definition: tester_node.py:170
def distanceFromGoal(self)
Definition: tester_node.py:351
def error_msg(self, str msg)
Definition: tester_node.py:167
def speedLimitCallback(self, msg)
Definition: tester_node.py:295
def clearingEndpointsCallback(self, msg)
Definition: tester_node.py:278
def maskCallback(self, msg)
Definition: tester_node.py:300
def voxelUnknownCallback(self, msg)
Definition: tester_node.py:286
def checkSpeed(self, it, speed_limit)
Definition: tester_node.py:246
def warn_msg(self, str msg)
Definition: tester_node.py:164
def isInKeepout(self, x, y)
Definition: tester_node.py:221
def checkKeepout(self, x, y)
Definition: tester_node.py:230
def poseCallback(self, msg)
Definition: tester_node.py:261