23 from typing
import Optional
25 from action_msgs.msg
import GoalStatus
26 from geometry_msgs.msg
import Pose
27 from geometry_msgs.msg
import PoseStamped
28 from geometry_msgs.msg
import PoseWithCovarianceStamped
29 from lifecycle_msgs.srv
import GetState
30 from nav2_msgs.action
import NavigateToPose
31 from nav2_msgs.msg
import SpeedLimit
32 from nav2_msgs.srv
import ManageLifecycleNodes
33 from nav_msgs.msg
import OccupancyGrid
34 from nav_msgs.msg
import Path
37 from rclpy.action
import ActionClient
38 from rclpy.node
import Node
39 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
40 from rclpy.qos
import QoSProfile
41 from sensor_msgs.msg
import PointCloud2
53 filter_mask: OccupancyGrid
60 def worldToMap(self, wx: float, wy: float):
61 origin_x = self.
filter_maskfilter_mask.info.origin.position.x
62 origin_y = self.
filter_maskfilter_mask.info.origin.position.y
65 resolution = self.
filter_maskfilter_mask.info.resolution
67 if wx < origin_x
or wy < origin_y:
70 mx = int((wx - origin_x) / resolution)
71 my = int((wy - origin_y) / resolution)
73 if mx < size_x
and my < size_y:
79 def getValue(self, mx, my):
81 return self.
filter_maskfilter_mask.data[mx + my * size_x]
93 super().__init__(node_name=
'nav2_tester', namespace=namespace)
94 self.
initial_pose_pubinitial_pose_pub = self.create_publisher(PoseWithCovarianceStamped,
96 self.
goal_pubgoal_pub = self.create_publisher(PoseStamped,
'goal_pose', 10)
98 transient_local_qos = QoSProfile(
99 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
100 reliability=QoSReliabilityPolicy.RELIABLE,
101 history=QoSHistoryPolicy.KEEP_LAST,
104 volatile_qos = QoSProfile(
105 durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE,
106 reliability=QoSReliabilityPolicy.RELIABLE,
107 history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
110 self.
model_pose_submodel_pose_sub = self.create_subscription(PoseWithCovarianceStamped,
113 self.
clearing_ep_subclearing_ep_sub = self.create_subscription(PointCloud2,
114 'local_costmap/clearing_endpoints',
124 if self.
test_typetest_type == TestType.KEEPOUT:
125 self.
plan_subplan_sub = self.create_subscription(Path,
'plan',
127 self.
voxel_marked_subvoxel_marked_sub = self.create_subscription(PointCloud2,
128 'voxel_marked_cloud',
132 'voxel_unknown_cloud',
135 self.
cost_cloud_subcost_cloud_sub = self.create_subscription(PointCloud2,
139 elif self.
test_typetest_type == TestType.SPEED:
142 self.
limitslimits = [50.0, 0.0]
145 self.
plan_subplan_sub = self.create_subscription(SpeedLimit,
'speed_limit',
149 self.
mask_submask_sub = self.create_subscription(OccupancyGrid,
'filter_mask',
156 self, NavigateToPose,
'navigate_to_pose')
158 def info_msg(self, msg: str):
159 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
161 def warn_msg(self, msg: str):
162 self.get_logger().warn(
'\033[1;37;43m' + msg +
'\033[0m')
164 def error_msg(self, msg: str):
165 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
167 def setInitialPose(self):
168 msg = PoseWithCovarianceStamped()
170 msg.header.frame_id =
'map'
175 def getStampedPoseMsg(self, pose: Pose):
177 msg.header.frame_id =
'map'
181 def publishGoalPose(self, goal_pose: Optional[Pose] =
None):
182 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
185 def runNavigateAction(self, goal_pose: Optional[Pose] =
None):
188 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
190 "'NavigateToPose' action server not available, waiting...")
192 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
193 goal_msg = NavigateToPose.Goal()
197 send_goal_future = self.
action_clientaction_client.send_goal_async(goal_msg)
199 rclpy.spin_until_future_complete(self, send_goal_future)
200 goal_handle = send_goal_future.result()
202 if not goal_handle.accepted:
207 get_result_future = goal_handle.get_result_async()
209 self.
info_msginfo_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
210 rclpy.spin_until_future_complete(self, get_result_future)
211 status = get_result_future.result().status
212 if status != GoalStatus.STATUS_SUCCEEDED:
219 def isInKeepout(self, x, y):
220 mx, my = self.
filter_maskfilter_mask.worldToMap(x, y)
221 if mx == -1
and my == -1:
223 if self.
filter_maskfilter_mask.getValue(mx, my) == 100:
228 def checkKeepout(self, x, y):
244 def checkSpeed(self, it, speed_limit):
245 if it >= len(self.
limitslimits):
249 if speed_limit == self.
limitslimits[it]:
252 self.
error_msgerror_msgerror_msg(
'Incorrect speed limit received: ' + str(speed_limit) +
253 ', but should be: ' + str(self.
limitslimits[it]))
255 def poseCallback(self, msg):
259 if self.
test_typetest_type == TestType.KEEPOUT:
260 if not self.
checkKeepoutcheckKeepout(msg.pose.pose.position.x, msg.pose.pose.position.y):
263 def planCallback(self, msg):
265 for pose
in msg.poses:
266 if not self.
checkKeepoutcheckKeepout(pose.pose.position.x, pose.pose.position.y):
270 def clearingEndpointsCallback(self, msg):
271 if len(msg.data) > 0:
274 def voxelMarkedCallback(self, msg):
275 if len(msg.data) > 0:
278 def voxelUnknownCallback(self, msg):
279 if len(msg.data) > 0:
282 def dwbCostCloudCallback(self, msg):
284 if len(msg.data) > 0:
287 def speedLimitCallback(self, msg):
292 def maskCallback(self, msg):
297 def wait_for_filter_mask(self, timeout):
298 start_time = time.time()
302 rclpy.spin_once(self, timeout_sec=1)
303 if (time.time() - start_time) > timeout:
308 def wait_for_pointcloud_subscribers(self, timeout):
309 start_time = time.time()
313 'Waiting for voxel_marked_cloud/voxel_unknown_cloud/\
314 clearing_endpoints msg to be received ...')
315 rclpy.spin_once(self, timeout_sec=1)
316 if (time.time() - start_time) > timeout:
318 'Time out to waiting for voxel_marked_cloud/voxel_unknown_cloud/\
319 clearing_endpoints msgs')
323 def reachesGoal(self, timeout, distance):
325 start_time = time.time()
327 while not goalReached:
328 rclpy.spin_once(self, timeout_sec=1)
333 elif timeout
is not None:
334 if (time.time() - start_time) > timeout:
338 def distanceFromGoal(self):
341 distance = math.sqrt(d_x * d_x + d_y * d_y)
345 def wait_for_node_active(self, node_name: str):
348 node_service = f
'{node_name}/get_state'
349 state_client = self.create_client(GetState, node_service)
350 while not state_client.wait_for_service(timeout_sec=1.0):
351 self.
info_msginfo_msginfo_msg(f
'{node_service} service not available, waiting...')
352 req = GetState.Request()
354 while (state !=
'active'):
356 future = state_client.call_async(req)
357 rclpy.spin_until_future_complete(self, future)
358 if future.result()
is not None:
359 state = future.result().current_state.label
370 transition_service =
'lifecycle_manager_navigation/manage_nodes'
371 mgr_client = self.create_client(
372 ManageLifecycleNodes, transition_service)
373 while not mgr_client.wait_for_service(timeout_sec=1.0):
374 self.
info_msginfo_msginfo_msg(f
'{transition_service} service not available, waiting...')
376 req = ManageLifecycleNodes.Request()
377 req.command = ManageLifecycleNodes.Request().SHUTDOWN
378 future = mgr_client.call_async(req)
380 self.
info_msginfo_msginfo_msg(
'Shutting down navigation lifecycle manager...')
381 rclpy.spin_until_future_complete(self, future)
384 'Shutting down navigation lifecycle manager complete.')
385 except Exception
as e:
387 transition_service =
'lifecycle_manager_localization/manage_nodes'
388 mgr_client = self.create_client(
389 ManageLifecycleNodes, transition_service)
390 while not mgr_client.wait_for_service(timeout_sec=1.0):
391 self.
info_msginfo_msginfo_msg(f
'{transition_service} service not available, waiting...')
393 req = ManageLifecycleNodes.Request()
394 req.command = ManageLifecycleNodes.Request().SHUTDOWN
395 future = mgr_client.call_async(req)
397 self.
info_msginfo_msginfo_msg(
'Shutting down localization lifecycle manager...')
398 rclpy.spin_until_future_complete(self, future)
401 'Shutting down localization lifecycle manager complete')
402 except Exception
as e:
405 def wait_for_initial_pose(self):
411 rclpy.spin_once(self, timeout_sec=1)
414 def test_RobotMovesToGoal(robot_tester):
415 robot_tester.info_msg(
'Setting goal pose')
416 robot_tester.publishGoalPose()
417 robot_tester.info_msg(
'Waiting 60 seconds for robot to reach goal')
418 return robot_tester.reachesGoal(timeout=60, distance=0.5)
426 def test_SpeedLimitsAllCorrect(robot_tester):
427 if not robot_tester.filter_test_result:
429 for passed
in robot_tester.limit_passed:
431 robot_tester.error_msg(
'Did not meet one of the speed limit')
436 def run_all_tests(robot_tester):
440 robot_tester.wait_for_node_active(
'amcl')
441 robot_tester.wait_for_initial_pose()
442 robot_tester.wait_for_node_active(
'bt_navigator')
443 result = robot_tester.wait_for_filter_mask(10)
445 result = robot_tester.runNavigateAction()
447 if robot_tester.test_type == TestType.KEEPOUT:
448 result = result
and robot_tester.wait_for_pointcloud_subscribers(10)
451 result = test_RobotMovesToGoal(robot_tester)
454 if robot_tester.test_type == TestType.KEEPOUT:
455 result = robot_tester.filter_test_result
456 result = result
and robot_tester.cost_cloud_received
457 elif robot_tester.test_type == TestType.SPEED:
458 result = test_SpeedLimitsAllCorrect(robot_tester)
463 robot_tester.info_msg(
'Test PASSED')
465 robot_tester.error_msg(
'Test FAILED')
470 def fwd_pose(x=0.0, y=0.0, z=0.01):
471 initial_pose = Pose()
472 initial_pose.position.x = x
473 initial_pose.position.y = y
474 initial_pose.position.z = z
475 initial_pose.orientation.x = 0.0
476 initial_pose.orientation.y = 0.0
477 initial_pose.orientation.z = 0.0
478 initial_pose.orientation.w = 1.0
482 def get_tester(args):
486 init_x, init_y, final_x, final_y = args.robot[0]
487 test_type = TestType.KEEPOUT
488 if type_str ==
'speed':
489 test_type = TestType.SPEED
493 initial_pose=fwd_pose(float(init_x), float(init_y)),
494 goal_pose=fwd_pose(float(final_x), float(final_y)))
496 'Starting tester, robot going from ' + init_x +
', ' + init_y +
497 ' to ' + final_x +
', ' + final_y +
'.')
501 def main(argv=sys.argv[1:]):
503 parser = argparse.ArgumentParser(
504 description=
'System-level costmap filters tester node')
505 parser.add_argument(
'-t',
'--type', type=str, action=
'store', dest=
'type',
506 help=
'Type of costmap filter being tested.')
507 group = parser.add_mutually_exclusive_group(required=
True)
508 group.add_argument(
'-r',
'--robot', action=
'append', nargs=4,
509 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
510 help=
'The robot starting and final positions.')
512 args, unknown = parser.parse_known_args()
517 tester = get_tester(args)
522 passed = run_all_tests(tester)
526 tester.info_msg(
'Done Shutting Down.')
529 tester.info_msg(
'Exiting failed')
532 tester.info_msg(
'Exiting passed')
536 if __name__ ==
'__main__':
def dwbCostCloudCallback(self, msg)
clearing_endpoints_received
def info_msg(self, str msg)
def getStampedPoseMsg(self, Pose pose)
def planCallback(self, msg)
def voxelMarkedCallback(self, msg)
def distanceFromGoal(self)
def error_msg(self, str msg)
def speedLimitCallback(self, msg)
def clearingEndpointsCallback(self, msg)
def maskCallback(self, msg)
def voxelUnknownCallback(self, msg)
def checkSpeed(self, it, speed_limit)
def warn_msg(self, str msg)
def isInKeepout(self, x, y)
def checkKeepout(self, x, y)
def poseCallback(self, msg)