Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
tester_node.py
1 #!/usr/bin/env python3
2 
3 # Copyright (c) 2018 Intel Corporation.
4 # Copyright (c) 2020 Samsung Research Russia
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 
18 import argparse
19 from enum import Enum
20 import math
21 import sys
22 import time
23 from typing import Optional
24 
25 from action_msgs.msg import GoalStatus
26 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
27 from lifecycle_msgs.srv import GetState
28 from nav2_msgs.action import NavigateToPose
29 from nav2_msgs.msg import SpeedLimit
30 from nav2_msgs.srv import ManageLifecycleNodes
31 from nav_msgs.msg import OccupancyGrid, Path
32 import rclpy
33 from rclpy.action import ActionClient # type: ignore[attr-defined]
34 from rclpy.node import Node
35 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
36 from sensor_msgs.msg import PointCloud2
37 
38 
39 class TestType(Enum):
40  KEEPOUT = 0
41  SPEED = 1
42 
43 
44 class FilterMask:
45 
46  def __init__(self, filter_mask: OccupancyGrid):
47  self.filter_mask: OccupancyGrid = filter_mask
48 
49  # Converts world coordinates into filter mask map coordinate.
50  # Returns filter mask map coordinates or (-1, -1) in case
51  # if world coordinates are out of mask bounds.
52  def worldToMap(self, wx: float, wy: float) -> tuple[int, int]:
53  origin_x = self.filter_mask.info.origin.position.x
54  origin_y = self.filter_mask.info.origin.position.y
55  size_x = self.filter_mask.info.width
56  size_y = self.filter_mask.info.height
57  resolution = self.filter_mask.info.resolution
58 
59  if wx < origin_x or wy < origin_y:
60  return -1, -1
61 
62  mx = int((wx - origin_x) / resolution)
63  my = int((wy - origin_y) / resolution)
64 
65  if mx < size_x and my < size_y:
66  return mx, my
67 
68  return -1, -1
69 
70  # Gets filter_mask[mx, my] value
71  def getValue(self, mx: int, my: int): # type: ignore[no-untyped-def]
72  size_x = self.filter_mask.info.width
73  return self.filter_mask.data[mx + my * size_x]
74 
75 
76 class NavTester(Node):
77  def __init__(
78  self,
79  test_type: TestType,
80  initial_pose: Pose,
81  goal_pose: Pose,
82  namespace: str = '',
83  ):
84  super().__init__(node_name='nav2_tester', namespace=namespace)
85  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
86  PoseWithCovarianceStamped, 'initialpose', 10
87  )
88  self.goal_pubgoal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
89 
90  transient_local_qos = QoSProfile(
91  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
92  reliability=QoSReliabilityPolicy.RELIABLE,
93  history=QoSHistoryPolicy.KEEP_LAST,
94  depth=1,
95  )
96 
97  volatile_qos = QoSProfile(
98  durability=QoSDurabilityPolicy.VOLATILE,
99  reliability=QoSReliabilityPolicy.RELIABLE,
100  history=QoSHistoryPolicy.KEEP_LAST,
101  depth=1,
102  )
103 
104  self.model_pose_submodel_pose_sub = self.create_subscription(
105  PoseWithCovarianceStamped,
106  'amcl_pose',
107  self.poseCallbackposeCallbackposeCallback,
108  transient_local_qos,
109  )
110  self.clearing_ep_subclearing_ep_sub = self.create_subscription(
111  PointCloud2,
112  'local_costmap/clearing_endpoints',
113  self.clearingEndpointsCallbackclearingEndpointsCallback,
114  transient_local_qos,
115  )
116  self.test_typetest_type = test_type
117  self.filter_test_resultfilter_test_result = True
118  self.clearing_endpoints_receivedclearing_endpoints_received = False
119  self.voxel_marked_receivedvoxel_marked_received = False
120  self.voxel_unknown_receivedvoxel_unknown_received = False
121  self.cost_cloud_receivedcost_cloud_received = False
122 
123  if self.test_typetest_type == TestType.KEEPOUT:
124  self.plan_subplan_sub = self.create_subscription(
125  Path, 'plan', self.planCallbackplanCallback, volatile_qos
126  )
127  self.voxel_marked_subvoxel_marked_sub = self.create_subscription(
128  PointCloud2, 'voxel_marked_cloud', self.voxelMarkedCallbackvoxelMarkedCallback, 1
129  )
130  self.voxel_unknown_subvoxel_unknown_sub = self.create_subscription(
131  PointCloud2, 'voxel_unknown_cloud', self.voxelUnknownCallbackvoxelUnknownCallback, 1
132  )
133  self.cost_cloud_subcost_cloud_sub = self.create_subscription(
134  PointCloud2, 'cost_cloud', self.dwbCostCloudCallbackdwbCostCloudCallback, 1
135  )
136  elif self.test_typetest_type == TestType.SPEED:
137  self.speed_itspeed_it = 0
138  # Expected chain of speed limits
139  self.limitslimits = [50.0, 0.0]
140  # Permissive array: all received speed limits must match to 'limits' from above
141  self.limit_passedlimit_passed = [False, False]
142  self.plan_subplan_sub = self.create_subscription(
143  SpeedLimit, 'speed_limit', self.speedLimitCallbackspeedLimitCallback, volatile_qos
144  )
145 
146  self.mask_receivedmask_received = False
147  self.mask_submask_sub = self.create_subscription(
148  OccupancyGrid, 'filter_mask', self.maskCallbackmaskCallback, transient_local_qos
149  )
150 
151  self.initial_pose_receivedinitial_pose_received = False
152  self.initial_poseinitial_pose = initial_pose
153  self.goal_posegoal_pose = goal_pose
154  self.action_clientaction_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
155 
156  def info_msg(self, msg: str) -> None:
157  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
158 
159  def warn_msg(self, msg: str) -> None:
160  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
161 
162  def error_msg(self, msg: str) -> None:
163  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
164 
165  def setInitialPose(self) -> None:
166  msg = PoseWithCovarianceStamped()
167  msg.pose.pose = self.initial_poseinitial_pose
168  msg.header.frame_id = 'map'
169  self.info_msginfo_msginfo_msg('Publishing Initial Pose')
170  self.initial_pose_pubinitial_pose_pub.publish(msg)
171  self.currentPosecurrentPose = self.initial_poseinitial_pose
172 
173  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
174  msg = PoseStamped()
175  msg.header.frame_id = 'map'
176  msg.pose = pose
177  return msg
178 
179  def publishGoalPose(self, goal_pose: Optional[Pose] = None) -> None:
180  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
181  self.goal_pubgoal_pub.publish(self.getStampedPoseMsggetStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose))
182 
183  def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool:
184  # Sends a `NavToPose` action request and waits for completion
185  self.info_msginfo_msginfo_msg("Waiting for 'NavigateToPose' action server")
186  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
187  self.info_msginfo_msginfo_msg("'NavigateToPose' action server not available, waiting...")
188 
189  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
190  goal_msg = NavigateToPose.Goal()
191  goal_msg.pose = self.getStampedPoseMsggetStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
192 
193  self.info_msginfo_msginfo_msg('Sending goal request...')
194  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
195 
196  rclpy.spin_until_future_complete(self, send_goal_future)
197  goal_handle = send_goal_future.result()
198 
199  if not goal_handle or not goal_handle.accepted:
200  self.error_msgerror_msgerror_msg('Goal rejected')
201  return False
202 
203  self.info_msginfo_msginfo_msg('Goal accepted')
204  get_result_future = goal_handle.get_result_async()
205 
206  self.info_msginfo_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
207  rclpy.spin_until_future_complete(self, get_result_future)
208  status = get_result_future.result().status # type: ignore[union-attr]
209  if status != GoalStatus.STATUS_SUCCEEDED:
210  self.info_msginfo_msginfo_msg(f'Goal failed with status code: {status}')
211  return False
212 
213  self.info_msginfo_msginfo_msg('Goal succeeded!')
214  return True
215 
216  def isInKeepout(self, x: float, y: float) -> bool:
217  mx, my = self.filter_maskfilter_mask.worldToMap(x, y)
218  if mx == -1 and my == -1: # Out of mask's area
219  return False
220  if self.filter_maskfilter_mask.getValue(mx, my) == 100: # Occupied
221  return True
222  return False
223 
224  # Checks that (x, y) position does not belong to a keepout zone.
225  def checkKeepout(self, x: float, y: float) -> bool:
226  if not self.mask_receivedmask_received:
227  self.warn_msgwarn_msgwarn_msg('Filter mask was not received')
228  elif self.isInKeepoutisInKeepout(x, y):
229  self.filter_test_resultfilter_test_result = False
230  self.error_msgerror_msgerror_msg(f'Pose ({x}, {y}) belongs to keepout zone')
231  return False
232  return True
233 
234  # Checks that currently received speed_limit is equal to the it-th item
235  # of expected speed 'limits' array.
236  # If so, sets it-th item of permissive array 'limit_passed' to be true.
237  # Otherwise it will be remained to be false.
238  # Also verifies that speed limit messages received no more than N-times
239  # (where N - is the length of 'limits' array),
240  # otherwise sets overall 'filter_test_result' to be false.
241  def checkSpeed(self, it: int, speed_limit: float) -> None:
242  if it >= len(self.limitslimits):
243  self.error_msgerror_msgerror_msg('Got excess speed limit')
244  self.filter_test_resultfilter_test_result = False
245  return
246  if speed_limit == self.limitslimits[it]:
247  self.limit_passedlimit_passed[it] = True
248  else:
249  self.error_msgerror_msgerror_msg(
250  'Incorrect speed limit received: '
251  + str(speed_limit)
252  + ', but should be: '
253  + str(self.limitslimits[it])
254  )
255 
256  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
257  self.info_msginfo_msginfo_msg('Received amcl_pose')
258  self.current_posecurrent_pose = msg.pose.pose
259  self.initial_pose_receivedinitial_pose_received = True
260  if self.test_typetest_type == TestType.KEEPOUT:
261  if not self.checkKeepoutcheckKeepout(
262  msg.pose.pose.position.x, msg.pose.pose.position.y
263  ):
264  self.error_msgerror_msgerror_msg('Robot goes into keepout zone')
265 
266  def planCallback(self, msg: Path) -> None:
267  self.info_msginfo_msginfo_msg('Received plan')
268  for pose in msg.poses:
269  if not self.checkKeepoutcheckKeepout(pose.pose.position.x, pose.pose.position.y):
270  self.error_msgerror_msgerror_msg('Path plan intersects with keepout zone')
271  return
272 
273  def clearingEndpointsCallback(self, msg: PointCloud2) -> None:
274  if len(msg.data) > 0:
275  self.clearing_endpoints_receivedclearing_endpoints_received = True
276 
277  def voxelMarkedCallback(self, msg: PointCloud2) -> None:
278  if len(msg.data) > 0:
279  self.voxel_marked_receivedvoxel_marked_received = True
280 
281  def voxelUnknownCallback(self, msg: PointCloud2) -> None:
282  if len(msg.data) > 0:
283  self.voxel_unknown_receivedvoxel_unknown_received = True
284 
285  def dwbCostCloudCallback(self, msg: PointCloud2) -> None:
286  self.info_msginfo_msginfo_msg('Received cost_cloud points')
287  if len(msg.data) > 0:
288  self.cost_cloud_receivedcost_cloud_received = True
289 
290  def speedLimitCallback(self, msg: SpeedLimit) -> None:
291  self.info_msginfo_msginfo_msg(f'Received speed limit: {msg.speed_limit}')
292  self.checkSpeedcheckSpeed(self.speed_itspeed_it, msg.speed_limit)
293  self.speed_itspeed_it += 1
294 
295  def maskCallback(self, msg: OccupancyGrid) -> None:
296  self.info_msginfo_msginfo_msg('Received filter mask')
297  self.filter_maskfilter_mask = FilterMask(msg)
298  self.mask_receivedmask_received = True
299 
300  def wait_for_filter_mask(self, timeout: float) -> bool:
301  start_time = time.time()
302 
303  while not self.mask_receivedmask_received:
304  self.info_msginfo_msginfo_msg('Waiting for filter mask to be received ...')
305  rclpy.spin_once(self, timeout_sec=1)
306  if (time.time() - start_time) > timeout:
307  self.error_msgerror_msgerror_msg('Time out to waiting filter mask')
308  return False
309  return True
310 
311  def wait_for_pointcloud_subscribers(self, timeout: float) -> bool:
312  start_time = time.time()
313  while (
314  not self.voxel_unknown_receivedvoxel_unknown_received
315  or not self.voxel_marked_receivedvoxel_marked_received
316  or not self.clearing_endpoints_receivedclearing_endpoints_received
317  ):
318  self.info_msginfo_msginfo_msg(
319  'Waiting for voxel_marked_cloud/voxel_unknown_cloud/\
320  clearing_endpoints msg to be received ...'
321  )
322  rclpy.spin_once(self, timeout_sec=1)
323  if (time.time() - start_time) > timeout:
324  self.error_msgerror_msgerror_msg(
325  'Time out to waiting for voxel_marked_cloud/voxel_unknown_cloud/\
326  clearing_endpoints msgs'
327  )
328  return False
329  return True
330 
331  def reachesGoal(self, timeout: float, distance: float) -> bool:
332  goalReached = False
333  start_time = time.time()
334 
335  while not goalReached:
336  rclpy.spin_once(self, timeout_sec=1)
337  if self.distanceFromGoaldistanceFromGoaldistanceFromGoal() < distance:
338  goalReached = True
339  self.info_msginfo_msginfo_msg('*** GOAL REACHED ***')
340  return True
341  elif timeout is not None:
342  if (time.time() - start_time) > timeout:
343  self.error_msgerror_msgerror_msg('Robot timed out reaching its goal!')
344  return False
345 
346  self.info_msginfo_msginfo_msg('Robot reached its goal!')
347  return True
348 
349  def distanceFromGoal(self) -> float:
350  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
351  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
352  distance = math.sqrt(d_x * d_x + d_y * d_y)
353  self.info_msginfo_msginfo_msg(f'Distance from goal is: {distance}')
354  return distance
355 
356  def wait_for_node_active(self, node_name: str) -> None:
357  # Waits for the node within the tester namespace to become active
358  self.info_msginfo_msginfo_msg(f'Waiting for {node_name} to become active')
359  node_service = f'{node_name}/get_state'
360  state_client = self.create_client(GetState, node_service)
361  while not state_client.wait_for_service(timeout_sec=1.0):
362  self.info_msginfo_msginfo_msg(f'{node_service} service not available, waiting...')
363  req = GetState.Request() # empty request
364  state = 'UNKNOWN'
365  while state != 'active':
366  self.info_msginfo_msginfo_msg(f'Getting {node_name} state...')
367  future = state_client.call_async(req)
368  rclpy.spin_until_future_complete(self, future)
369  if future.result() is not None:
370  state = future.result().current_state.label # type: ignore[union-attr]
371  self.info_msginfo_msginfo_msg(f'Result of get_state: {state}')
372  else:
373  self.error_msgerror_msgerror_msg(
374  f'Exception while calling service: {future.exception()!r}'
375  )
376  time.sleep(5)
377 
378  def shutdown(self) -> None:
379  self.info_msginfo_msginfo_msg('Shutting down')
380  self.action_clientaction_client.destroy()
381 
382  transition_service = 'lifecycle_manager_navigation/manage_nodes'
383  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
384  while not mgr_client.wait_for_service(timeout_sec=1.0):
385  self.info_msginfo_msginfo_msg(f'{transition_service} service not available, waiting...')
386 
387  req = ManageLifecycleNodes.Request()
388  req.command = ManageLifecycleNodes.Request().SHUTDOWN
389  future = mgr_client.call_async(req)
390  try:
391  self.info_msginfo_msginfo_msg('Shutting down navigation lifecycle manager...')
392  rclpy.spin_until_future_complete(self, future)
393  future.result()
394  self.info_msginfo_msginfo_msg('Shutting down navigation lifecycle manager complete.')
395  except Exception as e: # noqa: B902
396  self.error_msgerror_msgerror_msg(f'Service call failed {e!r}')
397  transition_service = 'lifecycle_manager_localization/manage_nodes'
398  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
399  while not mgr_client.wait_for_service(timeout_sec=1.0):
400  self.info_msginfo_msginfo_msg(f'{transition_service} service not available, waiting...')
401 
402  req = ManageLifecycleNodes.Request()
403  req.command = ManageLifecycleNodes.Request().SHUTDOWN
404  future = mgr_client.call_async(req)
405  try:
406  self.info_msginfo_msginfo_msg('Shutting down localization lifecycle manager...')
407  rclpy.spin_until_future_complete(self, future)
408  future.result()
409  self.info_msginfo_msginfo_msg('Shutting down localization lifecycle manager complete')
410  except Exception as e: # noqa: B902
411  self.error_msgerror_msgerror_msg(f'Service call failed {e!r}')
412 
413  def wait_for_initial_pose(self) -> None:
414  self.initial_pose_receivedinitial_pose_received = False
415  while not self.initial_pose_receivedinitial_pose_received:
416  self.info_msginfo_msginfo_msg('Setting initial pose')
417  self.setInitialPosesetInitialPosesetInitialPose()
418  self.info_msginfo_msginfo_msg('Waiting for amcl_pose to be received')
419  rclpy.spin_once(self, timeout_sec=1)
420 
421 
422 def test_RobotMovesToGoal(robot_tester: NavTester) -> bool:
423  robot_tester.info_msg('Setting goal pose')
424  robot_tester.publishGoalPose()
425  robot_tester.info_msg('Waiting 60 seconds for robot to reach goal')
426  return robot_tester.reachesGoal(timeout=60, distance=0.5)
427 
428 
429 # Tests that all received speed limits are correct:
430 # If overall 'filter_test_result' is true
431 # checks that all items in 'limit_passed' permissive array are also true.
432 # In other words, it verifies that all speed limits are received
433 # exactly (by count and values) as expected by 'limits' array.
434 def test_SpeedLimitsAllCorrect(robot_tester: NavTester) -> bool:
435  if not robot_tester.filter_test_result:
436  return False
437  for passed in robot_tester.limit_passed:
438  if not passed:
439  robot_tester.error_msg('Did not meet one of the speed limit')
440  return False
441  return True
442 
443 
444 def run_all_tests(robot_tester: NavTester) -> bool:
445  # set transforms to use_sim_time
446  result = True
447  if result:
448  robot_tester.wait_for_node_active('amcl')
449  robot_tester.wait_for_initial_pose()
450  robot_tester.wait_for_node_active('bt_navigator')
451  result = robot_tester.wait_for_filter_mask(10)
452  if result:
453  result = robot_tester.runNavigateAction()
454 
455  if robot_tester.test_type == TestType.KEEPOUT:
456  result = result and robot_tester.wait_for_pointcloud_subscribers(10)
457 
458  if result:
459  result = test_RobotMovesToGoal(robot_tester)
460 
461  if result:
462  if robot_tester.test_type == TestType.KEEPOUT:
463  result = robot_tester.filter_test_result
464  result = result and robot_tester.cost_cloud_received
465  elif robot_tester.test_type == TestType.SPEED:
466  result = test_SpeedLimitsAllCorrect(robot_tester)
467 
468  # Add more tests here if desired
469 
470  if result:
471  robot_tester.info_msg('Test PASSED')
472  else:
473  robot_tester.error_msg('Test FAILED')
474 
475  return result
476 
477 
478 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
479  initial_pose = Pose()
480  initial_pose.position.x = x
481  initial_pose.position.y = y
482  initial_pose.position.z = z
483  initial_pose.orientation.x = 0.0
484  initial_pose.orientation.y = 0.0
485  initial_pose.orientation.z = 0.0
486  initial_pose.orientation.w = 1.0
487  return initial_pose
488 
489 
490 def get_tester(args: argparse.Namespace) -> NavTester:
491 
492  # Requested tester for one robot
493  type_str = args.type
494  init_x, init_y, final_x, final_y = args.robot[0]
495  test_type = TestType.KEEPOUT # Default value
496  if type_str == 'speed':
497  test_type = TestType.SPEED
498 
499  tester = NavTester(
500  test_type,
501  initial_pose=fwd_pose(float(init_x), float(init_y)),
502  goal_pose=fwd_pose(float(final_x), float(final_y)),
503  )
504  tester.info_msg(
505  'Starting tester, robot going from '
506  + init_x
507  + ', '
508  + init_y
509  + ' to '
510  + final_x
511  + ', '
512  + final_y
513  + '.'
514  )
515  return tester
516 
517 
518 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
519  # The robot(s) positions from the input arguments
520  parser = argparse.ArgumentParser(
521  description='System-level costmap filters tester node'
522  )
523  parser.add_argument(
524  '-t',
525  '--type',
526  type=str,
527  action='store',
528  dest='type',
529  help='Type of costmap filter being tested.',
530  )
531  group = parser.add_mutually_exclusive_group(required=True)
532  group.add_argument(
533  '-r',
534  '--robot',
535  action='append',
536  nargs=4,
537  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
538  help='The robot starting and final positions.',
539  )
540 
541  args, unknown = parser.parse_known_args()
542 
543  rclpy.init()
544 
545  # Create tester for the robot
546  tester = get_tester(args)
547 
548  # wait a few seconds to make sure entire stacks are up
549  time.sleep(10)
550 
551  passed = run_all_tests(tester)
552 
553  # stop and shutdown the nav stack to exit cleanly
554  tester.shutdown()
555  tester.info_msg('Done Shutting Down.')
556 
557  if not passed:
558  tester.info_msg('Exiting failed')
559  exit(1)
560  else:
561  tester.info_msg('Exiting passed')
562  exit(0)
563 
564 
565 if __name__ == '__main__':
566  main()
PoseStamped getStampedPoseMsg(self, Pose pose)
Definition: tester_node.py:173
None voxelUnknownCallback(self, PointCloud2 msg)
Definition: tester_node.py:281
None checkSpeed(self, int it, float speed_limit)
Definition: tester_node.py:241
None setInitialPose(self)
Definition: tester_node.py:165
bool isInKeepout(self, float x, float y)
Definition: tester_node.py:216
None clearingEndpointsCallback(self, PointCloud2 msg)
Definition: tester_node.py:273
None poseCallback(self, PoseWithCovarianceStamped msg)
Definition: tester_node.py:256
float distanceFromGoal(self)
Definition: tester_node.py:349
None warn_msg(self, str msg)
Definition: tester_node.py:159
None maskCallback(self, OccupancyGrid msg)
Definition: tester_node.py:295
None dwbCostCloudCallback(self, PointCloud2 msg)
Definition: tester_node.py:285
None voxelMarkedCallback(self, PointCloud2 msg)
Definition: tester_node.py:277
None speedLimitCallback(self, SpeedLimit msg)
Definition: tester_node.py:290
None planCallback(self, Path msg)
Definition: tester_node.py:266
None error_msg(self, str msg)
Definition: tester_node.py:162
bool checkKeepout(self, float x, float y)
Definition: tester_node.py:225
None info_msg(self, str msg)
Definition: tester_node.py:156