23 from typing
import Optional
25 from action_msgs.msg
import GoalStatus
27 from lifecycle_msgs.srv
import GetState
28 from nav2_msgs.action
import NavigateToPose
30 from nav2_msgs.srv
import ManageLifecycleNodes
33 from rclpy.action
import ActionClient
34 from rclpy.client
import Client
35 from rclpy.node
import Node
36 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
37 from sensor_msgs.msg
import PointCloud2
47 def __init__(self, filter_mask: OccupancyGrid):
48 self.filter_mask: OccupancyGrid = filter_mask
53 def worldToMap(self, wx: float, wy: float) -> tuple[int, int]:
54 origin_x = self.filter_mask.info.origin.position.x
55 origin_y = self.filter_mask.info.origin.position.y
56 size_x = self.filter_mask.info.width
57 size_y = self.filter_mask.info.height
58 resolution = self.filter_mask.info.resolution
60 if wx < origin_x
or wy < origin_y:
63 mx = int((wx - origin_x) / resolution)
64 my = int((wy - origin_y) / resolution)
66 if mx < size_x
and my < size_y:
72 def getValue(self, mx: int, my: int):
73 size_x = self.filter_mask.info.width
74 return self.filter_mask.data[mx + my * size_x]
85 super().__init__(node_name=
'nav2_tester', namespace=namespace)
87 PoseWithCovarianceStamped,
'initialpose', 10
89 self.
goal_pubgoal_pub = self.create_publisher(PoseStamped,
'goal_pose', 10)
91 transient_local_qos = QoSProfile(
92 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
93 reliability=QoSReliabilityPolicy.RELIABLE,
94 history=QoSHistoryPolicy.KEEP_LAST,
98 volatile_qos = QoSProfile(
99 durability=QoSDurabilityPolicy.VOLATILE,
100 reliability=QoSReliabilityPolicy.RELIABLE,
101 history=QoSHistoryPolicy.KEEP_LAST,
106 PoseWithCovarianceStamped,
113 'local_costmap/clearing_endpoints',
124 if self.
test_typetest_type == TestType.KEEPOUT:
125 self.
plan_subplan_sub = self.create_subscription(
126 Path,
'plan', self.
planCallbackplanCallback, volatile_qos
137 elif self.
test_typetest_type == TestType.SPEED:
140 self.
limitslimits = [50.0, 0.0]
143 self.
plan_subplan_sub = self.create_subscription(
148 self.
mask_submask_sub = self.create_subscription(
149 OccupancyGrid,
'filter_mask', self.
maskCallbackmaskCallback, transient_local_qos
155 self.action_client: ActionClient[
156 NavigateToPose.Goal, NavigateToPose.Result, NavigateToPose.Feedback] = \
157 ActionClient(self, NavigateToPose,
'navigate_to_pose')
159 def info_msg(self, msg: str) ->
None:
160 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
162 def warn_msg(self, msg: str) ->
None:
163 self.get_logger().warning(
'\033[1;37;43m' + msg +
'\033[0m')
165 def error_msg(self, msg: str) ->
None:
166 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
168 def setInitialPose(self) -> None:
169 msg = PoseWithCovarianceStamped()
171 msg.header.frame_id =
'map'
176 def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
178 msg.header.frame_id =
'map'
182 def publishGoalPose(self, goal_pose: Optional[Pose] =
None) ->
None:
183 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
186 def runNavigateAction(self, goal_pose: Optional[Pose] =
None) -> bool:
189 while not self.action_client.wait_for_server(timeout_sec=1.0):
190 self.
info_msginfo_msginfo_msg(
"'NavigateToPose' action server not available, waiting...")
192 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
193 goal_msg = NavigateToPose.Goal()
197 send_goal_future = self.action_client.send_goal_async(goal_msg)
199 rclpy.spin_until_future_complete(self, send_goal_future)
200 goal_handle = send_goal_future.result()
202 if not goal_handle
or not goal_handle.accepted:
207 get_result_future = goal_handle.get_result_async()
209 self.
info_msginfo_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
210 rclpy.spin_until_future_complete(self, get_result_future)
211 status = get_result_future.result().status
212 if status != GoalStatus.STATUS_SUCCEEDED:
219 def isInKeepout(self, x: float, y: float) -> bool:
220 mx, my = self.
filter_maskfilter_mask.worldToMap(x, y)
221 if mx == -1
and my == -1:
223 if self.
filter_maskfilter_mask.getValue(mx, my) == 100:
228 def checkKeepout(self, x: float, y: float) -> bool:
244 def checkSpeed(self, it: int, speed_limit: float) ->
None:
245 if it >= len(self.
limitslimits):
249 if speed_limit == self.
limitslimits[it]:
253 'Incorrect speed limit received: '
255 +
', but should be: '
256 + str(self.
limitslimits[it])
259 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
263 if self.
test_typetest_type == TestType.KEEPOUT:
265 msg.pose.pose.position.x, msg.pose.pose.position.y
269 def planCallback(self, msg: Path) ->
None:
271 for pose
in msg.poses:
272 if not self.
checkKeepoutcheckKeepout(pose.pose.position.x, pose.pose.position.y):
276 def clearingEndpointsCallback(self, msg: PointCloud2) ->
None:
277 if len(msg.data) > 0:
280 def voxelMarkedCallback(self, msg: PointCloud2) ->
None:
281 if len(msg.data) > 0:
284 def voxelUnknownCallback(self, msg: PointCloud2) ->
None:
285 if len(msg.data) > 0:
288 def dwbCostCloudCallback(self, msg: PointCloud2) ->
None:
290 if len(msg.data) > 0:
293 def speedLimitCallback(self, msg: SpeedLimit) ->
None:
298 def maskCallback(self, msg: OccupancyGrid) ->
None:
303 def wait_for_filter_mask(self, timeout: float) -> bool:
304 start_time = time.time()
308 rclpy.spin_once(self, timeout_sec=1)
309 if (time.time() - start_time) > timeout:
314 def wait_for_pointcloud_subscribers(self, timeout: float) -> bool:
315 start_time = time.time()
322 'Waiting for voxel_marked_cloud/voxel_unknown_cloud/\
323 clearing_endpoints msg to be received ...'
325 rclpy.spin_once(self, timeout_sec=1)
326 if (time.time() - start_time) > timeout:
328 'Time out to waiting for voxel_marked_cloud/voxel_unknown_cloud/\
329 clearing_endpoints msgs'
334 def reachesGoal(self, timeout: float, distance: float) -> bool:
336 start_time = time.time()
338 while not goalReached:
339 rclpy.spin_once(self, timeout_sec=1)
344 elif timeout
is not None:
345 if (time.time() - start_time) > timeout:
352 def distanceFromGoal(self) -> float:
355 distance = math.sqrt(d_x * d_x + d_y * d_y)
359 def wait_for_node_active(self, node_name: str) ->
None:
362 node_service = f
'{node_name}/get_state'
363 state_client: Client[GetState.Request, GetState.Response] \
364 = self.create_client(GetState, node_service)
365 while not state_client.wait_for_service(timeout_sec=1.0):
366 self.
info_msginfo_msginfo_msg(f
'{node_service} service not available, waiting...')
367 req = GetState.Request()
369 while state !=
'active':
371 future = state_client.call_async(req)
372 rclpy.spin_until_future_complete(self, future)
373 if future.result()
is not None:
374 state = future.result().current_state.label
378 f
'Exception while calling service: {future.exception()!r}'
382 def shutdown(self) -> None:
384 self.action_client.destroy()
386 transition_service =
'lifecycle_manager_navigation/manage_nodes'
387 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
388 = self.create_client(ManageLifecycleNodes, transition_service)
389 while not mgr_client.wait_for_service(timeout_sec=1.0):
390 self.
info_msginfo_msginfo_msg(f
'{transition_service} service not available, waiting...')
392 req = ManageLifecycleNodes.Request()
393 req.command = ManageLifecycleNodes.Request().SHUTDOWN
394 future = mgr_client.call_async(req)
396 self.
info_msginfo_msginfo_msg(
'Shutting down navigation lifecycle manager...')
397 rclpy.spin_until_future_complete(self, future)
399 self.
info_msginfo_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
400 except Exception
as e:
402 transition_service =
'lifecycle_manager_localization/manage_nodes'
403 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
404 while not mgr_client.wait_for_service(timeout_sec=1.0):
405 self.
info_msginfo_msginfo_msg(f
'{transition_service} service not available, waiting...')
407 req = ManageLifecycleNodes.Request()
408 req.command = ManageLifecycleNodes.Request().SHUTDOWN
409 future = mgr_client.call_async(req)
411 self.
info_msginfo_msginfo_msg(
'Shutting down localization lifecycle manager...')
412 rclpy.spin_until_future_complete(self, future)
414 self.
info_msginfo_msginfo_msg(
'Shutting down localization lifecycle manager complete')
415 except Exception
as e:
418 def wait_for_initial_pose(self) -> None:
424 rclpy.spin_once(self, timeout_sec=1)
427 def test_RobotMovesToGoal(robot_tester: NavTester) -> bool:
428 robot_tester.info_msg(
'Setting goal pose')
429 robot_tester.publishGoalPose()
430 robot_tester.info_msg(
'Waiting 60 seconds for robot to reach goal')
431 return robot_tester.reachesGoal(timeout=60, distance=0.5)
439 def test_SpeedLimitsAllCorrect(robot_tester: NavTester) -> bool:
440 if not robot_tester.filter_test_result:
442 for passed
in robot_tester.limit_passed:
444 robot_tester.error_msg(
'Did not meet one of the speed limit')
449 def run_all_tests(robot_tester: NavTester) -> bool:
453 robot_tester.wait_for_node_active(
'amcl')
454 robot_tester.wait_for_initial_pose()
455 robot_tester.wait_for_node_active(
'bt_navigator')
456 result = robot_tester.wait_for_filter_mask(10)
458 result = robot_tester.runNavigateAction()
460 if robot_tester.test_type == TestType.KEEPOUT:
461 result = result
and robot_tester.wait_for_pointcloud_subscribers(10)
464 result = test_RobotMovesToGoal(robot_tester)
467 if robot_tester.test_type == TestType.KEEPOUT:
468 result = robot_tester.filter_test_result
469 result = result
and robot_tester.cost_cloud_received
470 elif robot_tester.test_type == TestType.SPEED:
471 result = test_SpeedLimitsAllCorrect(robot_tester)
476 robot_tester.info_msg(
'Test PASSED')
478 robot_tester.error_msg(
'Test FAILED')
483 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
484 initial_pose = Pose()
485 initial_pose.position.x = x
486 initial_pose.position.y = y
487 initial_pose.position.z = z
488 initial_pose.orientation.x = 0.0
489 initial_pose.orientation.y = 0.0
490 initial_pose.orientation.z = 0.0
491 initial_pose.orientation.w = 1.0
495 def get_tester(args: argparse.Namespace) -> NavTester:
499 init_x, init_y, final_x, final_y = args.robot[0]
500 test_type = TestType.KEEPOUT
501 if type_str ==
'speed':
502 test_type = TestType.SPEED
506 initial_pose=fwd_pose(float(init_x), float(init_y)),
507 goal_pose=fwd_pose(float(final_x), float(final_y)),
510 'Starting tester, robot going from '
523 def main(argv: list[str] = sys.argv[1:]):
525 parser = argparse.ArgumentParser(
526 description=
'System-level costmap filters tester node'
534 help=
'Type of costmap filter being tested.',
536 group = parser.add_mutually_exclusive_group(required=
True)
542 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
543 help=
'The robot starting and final positions.',
546 args, unknown = parser.parse_known_args()
551 tester = get_tester(args)
556 passed = run_all_tests(tester)
560 tester.info_msg(
'Done Shutting Down.')
563 tester.info_msg(
'Exiting failed')
566 tester.info_msg(
'Exiting passed')
570 if __name__ ==
'__main__':
PoseStamped getStampedPoseMsg(self, Pose pose)
clearing_endpoints_received
None voxelUnknownCallback(self, PointCloud2 msg)
None checkSpeed(self, int it, float speed_limit)
None setInitialPose(self)
bool isInKeepout(self, float x, float y)
None clearingEndpointsCallback(self, PointCloud2 msg)
None poseCallback(self, PoseWithCovarianceStamped msg)
float distanceFromGoal(self)
None warn_msg(self, str msg)
None maskCallback(self, OccupancyGrid msg)
None dwbCostCloudCallback(self, PointCloud2 msg)
None voxelMarkedCallback(self, PointCloud2 msg)
None speedLimitCallback(self, SpeedLimit msg)
None planCallback(self, Path msg)
None error_msg(self, str msg)
bool checkKeepout(self, float x, float y)
None info_msg(self, str msg)