20 from typing
import Optional
22 from action_msgs.msg
import GoalStatus
24 from lifecycle_msgs.srv
import GetState
25 from nav2_msgs.action
import NavigateThroughPoses
26 from nav2_msgs.srv
import ManageLifecycleNodes
27 from rcl_interfaces.srv
import SetParameters
29 from rclpy.action.client
import ActionClient
30 from rclpy.client
import Client
31 from rclpy.node
import Node
32 from rclpy.parameter
import Parameter
33 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
39 def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str =
''):
40 super().__init__(node_name=
'nav2_tester', namespace=namespace)
42 PoseWithCovarianceStamped,
'initialpose', 10
45 checker_qos = QoSProfile(
46 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
47 reliability=QoSReliabilityPolicy.RELIABLE,
48 history=QoSHistoryPolicy.KEEP_LAST,
53 String,
'goal_checker_selector', checker_qos)
56 String,
'progress_checker_selector', checker_qos)
58 pose_qos = QoSProfile(
59 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
60 reliability=QoSReliabilityPolicy.RELIABLE,
61 history=QoSHistoryPolicy.KEEP_LAST,
66 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
72 self.action_client: ActionClient[
73 NavigateThroughPoses.Goal,
74 NavigateThroughPoses.Result,
75 NavigateThroughPoses.Feedback
76 ] = ActionClient(self, NavigateThroughPoses,
'navigate_through_poses')
78 self.controller_param_cli: Client[SetParameters.Request, SetParameters.Response] = \
80 SetParameters,
'/controller_server/set_parameters'
83 def info_msg(self, msg: str) ->
None:
84 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
86 def warn_msg(self, msg: str) ->
None:
87 self.get_logger().warning(
'\033[1;37;43m' + msg +
'\033[0m')
89 def error_msg(self, msg: str) ->
None:
90 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
92 def setInitialPose(self) -> None:
93 msg = PoseWithCovarianceStamped()
95 msg.header.frame_id =
'map'
96 self.
info_msginfo_msg(
'Publishing Initial Pose')
100 def setGoalChecker(self, name: str) ->
None:
105 def setProgressChecker(self, name: str) ->
None:
110 def setControllerParam(self, name: str, parameter_type: Parameter.Type, value: float) ->
None:
111 req = SetParameters.Request()
113 Parameter(name, parameter_type, value).to_parameter_msg()
115 future = self.controller_param_cli.call_async(req)
116 rclpy.spin_until_future_complete(self, future)
118 def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
120 msg.header.frame_id =
'map'
124 def runNavigateAction(self,
125 goal_pose: Optional[Pose] =
None,
126 behavior_tree: Optional[str] =
None,
127 expected_error_code: Optional[int] =
None,
128 expected_error_msg: Optional[str] =
None) -> bool:
130 self.
info_msginfo_msg(
"Waiting for 'NavigateThroughPoses' action server")
131 while not self.action_client.wait_for_server(timeout_sec=1.0):
133 "'NavigateThroughPoses' action server not available, waiting..."
136 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
137 goal_msg = NavigateThroughPoses.Goal()
138 goal_msg.poses.header.frame_id =
'map'
139 goal_msg.poses.header.stamp = self.get_clock().now().to_msg()
140 goal_msg.poses.goals = [
144 goal_msg.behavior_tree = behavior_tree
146 self.
info_msginfo_msg(
'Sending goal request...')
147 send_goal_future = self.action_client.send_goal_async(goal_msg)
149 rclpy.spin_until_future_complete(self, send_goal_future)
150 goal_handle = send_goal_future.result()
152 if goal_handle
is None or not goal_handle.accepted:
156 self.
info_msginfo_msg(
'Goal accepted')
157 get_result_future = goal_handle.get_result_async()
159 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
160 rclpy.spin_until_future_complete(self, get_result_future)
161 status = get_result_future.result().status
162 if status != GoalStatus.STATUS_SUCCEEDED:
163 result = get_result_future.result().result
164 if (result.error_code == expected_error_code
and
165 result.error_msg == expected_error_msg):
166 self.
info_msginfo_msg(f
'Goal failed as expected with status code: {status}'
167 f
' error code:{result.error_code}'
168 f
' error msg:{result.error_msg}')
171 self.
error_msgerror_msg(f
'Goal failed unexpectedly with status code: {status}'
172 f
' Expected error_code:{expected_error_code},'
173 f
' Got error_code:{result.error_code},'
174 f
' Expected error_msg:{expected_error_msg},'
175 f
' Got error_msg:{result.error_msg}')
178 self.
info_msginfo_msg(
'Goal succeeded!')
181 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
182 self.
info_msginfo_msg(
'Received amcl_pose')
186 def wait_for_node_active(self, node_name: str) ->
None:
188 self.
info_msginfo_msg(f
'Waiting for {node_name} to become active')
189 node_service = f
'{node_name}/get_state'
190 state_client: Client[GetState.Request, GetState.Response] = \
191 self.create_client(GetState, node_service)
192 while not state_client.wait_for_service(timeout_sec=1.0):
193 self.
info_msginfo_msg(f
'{node_service} service not available, waiting...')
194 req = GetState.Request()
196 while state !=
'active':
197 self.
info_msginfo_msg(f
'Getting {node_name} state...')
198 future = state_client.call_async(req)
199 rclpy.spin_until_future_complete(self, future)
200 if future.result()
is not None:
201 state = future.result().current_state.label
202 self.
info_msginfo_msg(f
'Result of get_state: {state}')
205 f
'Exception while calling service: {future.exception()!r}'
209 def shutdown(self) -> None:
210 self.
info_msginfo_msg(
'Shutting down')
211 self.action_client.destroy()
213 transition_service =
'lifecycle_manager_navigation/manage_nodes'
214 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
215 self.create_client(ManageLifecycleNodes, transition_service)
216 while not mgr_client.wait_for_service(timeout_sec=1.0):
217 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
219 req = ManageLifecycleNodes.Request()
220 req.command = ManageLifecycleNodes.Request.SHUTDOWN
221 future = mgr_client.call_async(req)
223 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager...')
224 rclpy.spin_until_future_complete(self, future)
226 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
227 except Exception
as e:
228 self.
error_msgerror_msg(f
'Service call failed {e!r}')
229 transition_service =
'lifecycle_manager_localization/manage_nodes'
230 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
231 while not mgr_client.wait_for_service(timeout_sec=1.0):
232 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
234 req = ManageLifecycleNodes.Request()
235 req.command = ManageLifecycleNodes.Request.SHUTDOWN
236 future = mgr_client.call_async(req)
238 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager...')
239 rclpy.spin_until_future_complete(self, future)
241 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager complete')
242 except Exception
as e:
243 self.
error_msgerror_msg(f
'Service call failed {e!r}')
245 def wait_for_initial_pose(self) -> None:
248 self.
info_msginfo_msg(
'Setting initial pose')
250 self.
info_msginfo_msg(
'Waiting for amcl_pose to be received')
251 rclpy.spin_once(self, timeout_sec=1)
254 def run_all_tests(robot_tester: NavTester) -> bool:
255 pose_out_of_bounds = Pose()
256 pose_out_of_bounds.position.x = 2000.0
257 pose_out_of_bounds.position.y = 4000.0
258 pose_out_of_bounds.position.z = 0.0
259 pose_out_of_bounds.orientation.x = 0.0
260 pose_out_of_bounds.orientation.y = 0.0
261 pose_out_of_bounds.orientation.z = 0.0
262 pose_out_of_bounds.orientation.w = 1.0
264 reasonable_pose = Pose()
265 reasonable_pose.position.x = -1.0
266 reasonable_pose.position.y = 0.0
267 reasonable_pose.position.z = 0.0
268 reasonable_pose.orientation.x = 0.0
269 reasonable_pose.orientation.y = 0.0
270 reasonable_pose.orientation.z = 0.0
271 reasonable_pose.orientation.w = 1.0
273 robot_tester.wait_for_node_active(
'amcl')
274 robot_tester.wait_for_initial_pose()
275 robot_tester.wait_for_node_active(
'bt_navigator')
276 robot_tester.setGoalChecker(
'general_goal_checker')
277 robot_tester.setProgressChecker(
'progress_checker')
281 robot_tester.info_msg(
'Test non existing behavior_tree xml file')
282 result = robot_tester.runNavigateAction(
283 goal_pose=pose_out_of_bounds,
284 behavior_tree=
'behavior_tree_that_does_not_exist.xml',
285 expected_error_code=NavigateThroughPoses.Result.FAILED_TO_LOAD_BEHAVIOR_TREE,
286 expected_error_msg=(
'Error loading XML file: behavior_tree_that_does_not_exist.xml. '
287 'Navigation canceled.'))
290 robot_tester.info_msg(
'Test goal out of bounds')
291 result = robot_tester.runNavigateAction(
292 goal_pose=pose_out_of_bounds,
294 expected_error_code=304,
295 expected_error_msg=(
'GridBasedplugin failed to plan from '
296 '(-2.00, -0.50) to (2000.00, 4000.00): '
297 '"Goal Coordinates of(2000.000000, 4000.000000) '
298 'was outside bounds"'))
301 robot_tester.info_msg(
'Test for unknown goal checker')
302 robot_tester.setGoalChecker(
'junk_goal_checker')
303 result = robot_tester.runNavigateAction(
304 goal_pose=reasonable_pose,
306 expected_error_code=100,
307 expected_error_msg=(
'Failed to find goal checker name: junk_goal_checker'))
308 robot_tester.setGoalChecker(
'general_goal_checker')
311 robot_tester.info_msg(
'Test for unknown progress checker')
312 robot_tester.setProgressChecker(
'junk_progress_checker')
313 result = robot_tester.runNavigateAction(
314 goal_pose=reasonable_pose,
316 expected_error_code=100,
317 expected_error_msg=(
'Failed to find progress checker name: junk_progress_checker'))
318 robot_tester.setProgressChecker(
'progress_checker')
321 robot_tester.info_msg(
'Test for impossible to achieve progress parameters')
322 robot_tester.setControllerParam(
323 'progress_checker.movement_time_allowance',
324 Parameter.Type.DOUBLE,
326 robot_tester.setControllerParam(
327 'progress_checker.required_movement_radius',
328 Parameter.Type.DOUBLE,
332 robot_tester.setControllerParam(
333 'FollowPath.max_vel_x',
334 Parameter.Type.DOUBLE,
336 result = robot_tester.runNavigateAction(
337 goal_pose=reasonable_pose,
339 expected_error_code=105,
340 expected_error_msg=(
'Failed to make progress'))
344 robot_tester.info_msg(
'Test PASSED')
346 robot_tester.error_msg(
'Test FAILED')
351 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
352 initial_pose = Pose()
353 initial_pose.position.x = x
354 initial_pose.position.y = y
355 initial_pose.position.z = z
356 initial_pose.orientation.x = 0.0
357 initial_pose.orientation.y = 0.0
358 initial_pose.orientation.z = 0.0
359 initial_pose.orientation.w = 1.0
363 def get_testers(args: argparse.Namespace) -> list[NavTester]:
366 init_x, init_y, final_x, final_y = args.robot[0]
368 initial_pose=fwd_pose(float(init_x), float(init_y)),
369 goal_pose=fwd_pose(float(final_x), float(final_y)),
372 'Starting tester, robot going from '
382 testers.append(tester)
386 def main(argv: list[str] = sys.argv[1:]):
388 parser = argparse.ArgumentParser(description=
'System-level navigation tester node')
389 group = parser.add_mutually_exclusive_group(required=
True)
395 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
396 help=
'The robot starting and final positions.',
399 args, _ = parser.parse_known_args()
404 testers = get_testers(args)
410 for tester
in testers:
411 passed = run_all_tests(tester)
415 for tester
in testers:
419 testers[0].info_msg(
'Done Shutting Down.')
422 testers[0].info_msg(
'Exiting failed')
425 testers[0].info_msg(
'Exiting passed')
429 if __name__ ==
'__main__':
None setInitialPose(self)
goal_checker_selector_pub
progress_checker_selector_pub
PoseStamped getStampedPoseMsg(self, Pose pose)
None error_msg(self, str msg)
None poseCallback(self, PoseWithCovarianceStamped msg)
None info_msg(self, str msg)