23 from typing
import Optional
25 from action_msgs.msg
import GoalStatus
26 from geometry_msgs.msg
import Pose
27 from geometry_msgs.msg
import PoseStamped
28 from geometry_msgs.msg
import PoseWithCovarianceStamped
29 from lifecycle_msgs.srv
import GetState
30 from nav2_msgs.action
import NavigateToPose
31 from nav2_msgs.msg
import SpeedLimit
32 from nav2_msgs.srv
import ManageLifecycleNodes
33 from nav_msgs.msg
import OccupancyGrid
34 from nav_msgs.msg
import Path
37 from rclpy.action
import ActionClient
38 from rclpy.node
import Node
39 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
40 from rclpy.qos
import QoSProfile
41 from sensor_msgs.msg
import PointCloud2
51 def __init__(self, filter_mask: OccupancyGrid):
57 def worldToMap(self, wx: float, wy: float):
58 origin_x = self.
filter_maskfilter_mask.info.origin.position.x
59 origin_y = self.
filter_maskfilter_mask.info.origin.position.y
62 resolution = self.
filter_maskfilter_mask.info.resolution
64 if wx < origin_x
or wy < origin_y:
67 mx = int((wx - origin_x) / resolution)
68 my = int((wy - origin_y) / resolution)
70 if mx < size_x
and my < size_y:
76 def getValue(self, mx, my):
78 return self.
filter_maskfilter_mask.data[mx + my * size_x]
89 super().__init__(node_name=
'nav2_tester', namespace=namespace)
91 PoseWithCovarianceStamped,
'initialpose', 10
93 self.
goal_pubgoal_pub = self.create_publisher(PoseStamped,
'goal_pose', 10)
95 transient_local_qos = QoSProfile(
96 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
97 reliability=QoSReliabilityPolicy.RELIABLE,
98 history=QoSHistoryPolicy.KEEP_LAST,
102 volatile_qos = QoSProfile(
103 durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE,
104 reliability=QoSReliabilityPolicy.RELIABLE,
105 history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
110 PoseWithCovarianceStamped,
117 'local_costmap/clearing_endpoints',
128 if self.
test_typetest_type == TestType.KEEPOUT:
129 self.
plan_subplan_sub = self.create_subscription(
130 Path,
'plan', self.
planCallbackplanCallback, volatile_qos
141 elif self.
test_typetest_type == TestType.SPEED:
144 self.
limitslimits = [50.0, 0.0]
147 self.
plan_subplan_sub = self.create_subscription(
152 self.
mask_submask_sub = self.create_subscription(
153 OccupancyGrid,
'filter_mask', self.
maskCallbackmaskCallback, transient_local_qos
159 self.
action_clientaction_client = ActionClient(self, NavigateToPose,
'navigate_to_pose')
161 def info_msg(self, msg: str):
162 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
164 def warn_msg(self, msg: str):
165 self.get_logger().warn(
'\033[1;37;43m' + msg +
'\033[0m')
167 def error_msg(self, msg: str):
168 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
170 def setInitialPose(self):
171 msg = PoseWithCovarianceStamped()
173 msg.header.frame_id =
'map'
178 def getStampedPoseMsg(self, pose: Pose):
180 msg.header.frame_id =
'map'
184 def publishGoalPose(self, goal_pose: Optional[Pose] =
None):
185 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
188 def runNavigateAction(self, goal_pose: Optional[Pose] =
None):
191 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
192 self.
info_msginfo_msginfo_msg(
"'NavigateToPose' action server not available, waiting...")
194 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
195 goal_msg = NavigateToPose.Goal()
199 send_goal_future = self.
action_clientaction_client.send_goal_async(goal_msg)
201 rclpy.spin_until_future_complete(self, send_goal_future)
202 goal_handle = send_goal_future.result()
204 if not goal_handle.accepted:
209 get_result_future = goal_handle.get_result_async()
211 self.
info_msginfo_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
212 rclpy.spin_until_future_complete(self, get_result_future)
213 status = get_result_future.result().status
214 if status != GoalStatus.STATUS_SUCCEEDED:
221 def isInKeepout(self, x, y):
222 mx, my = self.
filter_maskfilter_mask.worldToMap(x, y)
223 if mx == -1
and my == -1:
225 if self.
filter_maskfilter_mask.getValue(mx, my) == 100:
230 def checkKeepout(self, x, y):
246 def checkSpeed(self, it, speed_limit):
247 if it >= len(self.
limitslimits):
251 if speed_limit == self.
limitslimits[it]:
255 'Incorrect speed limit received: '
257 +
', but should be: '
258 + str(self.
limitslimits[it])
261 def poseCallback(self, msg):
265 if self.
test_typetest_type == TestType.KEEPOUT:
267 msg.pose.pose.position.x, msg.pose.pose.position.y
271 def planCallback(self, msg):
273 for pose
in msg.poses:
274 if not self.
checkKeepoutcheckKeepout(pose.pose.position.x, pose.pose.position.y):
278 def clearingEndpointsCallback(self, msg):
279 if len(msg.data) > 0:
282 def voxelMarkedCallback(self, msg):
283 if len(msg.data) > 0:
286 def voxelUnknownCallback(self, msg):
287 if len(msg.data) > 0:
290 def dwbCostCloudCallback(self, msg):
292 if len(msg.data) > 0:
295 def speedLimitCallback(self, msg):
300 def maskCallback(self, msg):
305 def wait_for_filter_mask(self, timeout):
306 start_time = time.time()
310 rclpy.spin_once(self, timeout_sec=1)
311 if (time.time() - start_time) > timeout:
316 def wait_for_pointcloud_subscribers(self, timeout):
317 start_time = time.time()
324 'Waiting for voxel_marked_cloud/voxel_unknown_cloud/\
325 clearing_endpoints msg to be received ...'
327 rclpy.spin_once(self, timeout_sec=1)
328 if (time.time() - start_time) > timeout:
330 'Time out to waiting for voxel_marked_cloud/voxel_unknown_cloud/\
331 clearing_endpoints msgs'
336 def reachesGoal(self, timeout, distance):
338 start_time = time.time()
340 while not goalReached:
341 rclpy.spin_once(self, timeout_sec=1)
346 elif timeout
is not None:
347 if (time.time() - start_time) > timeout:
351 def distanceFromGoal(self):
354 distance = math.sqrt(d_x * d_x + d_y * d_y)
358 def wait_for_node_active(self, node_name: str):
361 node_service = f
'{node_name}/get_state'
362 state_client = self.create_client(GetState, node_service)
363 while not state_client.wait_for_service(timeout_sec=1.0):
364 self.
info_msginfo_msginfo_msg(f
'{node_service} service not available, waiting...')
365 req = GetState.Request()
367 while state !=
'active':
369 future = state_client.call_async(req)
370 rclpy.spin_until_future_complete(self, future)
371 if future.result()
is not None:
372 state = future.result().current_state.label
376 'Exception while calling service: %r' % future.exception()
384 transition_service =
'lifecycle_manager_navigation/manage_nodes'
385 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
386 while not mgr_client.wait_for_service(timeout_sec=1.0):
387 self.
info_msginfo_msginfo_msg(f
'{transition_service} service not available, waiting...')
389 req = ManageLifecycleNodes.Request()
390 req.command = ManageLifecycleNodes.Request().SHUTDOWN
391 future = mgr_client.call_async(req)
393 self.
info_msginfo_msginfo_msg(
'Shutting down navigation lifecycle manager...')
394 rclpy.spin_until_future_complete(self, future)
396 self.
info_msginfo_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
397 except Exception
as e:
399 transition_service =
'lifecycle_manager_localization/manage_nodes'
400 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
401 while not mgr_client.wait_for_service(timeout_sec=1.0):
402 self.
info_msginfo_msginfo_msg(f
'{transition_service} service not available, waiting...')
404 req = ManageLifecycleNodes.Request()
405 req.command = ManageLifecycleNodes.Request().SHUTDOWN
406 future = mgr_client.call_async(req)
408 self.
info_msginfo_msginfo_msg(
'Shutting down localization lifecycle manager...')
409 rclpy.spin_until_future_complete(self, future)
411 self.
info_msginfo_msginfo_msg(
'Shutting down localization lifecycle manager complete')
412 except Exception
as e:
415 def wait_for_initial_pose(self):
421 rclpy.spin_once(self, timeout_sec=1)
424 def test_RobotMovesToGoal(robot_tester):
425 robot_tester.info_msg(
'Setting goal pose')
426 robot_tester.publishGoalPose()
427 robot_tester.info_msg(
'Waiting 60 seconds for robot to reach goal')
428 return robot_tester.reachesGoal(timeout=60, distance=0.5)
436 def test_SpeedLimitsAllCorrect(robot_tester):
437 if not robot_tester.filter_test_result:
439 for passed
in robot_tester.limit_passed:
441 robot_tester.error_msg(
'Did not meet one of the speed limit')
446 def run_all_tests(robot_tester):
450 robot_tester.wait_for_node_active(
'amcl')
451 robot_tester.wait_for_initial_pose()
452 robot_tester.wait_for_node_active(
'bt_navigator')
453 result = robot_tester.wait_for_filter_mask(10)
455 result = robot_tester.runNavigateAction()
457 if robot_tester.test_type == TestType.KEEPOUT:
458 result = result
and robot_tester.wait_for_pointcloud_subscribers(10)
461 result = test_RobotMovesToGoal(robot_tester)
464 if robot_tester.test_type == TestType.KEEPOUT:
465 result = robot_tester.filter_test_result
466 result = result
and robot_tester.cost_cloud_received
467 elif robot_tester.test_type == TestType.SPEED:
468 result = test_SpeedLimitsAllCorrect(robot_tester)
473 robot_tester.info_msg(
'Test PASSED')
475 robot_tester.error_msg(
'Test FAILED')
480 def fwd_pose(x=0.0, y=0.0, z=0.01):
481 initial_pose = Pose()
482 initial_pose.position.x = x
483 initial_pose.position.y = y
484 initial_pose.position.z = z
485 initial_pose.orientation.x = 0.0
486 initial_pose.orientation.y = 0.0
487 initial_pose.orientation.z = 0.0
488 initial_pose.orientation.w = 1.0
492 def get_tester(args):
496 init_x, init_y, final_x, final_y = args.robot[0]
497 test_type = TestType.KEEPOUT
498 if type_str ==
'speed':
499 test_type = TestType.SPEED
503 initial_pose=fwd_pose(float(init_x), float(init_y)),
504 goal_pose=fwd_pose(float(final_x), float(final_y)),
507 'Starting tester, robot going from '
520 def main(argv=sys.argv[1:]):
522 parser = argparse.ArgumentParser(
523 description=
'System-level costmap filters tester node'
531 help=
'Type of costmap filter being tested.',
533 group = parser.add_mutually_exclusive_group(required=
True)
539 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
540 help=
'The robot starting and final positions.',
543 args, unknown = parser.parse_known_args()
548 tester = get_tester(args)
553 passed = run_all_tests(tester)
557 tester.info_msg(
'Done Shutting Down.')
560 tester.info_msg(
'Exiting failed')
563 tester.info_msg(
'Exiting passed')
567 if __name__ ==
'__main__':
def dwbCostCloudCallback(self, msg)
clearing_endpoints_received
def info_msg(self, str msg)
def getStampedPoseMsg(self, Pose pose)
def planCallback(self, msg)
def voxelMarkedCallback(self, msg)
def distanceFromGoal(self)
def error_msg(self, str msg)
def speedLimitCallback(self, msg)
def clearingEndpointsCallback(self, msg)
def maskCallback(self, msg)
def voxelUnknownCallback(self, msg)
def checkSpeed(self, it, speed_limit)
def warn_msg(self, str msg)
def isInKeepout(self, x, y)
def checkKeepout(self, x, y)
def poseCallback(self, msg)