21 from action_msgs.msg
import GoalStatus
23 from lifecycle_msgs.srv
import GetState
24 from nav2_msgs.action
import ComputeAndTrackRoute, ComputeRoute
25 from nav2_msgs.srv
import ManageLifecycleNodes
28 from rclpy.action
import ActionClient
29 from rclpy.client
import Client
30 from rclpy.node
import Node
31 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
32 from std_srvs.srv
import Trigger
37 def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str =
''):
38 super().__init__(node_name=
'nav2_tester', namespace=namespace)
40 PoseWithCovarianceStamped,
'initialpose', 10
43 pose_qos = QoSProfile(
44 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
45 reliability=QoSReliabilityPolicy.RELIABLE,
46 history=QoSHistoryPolicy.KEEP_LAST,
51 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
56 self.compute_action_client: ActionClient[
60 ] = ActionClient(self, ComputeRoute,
'compute_route')
61 self.compute_track_action_client: ActionClient[
62 ComputeAndTrackRoute.Goal,
63 ComputeAndTrackRoute.Result,
64 ComputeAndTrackRoute.Feedback
66 self, ComputeAndTrackRoute,
'compute_and_track_route')
67 self.
feedback_msgsfeedback_msgs: list[ComputeAndTrackRoute.Feedback] = []
71 def runComputeRouteTest(self, use_poses: bool =
True) -> bool:
73 self.
info_msginfo_msg(
"Waiting for 'ComputeRoute' action server")
74 while not self.compute_action_client.wait_for_server(timeout_sec=1.0):
75 self.
info_msginfo_msg(
"'ComputeRoute' action server not available, waiting...")
77 route_msg = ComputeRoute.Goal()
81 route_msg.use_start =
True
82 route_msg.use_poses =
True
85 route_msg.start_id = 7
86 route_msg.goal_id = 13
87 route_msg.use_start =
False
88 route_msg.use_poses =
False
90 self.
info_msginfo_msg(
'Sending ComputeRoute goal request...')
91 send_goal_future = self.compute_action_client.send_goal_async(route_msg)
93 rclpy.spin_until_future_complete(self, send_goal_future)
94 goal_handle = send_goal_future.result()
96 if not goal_handle
or not goal_handle.accepted:
100 self.
info_msginfo_msg(
'Goal accepted')
101 get_result_future = goal_handle.get_result_async()
103 self.
info_msginfo_msg(
"Waiting for 'ComputeRoute' action to complete")
104 rclpy.spin_until_future_complete(self, get_result_future)
105 status = get_result_future.result().status
106 result = get_result_future.result().result
107 if status != GoalStatus.STATUS_SUCCEEDED:
108 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
111 self.
info_msginfo_msg(
'Action completed! Checking validity of results...')
114 assert (len(result.path.poses) == 80)
115 assert (result.route.route_cost > 6)
116 assert (result.route.route_cost < 7)
117 assert (len(result.route.nodes) == 5)
118 assert (len(result.route.edges) == 4)
119 assert (result.error_code == 0)
120 assert (result.error_msg ==
'')
122 self.
info_msginfo_msg(
'Goal succeeded!')
125 def runComputeRouteSamePoseTest(self) -> bool:
127 self.
info_msginfo_msg(
"Waiting for 'ComputeRoute' action server")
128 while not self.compute_action_client.wait_for_server(timeout_sec=1.0):
129 self.
info_msginfo_msg(
"'ComputeRoute' action server not available, waiting...")
131 route_msg = ComputeRoute.Goal()
132 route_msg.start_id = 7
133 route_msg.goal_id = 7
134 route_msg.use_start =
False
135 route_msg.use_poses =
False
137 self.
info_msginfo_msg(
'Sending ComputeRoute goal request...')
138 send_goal_future = self.compute_action_client.send_goal_async(route_msg)
140 rclpy.spin_until_future_complete(self, send_goal_future)
141 goal_handle = send_goal_future.result()
143 if not goal_handle
or not goal_handle.accepted:
147 self.
info_msginfo_msg(
'Goal accepted')
148 get_result_future = goal_handle.get_result_async()
150 self.
info_msginfo_msg(
"Waiting for 'ComputeRoute' action to complete")
151 rclpy.spin_until_future_complete(self, get_result_future)
152 status = get_result_future.result().status
153 result = get_result_future.result().result
154 if status != GoalStatus.STATUS_SUCCEEDED:
155 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
158 self.
info_msginfo_msg(
'Action completed! Checking validity of results...')
161 assert (len(result.path.poses) == 1)
162 assert (len(result.route.nodes) == 1)
163 assert (len(result.route.edges) == 0)
164 assert (result.error_code == 0)
165 assert (result.error_msg ==
'')
167 self.
info_msginfo_msg(
'Goal succeeded!')
170 def runTrackRouteTest(self) -> bool:
172 self.
info_msginfo_msg(
"Waiting for 'ComputeAndTrackRoute' action server")
173 while not self.compute_track_action_client.wait_for_server(timeout_sec=1.0):
174 self.
info_msginfo_msg(
"'ComputeAndTrackRoute' action server not available, waiting...")
176 route_msg = ComputeAndTrackRoute.Goal()
178 route_msg.use_start =
False
179 route_msg.use_poses =
True
181 self.
info_msginfo_msg(
'Sending ComputeAndTrackRoute goal request...')
182 send_goal_future = self.compute_track_action_client.send_goal_async(
185 rclpy.spin_until_future_complete(self, send_goal_future)
186 goal_handle = send_goal_future.result()
188 if not goal_handle
or not goal_handle.accepted:
192 self.
info_msginfo_msg(
'Goal accepted')
193 get_result_future = goal_handle.get_result_async()
197 self.
info_msginfo_msg(
'Triggering a reroute')
198 srv_client: Client[Trigger.Request, Trigger.Response] = \
199 self.create_client(Trigger,
'route_server/ReroutingService/reroute')
200 while not srv_client.wait_for_service(timeout_sec=1.0):
201 self.
info_msginfo_msg(
'Reroute service not available, waiting...')
202 req = Trigger.Request()
203 future = srv_client.call_async(req)
204 rclpy.spin_until_future_complete(self, future)
205 if future.result()
is not None:
206 self.
info_msginfo_msg(
'Reroute triggered')
208 self.
error_msgerror_msg(
'Reroute failed')
214 cancel_future = goal_handle.cancel_goal_async()
215 rclpy.spin_until_future_complete(self, cancel_future)
216 status = cancel_future.result()
217 if status
is not None and len(status.goals_canceling) > 0:
218 self.
info_msginfo_msg(
'Action cancel completed!')
220 self.
info_msginfo_msg(
'Goal cancel failed')
224 self.
info_msginfo_msg(
'Sending ComputeAndTrackRoute goal request...')
225 send_goal_future = self.compute_track_action_client.send_goal_async(
228 rclpy.spin_until_future_complete(self, send_goal_future)
229 goal_handle = send_goal_future.result()
231 if not goal_handle
or not goal_handle.accepted:
235 self.
info_msginfo_msg(
'Goal accepted')
236 get_result_future = goal_handle.get_result_async()
242 route_msg.use_poses =
False
243 route_msg.start_id = 7
244 route_msg.goal_id = 13
245 send_goal_future = self.compute_track_action_client.send_goal_async(
248 rclpy.spin_until_future_complete(self, send_goal_future)
249 goal_handle = send_goal_future.result()
251 if not goal_handle
or not goal_handle.accepted:
255 self.
info_msginfo_msg(
'Goal accepted')
256 get_result_future = goal_handle.get_result_async()
259 self.
info_msginfo_msg(
"Waiting for 'ComputeAndTrackRoute' action to complete")
261 last_feedback_msg =
None
262 follow_path_task =
None
264 rclpy.spin_until_future_complete(self, get_result_future, timeout_sec=0.10)
265 if get_result_future.result()
is not None:
266 status = get_result_future.result().status
267 if status == GoalStatus.STATUS_SUCCEEDED:
269 elif status == GoalStatus.STATUS_CANCELED
or status == GoalStatus.STATUS_ABORTED:
270 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
278 if (last_feedback_msg
and feedback_msg.path != last_feedback_msg.path):
279 follow_path_task = self.
navigatornavigator.followPath(feedback_msg.path)
282 if last_feedback_msg
and \
283 last_feedback_msg.current_edge_id != feedback_msg.current_edge_id
and \
284 int(feedback_msg.current_edge_id) != 0:
285 if last_feedback_msg.next_node_id != feedback_msg.last_node_id:
286 self.
error_msgerror_msg(
'Feedback state is not tracking in order!')
289 last_feedback_msg = feedback_msg
292 if last_feedback_msg
is None:
293 self.
error_msgerror_msg(
'No feedback message received!')
296 if int(last_feedback_msg.next_node_id) != 0:
297 self.
error_msgerror_msg(
'Terminal feedback state of nodes is not correct!')
299 if int(last_feedback_msg.current_edge_id) != 0:
300 self.
error_msgerror_msg(
'Terminal feedback state of edges is not correct!')
302 if int(last_feedback_msg.route.nodes[-1].nodeid) != 13:
303 self.
error_msgerror_msg(
'Final route node is not correct!')
306 while not self.
navigatornavigator.isTaskComplete(task=follow_path_task):
309 self.
info_msginfo_msg(
'Action completed! Checking validity of terminal condition...')
313 self.
error_msgerror_msg(
'Did not make it to the goal pose!')
316 self.
info_msginfo_msg(
'Goal succeeded!')
319 def feedback_callback(self, feedback_msg: ComputeAndTrackRoute.Feedback) ->
None:
320 self.
feedback_msgsfeedback_msgs.append(feedback_msg.feedback)
322 def distanceFromGoal(self) -> float:
325 distance = math.sqrt(d_x * d_x + d_y * d_y)
326 self.
info_msginfo_msg(f
'Distance from goal is: {distance}')
329 def info_msg(self, msg: str) ->
None:
330 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
332 def error_msg(self, msg: str) ->
None:
333 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
335 def setInitialPose(self) -> None:
336 msg = PoseWithCovarianceStamped()
338 msg.header.frame_id =
'map'
339 self.
info_msginfo_msg(
'Publishing Initial Pose')
343 def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
345 msg.header.frame_id =
'map'
349 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
350 self.
info_msginfo_msg(
'Received amcl_pose')
354 def wait_for_node_active(self, node_name: str) ->
None:
356 self.
info_msginfo_msg(f
'Waiting for {node_name} to become active')
357 node_service = f
'{node_name}/get_state'
358 state_client: Client[GetState.Request, GetState.Response] = \
359 self.create_client(GetState, node_service)
360 while not state_client.wait_for_service(timeout_sec=1.0):
361 self.
info_msginfo_msg(f
'{node_service} service not available, waiting...')
362 req = GetState.Request()
364 while state !=
'active':
365 self.
info_msginfo_msg(f
'Getting {node_name} state...')
366 future = state_client.call_async(req)
367 rclpy.spin_until_future_complete(self, future)
368 if future.result()
is not None:
369 state = future.result().current_state.label
370 self.
info_msginfo_msg(f
'Result of get_state: {state}')
373 f
'Exception while calling service: {future.exception()!r}'
377 def shutdown(self) -> None:
378 self.
info_msginfo_msg(
'Shutting down')
379 self.compute_action_client.destroy()
380 self.compute_track_action_client.destroy()
382 transition_service =
'lifecycle_manager_navigation/manage_nodes'
383 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
384 self.create_client(ManageLifecycleNodes, transition_service)
385 while not mgr_client.wait_for_service(timeout_sec=1.0):
386 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
388 req = ManageLifecycleNodes.Request()
389 req.command = ManageLifecycleNodes.Request().SHUTDOWN
390 future = mgr_client.call_async(req)
392 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager...')
393 rclpy.spin_until_future_complete(self, future)
395 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
396 except Exception
as e:
397 self.
error_msgerror_msg(f
'Service call failed {e!r}')
398 transition_service =
'lifecycle_manager_localization/manage_nodes'
399 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
400 while not mgr_client.wait_for_service(timeout_sec=1.0):
401 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
403 req = ManageLifecycleNodes.Request()
404 req.command = ManageLifecycleNodes.Request().SHUTDOWN
405 future = mgr_client.call_async(req)
407 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager...')
408 rclpy.spin_until_future_complete(self, future)
410 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager complete')
411 except Exception
as e:
412 self.
error_msgerror_msg(f
'Service call failed {e!r}')
414 def wait_for_initial_pose(self) -> None:
417 self.
info_msginfo_msg(
'Setting initial pose')
419 self.
info_msginfo_msg(
'Waiting for amcl_pose to be received')
420 rclpy.spin_once(self, timeout_sec=1)
423 def run_all_tests(robot_tester: RouteTester) -> bool:
425 robot_tester.wait_for_node_active(
'amcl')
426 robot_tester.wait_for_initial_pose()
427 robot_tester.wait_for_node_active(
'bt_navigator')
428 result_poses = robot_tester.runComputeRouteTest(use_poses=
True)
429 result_node_ids = robot_tester.runComputeRouteTest(use_poses=
False)
430 result_same = robot_tester.runComputeRouteSamePoseTest()
431 result = result_poses
and result_node_ids
and result_same
and robot_tester.runTrackRouteTest()
434 robot_tester.info_msg(
'Test PASSED')
436 robot_tester.error_msg(
'Test FAILED')
440 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
441 initial_pose = Pose()
442 initial_pose.position.x = x
443 initial_pose.position.y = y
444 initial_pose.position.z = z
445 initial_pose.orientation.x = 0.0
446 initial_pose.orientation.y = 0.0
447 initial_pose.orientation.z = 0.0
448 initial_pose.orientation.w = 1.0
452 def main(argv: list[str] = sys.argv[1:]):
454 parser = argparse.ArgumentParser(description=
'Route server tester node')
455 group = parser.add_mutually_exclusive_group(required=
True)
461 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
462 help=
'The robot starting and final positions.',
464 args, unknown = parser.parse_known_args()
469 init_x, init_y, final_x, final_y = args.robot[0]
471 initial_pose=fwd_pose(float(init_x), float(init_y)),
472 goal_pose=fwd_pose(float(final_x), float(final_y)),
475 'Starting tester, robot going from '
483 +
' via route server.'
490 passed = run_all_tests(tester)
493 tester.info_msg(
'Done Shutting Down.')
496 tester.info_msg(
'Exiting failed')
499 tester.info_msg(
'Exiting passed')
503 if __name__ ==
'__main__':
None feedback_callback(self, ComputeAndTrackRoute.Feedback feedback_msg)
None info_msg(self, str msg)
None poseCallback(self, PoseWithCovarianceStamped msg)
None error_msg(self, str msg)
None setInitialPose(self)
PoseStamped getStampedPoseMsg(self, Pose pose)
float distanceFromGoal(self)