20 from typing
import Optional
22 from action_msgs.msg
import GoalStatus
24 from lifecycle_msgs.srv
import GetState
25 from nav2_msgs.action
import NavigateThroughPoses
26 from nav2_msgs.srv
import ManageLifecycleNodes
27 from rcl_interfaces.srv
import SetParameters
29 from rclpy.action.client
import ActionClient
30 from rclpy.node
import Node
31 from rclpy.parameter
import Parameter
32 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
38 def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str =
''):
39 super().__init__(node_name=
'nav2_tester', namespace=namespace)
41 PoseWithCovarianceStamped,
'initialpose', 10
44 checker_qos = QoSProfile(
45 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
46 reliability=QoSReliabilityPolicy.RELIABLE,
47 history=QoSHistoryPolicy.KEEP_LAST,
52 String,
'goal_checker_selector', checker_qos)
55 String,
'progress_checker_selector', checker_qos)
57 pose_qos = QoSProfile(
58 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
59 reliability=QoSReliabilityPolicy.RELIABLE,
60 history=QoSHistoryPolicy.KEEP_LAST,
65 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
71 self, NavigateThroughPoses,
'navigate_through_poses'
75 SetParameters,
'/controller_server/set_parameters'
78 def info_msg(self, msg: str) ->
None:
79 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
81 def warn_msg(self, msg: str) ->
None:
82 self.get_logger().warning(
'\033[1;37;43m' + msg +
'\033[0m')
84 def error_msg(self, msg: str) ->
None:
85 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
87 def setInitialPose(self) -> None:
88 msg = PoseWithCovarianceStamped()
90 msg.header.frame_id =
'map'
91 self.
info_msginfo_msg(
'Publishing Initial Pose')
95 def setGoalChecker(self, name: str) ->
None:
100 def setProgressChecker(self, name: str) ->
None:
105 def setControllerParam(self, name: str, parameter_type: Parameter.Type, value: float) ->
None:
106 req = SetParameters.Request()
108 Parameter(name, parameter_type, value).to_parameter_msg()
111 rclpy.spin_until_future_complete(self, future)
113 def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
115 msg.header.frame_id =
'map'
119 def runNavigateAction(self,
120 goal_pose: Optional[Pose] =
None,
121 behavior_tree: Optional[str] =
None,
122 expected_error_code: Optional[int] =
None,
123 expected_error_msg: Optional[str] =
None) -> bool:
125 self.
info_msginfo_msg(
"Waiting for 'NavigateThroughPoses' action server")
126 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
128 "'NavigateThroughPoses' action server not available, waiting..."
131 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
132 goal_msg = NavigateThroughPoses.Goal()
133 goal_msg.poses.header.frame_id =
'map'
134 goal_msg.poses.header.stamp = self.get_clock().now().to_msg()
135 goal_msg.poses.goals = [
139 goal_msg.behavior_tree = behavior_tree
141 self.
info_msginfo_msg(
'Sending goal request...')
142 send_goal_future = self.
action_clientaction_client.send_goal_async(goal_msg)
144 rclpy.spin_until_future_complete(self, send_goal_future)
145 goal_handle = send_goal_future.result()
147 if goal_handle
is None or not goal_handle.accepted:
151 self.
info_msginfo_msg(
'Goal accepted')
152 get_result_future = goal_handle.get_result_async()
154 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
155 rclpy.spin_until_future_complete(self, get_result_future)
156 status = get_result_future.result().status
157 if status != GoalStatus.STATUS_SUCCEEDED:
158 result = get_result_future.result().result
159 if (result.error_code == expected_error_code
and
160 result.error_msg == expected_error_msg):
161 self.
info_msginfo_msg(f
'Goal failed as expected with status code: {status}'
162 f
' error code:{result.error_code}'
163 f
' error msg:{result.error_msg}')
166 self.
error_msgerror_msg(f
'Goal failed unexpectedly with status code: {status}'
167 f
' Expected error_code:{expected_error_code},'
168 f
' Got error_code:{result.error_code},'
169 f
' Expected error_msg:{expected_error_msg},'
170 f
' Got error_msg:{result.error_msg}')
173 self.
info_msginfo_msg(
'Goal succeeded!')
176 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
177 self.
info_msginfo_msg(
'Received amcl_pose')
181 def wait_for_node_active(self, node_name: str) ->
None:
183 self.
info_msginfo_msg(f
'Waiting for {node_name} to become active')
184 node_service = f
'{node_name}/get_state'
185 state_client = self.create_client(GetState, node_service)
186 while not state_client.wait_for_service(timeout_sec=1.0):
187 self.
info_msginfo_msg(f
'{node_service} service not available, waiting...')
188 req = GetState.Request()
190 while state !=
'active':
191 self.
info_msginfo_msg(f
'Getting {node_name} state...')
192 future = state_client.call_async(req)
193 rclpy.spin_until_future_complete(self, future)
194 if future.result()
is not None:
195 state = future.result().current_state.label
196 self.
info_msginfo_msg(f
'Result of get_state: {state}')
199 f
'Exception while calling service: {future.exception()!r}'
203 def shutdown(self) -> None:
204 self.
info_msginfo_msg(
'Shutting down')
207 transition_service =
'lifecycle_manager_navigation/manage_nodes'
208 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
209 while not mgr_client.wait_for_service(timeout_sec=1.0):
210 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
212 req = ManageLifecycleNodes.Request()
213 req.command = ManageLifecycleNodes.Request.SHUTDOWN
214 future = mgr_client.call_async(req)
216 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager...')
217 rclpy.spin_until_future_complete(self, future)
219 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
220 except Exception
as e:
221 self.
error_msgerror_msg(f
'Service call failed {e!r}')
222 transition_service =
'lifecycle_manager_localization/manage_nodes'
223 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
224 while not mgr_client.wait_for_service(timeout_sec=1.0):
225 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
227 req = ManageLifecycleNodes.Request()
228 req.command = ManageLifecycleNodes.Request.SHUTDOWN
229 future = mgr_client.call_async(req)
231 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager...')
232 rclpy.spin_until_future_complete(self, future)
234 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager complete')
235 except Exception
as e:
236 self.
error_msgerror_msg(f
'Service call failed {e!r}')
238 def wait_for_initial_pose(self) -> None:
241 self.
info_msginfo_msg(
'Setting initial pose')
243 self.
info_msginfo_msg(
'Waiting for amcl_pose to be received')
244 rclpy.spin_once(self, timeout_sec=1)
247 def run_all_tests(robot_tester: NavTester) -> bool:
248 pose_out_of_bounds = Pose()
249 pose_out_of_bounds.position.x = 2000.0
250 pose_out_of_bounds.position.y = 4000.0
251 pose_out_of_bounds.position.z = 0.0
252 pose_out_of_bounds.orientation.x = 0.0
253 pose_out_of_bounds.orientation.y = 0.0
254 pose_out_of_bounds.orientation.z = 0.0
255 pose_out_of_bounds.orientation.w = 1.0
257 reasonable_pose = Pose()
258 reasonable_pose.position.x = -1.0
259 reasonable_pose.position.y = 0.0
260 reasonable_pose.position.z = 0.0
261 reasonable_pose.orientation.x = 0.0
262 reasonable_pose.orientation.y = 0.0
263 reasonable_pose.orientation.z = 0.0
264 reasonable_pose.orientation.w = 1.0
266 robot_tester.wait_for_node_active(
'amcl')
267 robot_tester.wait_for_initial_pose()
268 robot_tester.wait_for_node_active(
'bt_navigator')
269 robot_tester.setGoalChecker(
'general_goal_checker')
270 robot_tester.setProgressChecker(
'progress_checker')
274 robot_tester.info_msg(
'Test non existing behavior_tree xml file')
275 result = robot_tester.runNavigateAction(
276 goal_pose=pose_out_of_bounds,
277 behavior_tree=
'behavior_tree_that_does_not_exist.xml',
278 expected_error_code=NavigateThroughPoses.Result.FAILED_TO_LOAD_BEHAVIOR_TREE,
279 expected_error_msg=(
'Error loading XML file: behavior_tree_that_does_not_exist.xml. '
280 'Navigation canceled.'))
283 robot_tester.info_msg(
'Test goal out of bounds')
284 result = robot_tester.runNavigateAction(
285 goal_pose=pose_out_of_bounds,
287 expected_error_code=304,
288 expected_error_msg=(
'GridBasedplugin failed to plan from '
289 '(-2.00, -0.50) to (2000.00, 4000.00): '
290 '"Goal Coordinates of(2000.000000, 4000.000000) '
291 'was outside bounds"'))
294 robot_tester.info_msg(
'Test for unknown goal checker')
295 robot_tester.setGoalChecker(
'junk_goal_checker')
296 result = robot_tester.runNavigateAction(
297 goal_pose=reasonable_pose,
299 expected_error_code=100,
300 expected_error_msg=(
'Failed to find goal checker name: junk_goal_checker'))
301 robot_tester.setGoalChecker(
'general_goal_checker')
304 robot_tester.info_msg(
'Test for unknown progress checker')
305 robot_tester.setProgressChecker(
'junk_progress_checker')
306 result = robot_tester.runNavigateAction(
307 goal_pose=reasonable_pose,
309 expected_error_code=100,
310 expected_error_msg=(
'Failed to find progress checker name: junk_progress_checker'))
311 robot_tester.setProgressChecker(
'progress_checker')
314 robot_tester.info_msg(
'Test for impossible to achieve progress parameters')
315 robot_tester.setControllerParam(
316 'progress_checker.movement_time_allowance',
317 Parameter.Type.DOUBLE,
319 robot_tester.setControllerParam(
320 'progress_checker.required_movement_radius',
321 Parameter.Type.DOUBLE,
325 robot_tester.setControllerParam(
326 'FollowPath.max_vel_x',
327 Parameter.Type.DOUBLE,
329 result = robot_tester.runNavigateAction(
330 goal_pose=reasonable_pose,
332 expected_error_code=105,
333 expected_error_msg=(
'Failed to make progress'))
337 robot_tester.info_msg(
'Test PASSED')
339 robot_tester.error_msg(
'Test FAILED')
344 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
345 initial_pose = Pose()
346 initial_pose.position.x = x
347 initial_pose.position.y = y
348 initial_pose.position.z = z
349 initial_pose.orientation.x = 0.0
350 initial_pose.orientation.y = 0.0
351 initial_pose.orientation.z = 0.0
352 initial_pose.orientation.w = 1.0
356 def get_testers(args: argparse.Namespace) -> list[NavTester]:
359 init_x, init_y, final_x, final_y = args.robot[0]
361 initial_pose=fwd_pose(float(init_x), float(init_y)),
362 goal_pose=fwd_pose(float(final_x), float(final_y)),
365 'Starting tester, robot going from '
375 testers.append(tester)
379 def main(argv: list[str] = sys.argv[1:]):
381 parser = argparse.ArgumentParser(description=
'System-level navigation tester node')
382 group = parser.add_mutually_exclusive_group(required=
True)
388 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
389 help=
'The robot starting and final positions.',
392 args, _ = parser.parse_known_args()
397 testers = get_testers(args)
403 for tester
in testers:
404 passed = run_all_tests(tester)
408 for tester
in testers:
412 testers[0].info_msg(
'Done Shutting Down.')
415 testers[0].info_msg(
'Exiting failed')
418 testers[0].info_msg(
'Exiting passed')
422 if __name__ ==
'__main__':
None setInitialPose(self)
goal_checker_selector_pub
progress_checker_selector_pub
PoseStamped getStampedPoseMsg(self, Pose pose)
None error_msg(self, str msg)
None poseCallback(self, PoseWithCovarianceStamped msg)
None info_msg(self, str msg)