Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
tester_node.py
1 #!/usr/bin/env python3
2 
3 # Copyright (c) 2018 Intel Corporation.
4 # Copyright (c) 2020 Samsung Research Russia
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 
18 import argparse
19 from enum import Enum
20 import math
21 import sys
22 import time
23 from typing import Optional
24 
25 from action_msgs.msg import GoalStatus
26 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
27 from lifecycle_msgs.srv import GetState
28 from nav2_msgs.action import NavigateToPose
29 from nav2_msgs.msg import SpeedLimit
30 from nav2_msgs.srv import ManageLifecycleNodes
31 from nav_msgs.msg import OccupancyGrid, Path
32 import rclpy
33 from rclpy.action import ActionClient
34 from rclpy.client import Client
35 from rclpy.node import Node
36 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
37 from sensor_msgs.msg import PointCloud2
38 
39 
40 class TestType(Enum):
41  KEEPOUT = 0
42  SPEED = 1
43 
44 
45 class FilterMask:
46 
47  def __init__(self, filter_mask: OccupancyGrid):
48  self.filter_mask: OccupancyGrid = filter_mask
49 
50  # Converts world coordinates into filter mask map coordinate.
51  # Returns filter mask map coordinates or (-1, -1) in case
52  # if world coordinates are out of mask bounds.
53  def worldToMap(self, wx: float, wy: float) -> tuple[int, int]:
54  origin_x = self.filter_mask.info.origin.position.x
55  origin_y = self.filter_mask.info.origin.position.y
56  size_x = self.filter_mask.info.width
57  size_y = self.filter_mask.info.height
58  resolution = self.filter_mask.info.resolution
59 
60  if wx < origin_x or wy < origin_y:
61  return -1, -1
62 
63  mx = int((wx - origin_x) / resolution)
64  my = int((wy - origin_y) / resolution)
65 
66  if mx < size_x and my < size_y:
67  return mx, my
68 
69  return -1, -1
70 
71  # Gets filter_mask[mx, my] value
72  def getValue(self, mx: int, my: int): # type: ignore[no-untyped-def]
73  size_x = self.filter_mask.info.width
74  return self.filter_mask.data[mx + my * size_x]
75 
76 
77 class NavTester(Node):
78  def __init__(
79  self,
80  test_type: TestType,
81  initial_pose: Pose,
82  goal_pose: Pose,
83  namespace: str = '',
84  ):
85  super().__init__(node_name='nav2_tester', namespace=namespace)
86  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
87  PoseWithCovarianceStamped, 'initialpose', 10
88  )
89  self.goal_pubgoal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
90 
91  transient_local_qos = QoSProfile(
92  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
93  reliability=QoSReliabilityPolicy.RELIABLE,
94  history=QoSHistoryPolicy.KEEP_LAST,
95  depth=1,
96  )
97 
98  volatile_qos = QoSProfile(
99  durability=QoSDurabilityPolicy.VOLATILE,
100  reliability=QoSReliabilityPolicy.RELIABLE,
101  history=QoSHistoryPolicy.KEEP_LAST,
102  depth=1,
103  )
104 
105  self.model_pose_submodel_pose_sub = self.create_subscription(
106  PoseWithCovarianceStamped,
107  'amcl_pose',
108  self.poseCallbackposeCallbackposeCallback,
109  transient_local_qos,
110  )
111  self.clearing_ep_subclearing_ep_sub = self.create_subscription(
112  PointCloud2,
113  'local_costmap/clearing_endpoints',
114  self.clearingEndpointsCallbackclearingEndpointsCallback,
115  transient_local_qos,
116  )
117  self.test_typetest_type = test_type
118  self.filter_test_resultfilter_test_result = True
119  self.clearing_endpoints_receivedclearing_endpoints_received = False
120  self.voxel_marked_receivedvoxel_marked_received = False
121  self.voxel_unknown_receivedvoxel_unknown_received = False
122  self.cost_cloud_receivedcost_cloud_received = False
123 
124  if self.test_typetest_type == TestType.KEEPOUT:
125  self.plan_subplan_sub = self.create_subscription(
126  Path, 'plan', self.planCallbackplanCallback, volatile_qos
127  )
128  self.voxel_marked_subvoxel_marked_sub = self.create_subscription(
129  PointCloud2, 'voxel_marked_cloud', self.voxelMarkedCallbackvoxelMarkedCallback, 1
130  )
131  self.voxel_unknown_subvoxel_unknown_sub = self.create_subscription(
132  PointCloud2, 'voxel_unknown_cloud', self.voxelUnknownCallbackvoxelUnknownCallback, 1
133  )
134  self.cost_cloud_subcost_cloud_sub = self.create_subscription(
135  PointCloud2, 'cost_cloud', self.dwbCostCloudCallbackdwbCostCloudCallback, 1
136  )
137  elif self.test_typetest_type == TestType.SPEED:
138  self.speed_itspeed_it = 0
139  # Expected chain of speed limits
140  self.limitslimits = [50.0, 0.0]
141  # Permissive array: all received speed limits must match to 'limits' from above
142  self.limit_passedlimit_passed = [False, False]
143  self.plan_subplan_sub = self.create_subscription(
144  SpeedLimit, 'speed_limit', self.speedLimitCallbackspeedLimitCallback, volatile_qos
145  )
146 
147  self.mask_receivedmask_received = False
148  self.mask_submask_sub = self.create_subscription(
149  OccupancyGrid, 'filter_mask', self.maskCallbackmaskCallback, transient_local_qos
150  )
151 
152  self.initial_pose_receivedinitial_pose_received = False
153  self.initial_poseinitial_pose = initial_pose
154  self.goal_posegoal_pose = goal_pose
155  self.action_client: ActionClient[
156  NavigateToPose.Goal, NavigateToPose.Result, NavigateToPose.Feedback] = \
157  ActionClient(self, NavigateToPose, 'navigate_to_pose')
158 
159  def info_msg(self, msg: str) -> None:
160  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
161 
162  def warn_msg(self, msg: str) -> None:
163  self.get_logger().warning('\033[1;37;43m' + msg + '\033[0m')
164 
165  def error_msg(self, msg: str) -> None:
166  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
167 
168  def setInitialPose(self) -> None:
169  msg = PoseWithCovarianceStamped()
170  msg.pose.pose = self.initial_poseinitial_pose
171  msg.header.frame_id = 'map'
172  self.info_msginfo_msginfo_msg('Publishing Initial Pose')
173  self.initial_pose_pubinitial_pose_pub.publish(msg)
174  self.currentPosecurrentPose = self.initial_poseinitial_pose
175 
176  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
177  msg = PoseStamped()
178  msg.header.frame_id = 'map'
179  msg.pose = pose
180  return msg
181 
182  def publishGoalPose(self, goal_pose: Optional[Pose] = None) -> None:
183  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
184  self.goal_pubgoal_pub.publish(self.getStampedPoseMsggetStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose))
185 
186  def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool:
187  # Sends a `NavToPose` action request and waits for completion
188  self.info_msginfo_msginfo_msg("Waiting for 'NavigateToPose' action server")
189  while not self.action_client.wait_for_server(timeout_sec=1.0):
190  self.info_msginfo_msginfo_msg("'NavigateToPose' action server not available, waiting...")
191 
192  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
193  goal_msg = NavigateToPose.Goal()
194  goal_msg.pose = self.getStampedPoseMsggetStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
195 
196  self.info_msginfo_msginfo_msg('Sending goal request...')
197  send_goal_future = self.action_client.send_goal_async(goal_msg)
198 
199  rclpy.spin_until_future_complete(self, send_goal_future)
200  goal_handle = send_goal_future.result()
201 
202  if not goal_handle or not goal_handle.accepted:
203  self.error_msgerror_msgerror_msg('Goal rejected')
204  return False
205 
206  self.info_msginfo_msginfo_msg('Goal accepted')
207  get_result_future = goal_handle.get_result_async()
208 
209  self.info_msginfo_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
210  rclpy.spin_until_future_complete(self, get_result_future)
211  status = get_result_future.result().status # type: ignore[union-attr]
212  if status != GoalStatus.STATUS_SUCCEEDED:
213  self.info_msginfo_msginfo_msg(f'Goal failed with status code: {status}')
214  return False
215 
216  self.info_msginfo_msginfo_msg('Goal succeeded!')
217  return True
218 
219  def isInKeepout(self, x: float, y: float) -> bool:
220  mx, my = self.filter_maskfilter_mask.worldToMap(x, y)
221  if mx == -1 and my == -1: # Out of mask's area
222  return False
223  if self.filter_maskfilter_mask.getValue(mx, my) == 100: # Occupied
224  return True
225  return False
226 
227  # Checks that (x, y) position does not belong to a keepout zone.
228  def checkKeepout(self, x: float, y: float) -> bool:
229  if not self.mask_receivedmask_received:
230  self.warn_msgwarn_msgwarn_msg('Filter mask was not received')
231  elif self.isInKeepoutisInKeepout(x, y):
232  self.filter_test_resultfilter_test_result = False
233  self.error_msgerror_msgerror_msg(f'Pose ({x}, {y}) belongs to keepout zone')
234  return False
235  return True
236 
237  # Checks that currently received speed_limit is equal to the it-th item
238  # of expected speed 'limits' array.
239  # If so, sets it-th item of permissive array 'limit_passed' to be true.
240  # Otherwise it will be remained to be false.
241  # Also verifies that speed limit messages received no more than N-times
242  # (where N - is the length of 'limits' array),
243  # otherwise sets overall 'filter_test_result' to be false.
244  def checkSpeed(self, it: int, speed_limit: float) -> None:
245  if it >= len(self.limitslimits):
246  self.error_msgerror_msgerror_msg('Got excess speed limit')
247  self.filter_test_resultfilter_test_result = False
248  return
249  if speed_limit == self.limitslimits[it]:
250  self.limit_passedlimit_passed[it] = True
251  else:
252  self.error_msgerror_msgerror_msg(
253  'Incorrect speed limit received: '
254  + str(speed_limit)
255  + ', but should be: '
256  + str(self.limitslimits[it])
257  )
258 
259  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
260  self.info_msginfo_msginfo_msg('Received amcl_pose')
261  self.current_posecurrent_pose = msg.pose.pose
262  self.initial_pose_receivedinitial_pose_received = True
263  if self.test_typetest_type == TestType.KEEPOUT:
264  if not self.checkKeepoutcheckKeepout(
265  msg.pose.pose.position.x, msg.pose.pose.position.y
266  ):
267  self.error_msgerror_msgerror_msg('Robot goes into keepout zone')
268 
269  def planCallback(self, msg: Path) -> None:
270  self.info_msginfo_msginfo_msg('Received plan')
271  for pose in msg.poses:
272  if not self.checkKeepoutcheckKeepout(pose.pose.position.x, pose.pose.position.y):
273  self.error_msgerror_msgerror_msg('Path plan intersects with keepout zone')
274  return
275 
276  def clearingEndpointsCallback(self, msg: PointCloud2) -> None:
277  if len(msg.data) > 0:
278  self.clearing_endpoints_receivedclearing_endpoints_received = True
279 
280  def voxelMarkedCallback(self, msg: PointCloud2) -> None:
281  if len(msg.data) > 0:
282  self.voxel_marked_receivedvoxel_marked_received = True
283 
284  def voxelUnknownCallback(self, msg: PointCloud2) -> None:
285  if len(msg.data) > 0:
286  self.voxel_unknown_receivedvoxel_unknown_received = True
287 
288  def dwbCostCloudCallback(self, msg: PointCloud2) -> None:
289  self.info_msginfo_msginfo_msg('Received cost_cloud points')
290  if len(msg.data) > 0:
291  self.cost_cloud_receivedcost_cloud_received = True
292 
293  def speedLimitCallback(self, msg: SpeedLimit) -> None:
294  self.info_msginfo_msginfo_msg(f'Received speed limit: {msg.speed_limit}')
295  self.checkSpeedcheckSpeed(self.speed_itspeed_it, msg.speed_limit)
296  self.speed_itspeed_it += 1
297 
298  def maskCallback(self, msg: OccupancyGrid) -> None:
299  self.info_msginfo_msginfo_msg('Received filter mask')
300  self.filter_maskfilter_mask = FilterMask(msg)
301  self.mask_receivedmask_received = True
302 
303  def wait_for_filter_mask(self, timeout: float) -> bool:
304  start_time = time.time()
305 
306  while not self.mask_receivedmask_received:
307  self.info_msginfo_msginfo_msg('Waiting for filter mask to be received ...')
308  rclpy.spin_once(self, timeout_sec=1)
309  if (time.time() - start_time) > timeout:
310  self.error_msgerror_msgerror_msg('Time out to waiting filter mask')
311  return False
312  return True
313 
314  def wait_for_pointcloud_subscribers(self, timeout: float) -> bool:
315  start_time = time.time()
316  while (
317  not self.voxel_unknown_receivedvoxel_unknown_received
318  or not self.voxel_marked_receivedvoxel_marked_received
319  or not self.clearing_endpoints_receivedclearing_endpoints_received
320  ):
321  self.info_msginfo_msginfo_msg(
322  'Waiting for voxel_marked_cloud/voxel_unknown_cloud/\
323  clearing_endpoints msg to be received ...'
324  )
325  rclpy.spin_once(self, timeout_sec=1)
326  if (time.time() - start_time) > timeout:
327  self.error_msgerror_msgerror_msg(
328  'Time out to waiting for voxel_marked_cloud/voxel_unknown_cloud/\
329  clearing_endpoints msgs'
330  )
331  return False
332  return True
333 
334  def reachesGoal(self, timeout: float, distance: float) -> bool:
335  goalReached = False
336  start_time = time.time()
337 
338  while not goalReached:
339  rclpy.spin_once(self, timeout_sec=1)
340  if self.distanceFromGoaldistanceFromGoaldistanceFromGoal() < distance:
341  goalReached = True
342  self.info_msginfo_msginfo_msg('*** GOAL REACHED ***')
343  return True
344  elif timeout is not None:
345  if (time.time() - start_time) > timeout:
346  self.error_msgerror_msgerror_msg('Robot timed out reaching its goal!')
347  return False
348 
349  self.info_msginfo_msginfo_msg('Robot reached its goal!')
350  return True
351 
352  def distanceFromGoal(self) -> float:
353  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
354  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
355  distance = math.sqrt(d_x * d_x + d_y * d_y)
356  self.info_msginfo_msginfo_msg(f'Distance from goal is: {distance}')
357  return distance
358 
359  def wait_for_node_active(self, node_name: str) -> None:
360  # Waits for the node within the tester namespace to become active
361  self.info_msginfo_msginfo_msg(f'Waiting for {node_name} to become active')
362  node_service = f'{node_name}/get_state'
363  state_client: Client[GetState.Request, GetState.Response] \
364  = self.create_client(GetState, node_service)
365  while not state_client.wait_for_service(timeout_sec=1.0):
366  self.info_msginfo_msginfo_msg(f'{node_service} service not available, waiting...')
367  req = GetState.Request() # empty request
368  state = 'UNKNOWN'
369  while state != 'active':
370  self.info_msginfo_msginfo_msg(f'Getting {node_name} state...')
371  future = state_client.call_async(req)
372  rclpy.spin_until_future_complete(self, future)
373  if future.result() is not None:
374  state = future.result().current_state.label # type: ignore[union-attr]
375  self.info_msginfo_msginfo_msg(f'Result of get_state: {state}')
376  else:
377  self.error_msgerror_msgerror_msg(
378  f'Exception while calling service: {future.exception()!r}'
379  )
380  time.sleep(5)
381 
382  def shutdown(self) -> None:
383  self.info_msginfo_msginfo_msg('Shutting down')
384  self.action_client.destroy()
385 
386  transition_service = 'lifecycle_manager_navigation/manage_nodes'
387  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
388  = self.create_client(ManageLifecycleNodes, transition_service)
389  while not mgr_client.wait_for_service(timeout_sec=1.0):
390  self.info_msginfo_msginfo_msg(f'{transition_service} service not available, waiting...')
391 
392  req = ManageLifecycleNodes.Request()
393  req.command = ManageLifecycleNodes.Request().SHUTDOWN
394  future = mgr_client.call_async(req)
395  try:
396  self.info_msginfo_msginfo_msg('Shutting down navigation lifecycle manager...')
397  rclpy.spin_until_future_complete(self, future)
398  future.result()
399  self.info_msginfo_msginfo_msg('Shutting down navigation lifecycle manager complete.')
400  except Exception as e: # noqa: B902
401  self.error_msgerror_msgerror_msg(f'Service call failed {e!r}')
402  transition_service = 'lifecycle_manager_localization/manage_nodes'
403  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
404  while not mgr_client.wait_for_service(timeout_sec=1.0):
405  self.info_msginfo_msginfo_msg(f'{transition_service} service not available, waiting...')
406 
407  req = ManageLifecycleNodes.Request()
408  req.command = ManageLifecycleNodes.Request().SHUTDOWN
409  future = mgr_client.call_async(req)
410  try:
411  self.info_msginfo_msginfo_msg('Shutting down localization lifecycle manager...')
412  rclpy.spin_until_future_complete(self, future)
413  future.result()
414  self.info_msginfo_msginfo_msg('Shutting down localization lifecycle manager complete')
415  except Exception as e: # noqa: B902
416  self.error_msgerror_msgerror_msg(f'Service call failed {e!r}')
417 
418  def wait_for_initial_pose(self) -> None:
419  self.initial_pose_receivedinitial_pose_received = False
420  while not self.initial_pose_receivedinitial_pose_received:
421  self.info_msginfo_msginfo_msg('Setting initial pose')
422  self.setInitialPosesetInitialPosesetInitialPose()
423  self.info_msginfo_msginfo_msg('Waiting for amcl_pose to be received')
424  rclpy.spin_once(self, timeout_sec=1)
425 
426 
427 def test_RobotMovesToGoal(robot_tester: NavTester) -> bool:
428  robot_tester.info_msg('Setting goal pose')
429  robot_tester.publishGoalPose()
430  robot_tester.info_msg('Waiting 60 seconds for robot to reach goal')
431  return robot_tester.reachesGoal(timeout=60, distance=0.5)
432 
433 
434 # Tests that all received speed limits are correct:
435 # If overall 'filter_test_result' is true
436 # checks that all items in 'limit_passed' permissive array are also true.
437 # In other words, it verifies that all speed limits are received
438 # exactly (by count and values) as expected by 'limits' array.
439 def test_SpeedLimitsAllCorrect(robot_tester: NavTester) -> bool:
440  if not robot_tester.filter_test_result:
441  return False
442  for passed in robot_tester.limit_passed:
443  if not passed:
444  robot_tester.error_msg('Did not meet one of the speed limit')
445  return False
446  return True
447 
448 
449 def run_all_tests(robot_tester: NavTester) -> bool:
450  # set transforms to use_sim_time
451  result = True
452  if result:
453  robot_tester.wait_for_node_active('amcl')
454  robot_tester.wait_for_initial_pose()
455  robot_tester.wait_for_node_active('bt_navigator')
456  result = robot_tester.wait_for_filter_mask(10)
457  if result:
458  result = robot_tester.runNavigateAction()
459 
460  if robot_tester.test_type == TestType.KEEPOUT:
461  result = result and robot_tester.wait_for_pointcloud_subscribers(10)
462 
463  if result:
464  result = test_RobotMovesToGoal(robot_tester)
465 
466  if result:
467  if robot_tester.test_type == TestType.KEEPOUT:
468  result = robot_tester.filter_test_result
469  result = result and robot_tester.cost_cloud_received
470  elif robot_tester.test_type == TestType.SPEED:
471  result = test_SpeedLimitsAllCorrect(robot_tester)
472 
473  # Add more tests here if desired
474 
475  if result:
476  robot_tester.info_msg('Test PASSED')
477  else:
478  robot_tester.error_msg('Test FAILED')
479 
480  return result
481 
482 
483 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
484  initial_pose = Pose()
485  initial_pose.position.x = x
486  initial_pose.position.y = y
487  initial_pose.position.z = z
488  initial_pose.orientation.x = 0.0
489  initial_pose.orientation.y = 0.0
490  initial_pose.orientation.z = 0.0
491  initial_pose.orientation.w = 1.0
492  return initial_pose
493 
494 
495 def get_tester(args: argparse.Namespace) -> NavTester:
496 
497  # Requested tester for one robot
498  type_str = args.type
499  init_x, init_y, final_x, final_y = args.robot[0]
500  test_type = TestType.KEEPOUT # Default value
501  if type_str == 'speed':
502  test_type = TestType.SPEED
503 
504  tester = NavTester(
505  test_type,
506  initial_pose=fwd_pose(float(init_x), float(init_y)),
507  goal_pose=fwd_pose(float(final_x), float(final_y)),
508  )
509  tester.info_msg(
510  'Starting tester, robot going from '
511  + init_x
512  + ', '
513  + init_y
514  + ' to '
515  + final_x
516  + ', '
517  + final_y
518  + '.'
519  )
520  return tester
521 
522 
523 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
524  # The robot(s) positions from the input arguments
525  parser = argparse.ArgumentParser(
526  description='System-level costmap filters tester node'
527  )
528  parser.add_argument(
529  '-t',
530  '--type',
531  type=str,
532  action='store',
533  dest='type',
534  help='Type of costmap filter being tested.',
535  )
536  group = parser.add_mutually_exclusive_group(required=True)
537  group.add_argument(
538  '-r',
539  '--robot',
540  action='append',
541  nargs=4,
542  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
543  help='The robot starting and final positions.',
544  )
545 
546  args, unknown = parser.parse_known_args()
547 
548  rclpy.init()
549 
550  # Create tester for the robot
551  tester = get_tester(args)
552 
553  # wait a few seconds to make sure entire stacks are up
554  time.sleep(10)
555 
556  passed = run_all_tests(tester)
557 
558  # stop and shutdown the nav stack to exit cleanly
559  tester.shutdown()
560  tester.info_msg('Done Shutting Down.')
561 
562  if not passed:
563  tester.info_msg('Exiting failed')
564  exit(1)
565  else:
566  tester.info_msg('Exiting passed')
567  exit(0)
568 
569 
570 if __name__ == '__main__':
571  main()
PoseStamped getStampedPoseMsg(self, Pose pose)
Definition: tester_node.py:176
None voxelUnknownCallback(self, PointCloud2 msg)
Definition: tester_node.py:284
None checkSpeed(self, int it, float speed_limit)
Definition: tester_node.py:244
None setInitialPose(self)
Definition: tester_node.py:168
bool isInKeepout(self, float x, float y)
Definition: tester_node.py:219
None clearingEndpointsCallback(self, PointCloud2 msg)
Definition: tester_node.py:276
None poseCallback(self, PoseWithCovarianceStamped msg)
Definition: tester_node.py:259
float distanceFromGoal(self)
Definition: tester_node.py:352
None warn_msg(self, str msg)
Definition: tester_node.py:162
None maskCallback(self, OccupancyGrid msg)
Definition: tester_node.py:298
None dwbCostCloudCallback(self, PointCloud2 msg)
Definition: tester_node.py:288
None voxelMarkedCallback(self, PointCloud2 msg)
Definition: tester_node.py:280
None speedLimitCallback(self, SpeedLimit msg)
Definition: tester_node.py:293
None planCallback(self, Path msg)
Definition: tester_node.py:269
None error_msg(self, str msg)
Definition: tester_node.py:165
bool checkKeepout(self, float x, float y)
Definition: tester_node.py:228
None info_msg(self, str msg)
Definition: tester_node.py:159