Nav2 Navigation Stack - humble  humble
ROS 2 Navigation Stack
tester_node.py
1 #!/usr/bin/env python3
2 
3 # Copyright (c) 2018 Intel Corporation.
4 # Copyright (c) 2020 Samsung Research Russia
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 
18 import argparse
19 from enum import Enum
20 import math
21 import sys
22 import time
23 from typing import Optional
24 
25 from action_msgs.msg import GoalStatus
26 from geometry_msgs.msg import Pose
27 from geometry_msgs.msg import PoseStamped
28 from geometry_msgs.msg import PoseWithCovarianceStamped
29 from lifecycle_msgs.srv import GetState
30 from nav2_msgs.action import NavigateToPose
31 from nav2_msgs.msg import SpeedLimit
32 from nav2_msgs.srv import ManageLifecycleNodes
33 from nav_msgs.msg import OccupancyGrid
34 from nav_msgs.msg import Path
35 
36 import rclpy
37 from rclpy.action import ActionClient
38 from rclpy.node import Node
39 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
40 from rclpy.qos import QoSProfile
41 from sensor_msgs.msg import PointCloud2
42 
43 
44 class TestType(Enum):
45  KEEPOUT = 0
46  SPEED = 1
47 
48 
49 class FilterMask():
50 
51  def __init__(
52  self,
53  filter_mask: OccupancyGrid
54  ):
55  self.filter_maskfilter_mask = filter_mask
56 
57  # Converts world coordinates into filter mask map coordinate.
58  # Returns filter mask map coordinates or (-1, -1) in case
59  # if world coordinates are out of mask bounds.
60  def worldToMap(self, wx: float, wy: float):
61  origin_x = self.filter_maskfilter_mask.info.origin.position.x
62  origin_y = self.filter_maskfilter_mask.info.origin.position.y
63  size_x = self.filter_maskfilter_mask.info.width
64  size_y = self.filter_maskfilter_mask.info.height
65  resolution = self.filter_maskfilter_mask.info.resolution
66 
67  if wx < origin_x or wy < origin_y:
68  return -1, -1
69 
70  mx = int((wx - origin_x) / resolution)
71  my = int((wy - origin_y) / resolution)
72 
73  if mx < size_x and my < size_y:
74  return mx, my
75 
76  return -1, -1
77 
78  # Gets filter_mask[mx, my] value
79  def getValue(self, mx, my):
80  size_x = self.filter_maskfilter_mask.info.width
81  return self.filter_maskfilter_mask.data[mx + my * size_x]
82 
83 
84 class NavTester(Node):
85 
86  def __init__(
87  self,
88  test_type: TestType,
89  initial_pose: Pose,
90  goal_pose: Pose,
91  namespace: str = ''
92  ):
93  super().__init__(node_name='nav2_tester', namespace=namespace)
94  self.initial_pose_pubinitial_pose_pub = self.create_publisher(PoseWithCovarianceStamped,
95  'initialpose', 10)
96  self.goal_pubgoal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
97 
98  transient_local_qos = QoSProfile(
99  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
100  reliability=QoSReliabilityPolicy.RELIABLE,
101  history=QoSHistoryPolicy.KEEP_LAST,
102  depth=1)
103 
104  volatile_qos = QoSProfile(
105  durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE,
106  reliability=QoSReliabilityPolicy.RELIABLE,
107  history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
108  depth=1)
109 
110  self.model_pose_submodel_pose_sub = self.create_subscription(PoseWithCovarianceStamped,
111  'amcl_pose', self.poseCallbackposeCallbackposeCallback,
112  transient_local_qos)
113  self.clearing_ep_subclearing_ep_sub = self.create_subscription(PointCloud2,
114  'local_costmap/clearing_endpoints',
115  self.clearingEndpointsCallbackclearingEndpointsCallback,
116  transient_local_qos)
117  self.test_typetest_type = test_type
118  self.filter_test_resultfilter_test_result = True
119  self.clearing_endpoints_receivedclearing_endpoints_received = False
120  self.voxel_marked_receivedvoxel_marked_received = False
121  self.voxel_unknown_receivedvoxel_unknown_received = False
122  self.cost_cloud_receivedcost_cloud_received = False
123 
124  if self.test_typetest_type == TestType.KEEPOUT:
125  self.plan_subplan_sub = self.create_subscription(Path, 'plan',
126  self.planCallbackplanCallback, volatile_qos)
127  self.voxel_marked_subvoxel_marked_sub = self.create_subscription(PointCloud2,
128  'voxel_marked_cloud',
129  self.voxelMarkedCallbackvoxelMarkedCallback,
130  1)
131  self.voxel_unknown_subvoxel_unknown_sub = self.create_subscription(PointCloud2,
132  'voxel_unknown_cloud',
133  self.voxelUnknownCallbackvoxelUnknownCallback,
134  1)
135  self.cost_cloud_subcost_cloud_sub = self.create_subscription(PointCloud2,
136  'cost_cloud',
137  self.dwbCostCloudCallbackdwbCostCloudCallback,
138  1)
139  elif self.test_typetest_type == TestType.SPEED:
140  self.speed_itspeed_it = 0
141  # Expected chain of speed limits
142  self.limitslimits = [50.0, 0.0]
143  # Permissive array: all received speed limits must match to "limits" from above
144  self.limit_passedlimit_passed = [False, False]
145  self.plan_subplan_sub = self.create_subscription(SpeedLimit, 'speed_limit',
146  self.speedLimitCallbackspeedLimitCallback, volatile_qos)
147 
148  self.mask_receivedmask_received = False
149  self.mask_submask_sub = self.create_subscription(OccupancyGrid, 'filter_mask',
150  self.maskCallbackmaskCallback, transient_local_qos)
151 
152  self.initial_pose_receivedinitial_pose_received = False
153  self.initial_poseinitial_pose = initial_pose
154  self.goal_posegoal_pose = goal_pose
155  self.action_clientaction_client = ActionClient(
156  self, NavigateToPose, 'navigate_to_pose')
157 
158  def info_msg(self, msg: str):
159  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
160 
161  def warn_msg(self, msg: str):
162  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
163 
164  def error_msg(self, msg: str):
165  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
166 
167  def setInitialPose(self):
168  msg = PoseWithCovarianceStamped()
169  msg.pose.pose = self.initial_poseinitial_pose
170  msg.header.frame_id = 'map'
171  self.info_msginfo_msginfo_msg('Publishing Initial Pose')
172  self.initial_pose_pubinitial_pose_pub.publish(msg)
173  self.currentPosecurrentPose = self.initial_poseinitial_pose
174 
175  def getStampedPoseMsg(self, pose: Pose):
176  msg = PoseStamped()
177  msg.header.frame_id = 'map'
178  msg.pose = pose
179  return msg
180 
181  def publishGoalPose(self, goal_pose: Optional[Pose] = None):
182  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
183  self.goal_pubgoal_pub.publish(self.getStampedPoseMsggetStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose))
184 
185  def runNavigateAction(self, goal_pose: Optional[Pose] = None):
186  # Sends a `NavToPose` action request and waits for completion
187  self.info_msginfo_msginfo_msg("Waiting for 'NavigateToPose' action server")
188  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
189  self.info_msginfo_msginfo_msg(
190  "'NavigateToPose' action server not available, waiting...")
191 
192  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
193  goal_msg = NavigateToPose.Goal()
194  goal_msg.pose = self.getStampedPoseMsggetStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
195 
196  self.info_msginfo_msginfo_msg('Sending goal request...')
197  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
198 
199  rclpy.spin_until_future_complete(self, send_goal_future)
200  goal_handle = send_goal_future.result()
201 
202  if not goal_handle.accepted:
203  self.error_msgerror_msgerror_msg('Goal rejected')
204  return False
205 
206  self.info_msginfo_msginfo_msg('Goal accepted')
207  get_result_future = goal_handle.get_result_async()
208 
209  self.info_msginfo_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
210  rclpy.spin_until_future_complete(self, get_result_future)
211  status = get_result_future.result().status
212  if status != GoalStatus.STATUS_SUCCEEDED:
213  self.info_msginfo_msginfo_msg(f'Goal failed with status code: {status}')
214  return False
215 
216  self.info_msginfo_msginfo_msg('Goal succeeded!')
217  return True
218 
219  def isInKeepout(self, x, y):
220  mx, my = self.filter_maskfilter_mask.worldToMap(x, y)
221  if mx == -1 and my == -1: # Out of mask's area
222  return False
223  if self.filter_maskfilter_mask.getValue(mx, my) == 100: # Occupied
224  return True
225  return False
226 
227  # Checks that (x, y) position does not belong to a keepout zone.
228  def checkKeepout(self, x, y):
229  if not self.mask_receivedmask_received:
230  self.warn_msgwarn_msgwarn_msg('Filter mask was not received')
231  elif self.isInKeepoutisInKeepout(x, y):
232  self.filter_test_resultfilter_test_result = False
233  self.error_msgerror_msgerror_msg(f'Pose ({x}, {y}) belongs to keepout zone')
234  return False
235  return True
236 
237  # Checks that currently received speed_limit is equal to the it-th item
238  # of expected speed "limits" array.
239  # If so, sets it-th item of permissive array "limit_passed" to be true.
240  # Otherwise it will be remained to be false.
241  # Also verifies that speed limit messages received no more than N-times
242  # (where N - is the length of "limits" array),
243  # otherwise sets overall "filter_test_result" to be false.
244  def checkSpeed(self, it, speed_limit):
245  if it >= len(self.limitslimits):
246  self.error_msgerror_msgerror_msg('Got excess speed limit')
247  self.filter_test_resultfilter_test_result = False
248  return
249  if speed_limit == self.limitslimits[it]:
250  self.limit_passedlimit_passed[it] = True
251  else:
252  self.error_msgerror_msgerror_msg('Incorrect speed limit received: ' + str(speed_limit) +
253  ', but should be: ' + str(self.limitslimits[it]))
254 
255  def poseCallback(self, msg):
256  self.info_msginfo_msginfo_msg('Received amcl_pose')
257  self.current_posecurrent_pose = msg.pose.pose
258  self.initial_pose_receivedinitial_pose_received = True
259  if self.test_typetest_type == TestType.KEEPOUT:
260  if not self.checkKeepoutcheckKeepout(msg.pose.pose.position.x, msg.pose.pose.position.y):
261  self.error_msgerror_msgerror_msg('Robot goes into keepout zone')
262 
263  def planCallback(self, msg):
264  self.info_msginfo_msginfo_msg('Received plan')
265  for pose in msg.poses:
266  if not self.checkKeepoutcheckKeepout(pose.pose.position.x, pose.pose.position.y):
267  self.error_msgerror_msgerror_msg('Path plan intersects with keepout zone')
268  return
269 
270  def clearingEndpointsCallback(self, msg):
271  if len(msg.data) > 0:
272  self.clearing_endpoints_receivedclearing_endpoints_received = True
273 
274  def voxelMarkedCallback(self, msg):
275  if len(msg.data) > 0:
276  self.voxel_marked_receivedvoxel_marked_received = True
277 
278  def voxelUnknownCallback(self, msg):
279  if len(msg.data) > 0:
280  self.voxel_unknown_receivedvoxel_unknown_received = True
281 
282  def dwbCostCloudCallback(self, msg):
283  self.info_msginfo_msginfo_msg('Received cost_cloud points')
284  if len(msg.data) > 0:
285  self.cost_cloud_receivedcost_cloud_received = True
286 
287  def speedLimitCallback(self, msg):
288  self.info_msginfo_msginfo_msg(f'Received speed limit: {msg.speed_limit}')
289  self.checkSpeedcheckSpeed(self.speed_itspeed_it, msg.speed_limit)
290  self.speed_itspeed_it += 1
291 
292  def maskCallback(self, msg):
293  self.info_msginfo_msginfo_msg('Received filter mask')
294  self.filter_maskfilter_mask = FilterMask(msg)
295  self.mask_receivedmask_received = True
296 
297  def wait_for_filter_mask(self, timeout):
298  start_time = time.time()
299 
300  while not self.mask_receivedmask_received:
301  self.info_msginfo_msginfo_msg('Waiting for filter mask to be received ...')
302  rclpy.spin_once(self, timeout_sec=1)
303  if (time.time() - start_time) > timeout:
304  self.error_msgerror_msgerror_msg('Time out to waiting filter mask')
305  return False
306  return True
307 
308  def wait_for_pointcloud_subscribers(self, timeout):
309  start_time = time.time()
310  while not self.voxel_unknown_receivedvoxel_unknown_received or not self.voxel_marked_receivedvoxel_marked_received \
311  or not self.clearing_endpoints_receivedclearing_endpoints_received:
312  self.info_msginfo_msginfo_msg(
313  'Waiting for voxel_marked_cloud/voxel_unknown_cloud/\
314  clearing_endpoints msg to be received ...')
315  rclpy.spin_once(self, timeout_sec=1)
316  if (time.time() - start_time) > timeout:
317  self.error_msgerror_msgerror_msg(
318  'Time out to waiting for voxel_marked_cloud/voxel_unknown_cloud/\
319  clearing_endpoints msgs')
320  return False
321  return True
322 
323  def reachesGoal(self, timeout, distance):
324  goalReached = False
325  start_time = time.time()
326 
327  while not goalReached:
328  rclpy.spin_once(self, timeout_sec=1)
329  if self.distanceFromGoaldistanceFromGoaldistanceFromGoal() < distance:
330  goalReached = True
331  self.info_msginfo_msginfo_msg('*** GOAL REACHED ***')
332  return True
333  elif timeout is not None:
334  if (time.time() - start_time) > timeout:
335  self.error_msgerror_msgerror_msg('Robot timed out reaching its goal!')
336  return False
337 
338  def distanceFromGoal(self):
339  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
340  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
341  distance = math.sqrt(d_x * d_x + d_y * d_y)
342  self.info_msginfo_msginfo_msg(f'Distance from goal is: {distance}')
343  return distance
344 
345  def wait_for_node_active(self, node_name: str):
346  # Waits for the node within the tester namespace to become active
347  self.info_msginfo_msginfo_msg(f'Waiting for {node_name} to become active')
348  node_service = f'{node_name}/get_state'
349  state_client = self.create_client(GetState, node_service)
350  while not state_client.wait_for_service(timeout_sec=1.0):
351  self.info_msginfo_msginfo_msg(f'{node_service} service not available, waiting...')
352  req = GetState.Request() # empty request
353  state = 'UNKNOWN'
354  while (state != 'active'):
355  self.info_msginfo_msginfo_msg(f'Getting {node_name} state...')
356  future = state_client.call_async(req)
357  rclpy.spin_until_future_complete(self, future)
358  if future.result() is not None:
359  state = future.result().current_state.label
360  self.info_msginfo_msginfo_msg(f'Result of get_state: {state}')
361  else:
362  self.error_msgerror_msgerror_msg('Exception while calling service: %r' %
363  future.exception())
364  time.sleep(5)
365 
366  def shutdown(self):
367  self.info_msginfo_msginfo_msg('Shutting down')
368  self.action_clientaction_client.destroy()
369 
370  transition_service = 'lifecycle_manager_navigation/manage_nodes'
371  mgr_client = self.create_client(
372  ManageLifecycleNodes, transition_service)
373  while not mgr_client.wait_for_service(timeout_sec=1.0):
374  self.info_msginfo_msginfo_msg(f'{transition_service} service not available, waiting...')
375 
376  req = ManageLifecycleNodes.Request()
377  req.command = ManageLifecycleNodes.Request().SHUTDOWN
378  future = mgr_client.call_async(req)
379  try:
380  self.info_msginfo_msginfo_msg('Shutting down navigation lifecycle manager...')
381  rclpy.spin_until_future_complete(self, future)
382  future.result()
383  self.info_msginfo_msginfo_msg(
384  'Shutting down navigation lifecycle manager complete.')
385  except Exception as e: # noqa: B902
386  self.error_msgerror_msgerror_msg(f'Service call failed {e!r}')
387  transition_service = 'lifecycle_manager_localization/manage_nodes'
388  mgr_client = self.create_client(
389  ManageLifecycleNodes, transition_service)
390  while not mgr_client.wait_for_service(timeout_sec=1.0):
391  self.info_msginfo_msginfo_msg(f'{transition_service} service not available, waiting...')
392 
393  req = ManageLifecycleNodes.Request()
394  req.command = ManageLifecycleNodes.Request().SHUTDOWN
395  future = mgr_client.call_async(req)
396  try:
397  self.info_msginfo_msginfo_msg('Shutting down localization lifecycle manager...')
398  rclpy.spin_until_future_complete(self, future)
399  future.result()
400  self.info_msginfo_msginfo_msg(
401  'Shutting down localization lifecycle manager complete')
402  except Exception as e: # noqa: B902
403  self.error_msgerror_msgerror_msg(f'Service call failed {e!r}')
404 
405  def wait_for_initial_pose(self):
406  self.initial_pose_receivedinitial_pose_received = False
407  while not self.initial_pose_receivedinitial_pose_received:
408  self.info_msginfo_msginfo_msg('Setting initial pose')
409  self.setInitialPosesetInitialPosesetInitialPose()
410  self.info_msginfo_msginfo_msg('Waiting for amcl_pose to be received')
411  rclpy.spin_once(self, timeout_sec=1)
412 
413 
414 def test_RobotMovesToGoal(robot_tester):
415  robot_tester.info_msg('Setting goal pose')
416  robot_tester.publishGoalPose()
417  robot_tester.info_msg('Waiting 60 seconds for robot to reach goal')
418  return robot_tester.reachesGoal(timeout=60, distance=0.5)
419 
420 
421 # Tests that all received speed limits are correct:
422 # If overall "filter_test_result" is true
423 # checks that all items in "limit_passed" permissive array are also true.
424 # In other words, it verifies that all speed limits are received
425 # exactly (by count and values) as expected by "limits" array.
426 def test_SpeedLimitsAllCorrect(robot_tester):
427  if not robot_tester.filter_test_result:
428  return False
429  for passed in robot_tester.limit_passed:
430  if not passed:
431  robot_tester.error_msg('Did not meet one of the speed limit')
432  return False
433  return True
434 
435 
436 def run_all_tests(robot_tester):
437  # set transforms to use_sim_time
438  result = True
439  if (result):
440  robot_tester.wait_for_node_active('amcl')
441  robot_tester.wait_for_initial_pose()
442  robot_tester.wait_for_node_active('bt_navigator')
443  result = robot_tester.wait_for_filter_mask(10)
444  if (result):
445  result = robot_tester.runNavigateAction()
446 
447  if robot_tester.test_type == TestType.KEEPOUT:
448  result = result and robot_tester.wait_for_pointcloud_subscribers(10)
449 
450  if (result):
451  result = test_RobotMovesToGoal(robot_tester)
452 
453  if (result):
454  if robot_tester.test_type == TestType.KEEPOUT:
455  result = robot_tester.filter_test_result
456  result = result and robot_tester.cost_cloud_received
457  elif robot_tester.test_type == TestType.SPEED:
458  result = test_SpeedLimitsAllCorrect(robot_tester)
459 
460  # Add more tests here if desired
461 
462  if (result):
463  robot_tester.info_msg('Test PASSED')
464  else:
465  robot_tester.error_msg('Test FAILED')
466 
467  return result
468 
469 
470 def fwd_pose(x=0.0, y=0.0, z=0.01):
471  initial_pose = Pose()
472  initial_pose.position.x = x
473  initial_pose.position.y = y
474  initial_pose.position.z = z
475  initial_pose.orientation.x = 0.0
476  initial_pose.orientation.y = 0.0
477  initial_pose.orientation.z = 0.0
478  initial_pose.orientation.w = 1.0
479  return initial_pose
480 
481 
482 def get_tester(args):
483 
484  # Requested tester for one robot
485  type_str = args.type
486  init_x, init_y, final_x, final_y = args.robot[0]
487  test_type = TestType.KEEPOUT # Default value
488  if type_str == 'speed':
489  test_type = TestType.SPEED
490 
491  tester = NavTester(
492  test_type,
493  initial_pose=fwd_pose(float(init_x), float(init_y)),
494  goal_pose=fwd_pose(float(final_x), float(final_y)))
495  tester.info_msg(
496  'Starting tester, robot going from ' + init_x + ', ' + init_y +
497  ' to ' + final_x + ', ' + final_y + '.')
498  return tester
499 
500 
501 def main(argv=sys.argv[1:]):
502  # The robot(s) positions from the input arguments
503  parser = argparse.ArgumentParser(
504  description='System-level costmap filters tester node')
505  parser.add_argument('-t', '--type', type=str, action='store', dest='type',
506  help='Type of costmap filter being tested.')
507  group = parser.add_mutually_exclusive_group(required=True)
508  group.add_argument('-r', '--robot', action='append', nargs=4,
509  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
510  help='The robot starting and final positions.')
511 
512  args, unknown = parser.parse_known_args()
513 
514  rclpy.init()
515 
516  # Create tester for the robot
517  tester = get_tester(args)
518 
519  # wait a few seconds to make sure entire stacks are up
520  time.sleep(10)
521 
522  passed = run_all_tests(tester)
523 
524  # stop and shutdown the nav stack to exit cleanly
525  tester.shutdown()
526  tester.info_msg('Done Shutting Down.')
527 
528  if not passed:
529  tester.info_msg('Exiting failed')
530  exit(1)
531  else:
532  tester.info_msg('Exiting passed')
533  exit(0)
534 
535 
536 if __name__ == '__main__':
537  main()
def dwbCostCloudCallback(self, msg)
Definition: tester_node.py:282
def info_msg(self, str msg)
Definition: tester_node.py:158
def getStampedPoseMsg(self, Pose pose)
Definition: tester_node.py:175
def planCallback(self, msg)
Definition: tester_node.py:263
def voxelMarkedCallback(self, msg)
Definition: tester_node.py:274
def setInitialPose(self)
Definition: tester_node.py:167
def distanceFromGoal(self)
Definition: tester_node.py:338
def error_msg(self, str msg)
Definition: tester_node.py:164
def speedLimitCallback(self, msg)
Definition: tester_node.py:287
def clearingEndpointsCallback(self, msg)
Definition: tester_node.py:270
def maskCallback(self, msg)
Definition: tester_node.py:292
def voxelUnknownCallback(self, msg)
Definition: tester_node.py:278
def checkSpeed(self, it, speed_limit)
Definition: tester_node.py:244
def warn_msg(self, str msg)
Definition: tester_node.py:161
def isInKeepout(self, x, y)
Definition: tester_node.py:219
def checkKeepout(self, x, y)
Definition: tester_node.py:228
def poseCallback(self, msg)
Definition: tester_node.py:255