21 from action_msgs.msg
import GoalStatus
23 from lifecycle_msgs.srv
import GetState
24 from nav2_msgs.action
import ComputeAndTrackRoute, ComputeRoute
25 from nav2_msgs.srv
import ManageLifecycleNodes
28 from rclpy.action
import ActionClient
29 from rclpy.node
import Node
30 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
31 from std_srvs.srv
import Trigger
36 def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str =
''):
37 super().__init__(node_name=
'nav2_tester', namespace=namespace)
39 PoseWithCovarianceStamped,
'initialpose', 10
42 pose_qos = QoSProfile(
43 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
44 reliability=QoSReliabilityPolicy.RELIABLE,
45 history=QoSHistoryPolicy.KEEP_LAST,
50 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
57 self, ComputeAndTrackRoute,
'compute_and_track_route')
58 self.
feedback_msgsfeedback_msgs: list[ComputeAndTrackRoute.Feedback] = []
62 def runComputeRouteTest(self, use_poses: bool =
True) -> bool:
64 self.
info_msginfo_msg(
"Waiting for 'ComputeRoute' action server")
66 self.
info_msginfo_msg(
"'ComputeRoute' action server not available, waiting...")
68 route_msg = ComputeRoute.Goal()
72 route_msg.use_start =
True
73 route_msg.use_poses =
True
76 route_msg.start_id = 7
77 route_msg.goal_id = 13
78 route_msg.use_start =
False
79 route_msg.use_poses =
False
81 self.
info_msginfo_msg(
'Sending ComputeRoute goal request...')
84 rclpy.spin_until_future_complete(self, send_goal_future)
85 goal_handle = send_goal_future.result()
87 if not goal_handle
or not goal_handle.accepted:
91 self.
info_msginfo_msg(
'Goal accepted')
92 get_result_future = goal_handle.get_result_async()
94 self.
info_msginfo_msg(
"Waiting for 'ComputeRoute' action to complete")
95 rclpy.spin_until_future_complete(self, get_result_future)
96 status = get_result_future.result().status
97 result = get_result_future.result().result
98 if status != GoalStatus.STATUS_SUCCEEDED:
99 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
102 self.
info_msginfo_msg(
'Action completed! Checking validity of results...')
105 assert (len(result.path.poses) == 80)
106 assert (result.route.route_cost > 6)
107 assert (result.route.route_cost < 7)
108 assert (len(result.route.nodes) == 5)
109 assert (len(result.route.edges) == 4)
110 assert (result.error_code == 0)
111 assert (result.error_msg ==
'')
113 self.
info_msginfo_msg(
'Goal succeeded!')
116 def runComputeRouteSamePoseTest(self) -> bool:
118 self.
info_msginfo_msg(
"Waiting for 'ComputeRoute' action server")
120 self.
info_msginfo_msg(
"'ComputeRoute' action server not available, waiting...")
122 route_msg = ComputeRoute.Goal()
123 route_msg.start_id = 7
124 route_msg.goal_id = 7
125 route_msg.use_start =
False
126 route_msg.use_poses =
False
128 self.
info_msginfo_msg(
'Sending ComputeRoute goal request...')
131 rclpy.spin_until_future_complete(self, send_goal_future)
132 goal_handle = send_goal_future.result()
134 if not goal_handle
or not goal_handle.accepted:
138 self.
info_msginfo_msg(
'Goal accepted')
139 get_result_future = goal_handle.get_result_async()
141 self.
info_msginfo_msg(
"Waiting for 'ComputeRoute' action to complete")
142 rclpy.spin_until_future_complete(self, get_result_future)
143 status = get_result_future.result().status
144 result = get_result_future.result().result
145 if status != GoalStatus.STATUS_SUCCEEDED:
146 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
149 self.
info_msginfo_msg(
'Action completed! Checking validity of results...')
152 assert (len(result.path.poses) == 1)
153 assert (len(result.route.nodes) == 1)
154 assert (len(result.route.edges) == 0)
155 assert (result.error_code == 0)
156 assert (result.error_msg ==
'')
158 self.
info_msginfo_msg(
'Goal succeeded!')
161 def runTrackRouteTest(self) -> bool:
163 self.
info_msginfo_msg(
"Waiting for 'ComputeAndTrackRoute' action server")
165 self.
info_msginfo_msg(
"'ComputeAndTrackRoute' action server not available, waiting...")
167 route_msg = ComputeAndTrackRoute.Goal()
169 route_msg.use_start =
False
170 route_msg.use_poses =
True
172 self.
info_msginfo_msg(
'Sending ComputeAndTrackRoute goal request...')
176 rclpy.spin_until_future_complete(self, send_goal_future)
177 goal_handle = send_goal_future.result()
179 if not goal_handle
or not goal_handle.accepted:
183 self.
info_msginfo_msg(
'Goal accepted')
184 get_result_future = goal_handle.get_result_async()
188 self.
info_msginfo_msg(
'Triggering a reroute')
189 srv_client = self.create_client(Trigger,
'route_server/ReroutingService/reroute')
190 while not srv_client.wait_for_service(timeout_sec=1.0):
191 self.
info_msginfo_msg(
'Reroute service not available, waiting...')
192 req = Trigger.Request()
193 future = srv_client.call_async(req)
194 rclpy.spin_until_future_complete(self, future)
195 if future.result()
is not None:
196 self.
info_msginfo_msg(
'Reroute triggered')
198 self.
error_msgerror_msg(
'Reroute failed')
204 cancel_future = goal_handle.cancel_goal_async()
205 rclpy.spin_until_future_complete(self, cancel_future)
206 status = cancel_future.result()
207 if status
is not None and len(status.goals_canceling) > 0:
208 self.
info_msginfo_msg(
'Action cancel completed!')
210 self.
info_msginfo_msg(
'Goal cancel failed')
214 self.
info_msginfo_msg(
'Sending ComputeAndTrackRoute goal request...')
218 rclpy.spin_until_future_complete(self, send_goal_future)
219 goal_handle = send_goal_future.result()
221 if not goal_handle
or not goal_handle.accepted:
225 self.
info_msginfo_msg(
'Goal accepted')
226 get_result_future = goal_handle.get_result_async()
232 route_msg.use_poses =
False
233 route_msg.start_id = 7
234 route_msg.goal_id = 13
238 rclpy.spin_until_future_complete(self, send_goal_future)
239 goal_handle = send_goal_future.result()
241 if not goal_handle
or not goal_handle.accepted:
245 self.
info_msginfo_msg(
'Goal accepted')
246 get_result_future = goal_handle.get_result_async()
249 self.
info_msginfo_msg(
"Waiting for 'ComputeAndTrackRoute' action to complete")
251 last_feedback_msg =
None
252 follow_path_task =
None
254 rclpy.spin_until_future_complete(self, get_result_future, timeout_sec=0.10)
255 if get_result_future.result()
is not None:
256 status = get_result_future.result().status
257 if status == GoalStatus.STATUS_SUCCEEDED:
259 elif status == GoalStatus.STATUS_CANCELED
or status == GoalStatus.STATUS_ABORTED:
260 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
268 if (last_feedback_msg
and feedback_msg.path != last_feedback_msg.path):
269 follow_path_task = self.
navigatornavigator.followPath(feedback_msg.path)
272 if last_feedback_msg
and \
273 last_feedback_msg.current_edge_id != feedback_msg.current_edge_id
and \
274 int(feedback_msg.current_edge_id) != 0:
275 if last_feedback_msg.next_node_id != feedback_msg.last_node_id:
276 self.
error_msgerror_msg(
'Feedback state is not tracking in order!')
279 last_feedback_msg = feedback_msg
282 if last_feedback_msg
is None:
283 self.
error_msgerror_msg(
'No feedback message received!')
286 if int(last_feedback_msg.next_node_id) != 0:
287 self.
error_msgerror_msg(
'Terminal feedback state of nodes is not correct!')
289 if int(last_feedback_msg.current_edge_id) != 0:
290 self.
error_msgerror_msg(
'Terminal feedback state of edges is not correct!')
292 if int(last_feedback_msg.route.nodes[-1].nodeid) != 13:
293 self.
error_msgerror_msg(
'Final route node is not correct!')
296 while not self.
navigatornavigator.isTaskComplete(task=follow_path_task):
299 self.
info_msginfo_msg(
'Action completed! Checking validity of terminal condition...')
303 self.
error_msgerror_msg(
'Did not make it to the goal pose!')
306 self.
info_msginfo_msg(
'Goal succeeded!')
309 def feedback_callback(self, feedback_msg: ComputeAndTrackRoute.Feedback) ->
None:
310 self.
feedback_msgsfeedback_msgs.append(feedback_msg.feedback)
312 def distanceFromGoal(self) -> float:
315 distance = math.sqrt(d_x * d_x + d_y * d_y)
316 self.
info_msginfo_msg(f
'Distance from goal is: {distance}')
319 def info_msg(self, msg: str) ->
None:
320 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
322 def error_msg(self, msg: str) ->
None:
323 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
325 def setInitialPose(self) -> None:
326 msg = PoseWithCovarianceStamped()
328 msg.header.frame_id =
'map'
329 self.
info_msginfo_msg(
'Publishing Initial Pose')
333 def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
335 msg.header.frame_id =
'map'
339 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
340 self.
info_msginfo_msg(
'Received amcl_pose')
344 def wait_for_node_active(self, node_name: str) ->
None:
346 self.
info_msginfo_msg(f
'Waiting for {node_name} to become active')
347 node_service = f
'{node_name}/get_state'
348 state_client = self.create_client(GetState, node_service)
349 while not state_client.wait_for_service(timeout_sec=1.0):
350 self.
info_msginfo_msg(f
'{node_service} service not available, waiting...')
351 req = GetState.Request()
353 while state !=
'active':
354 self.
info_msginfo_msg(f
'Getting {node_name} state...')
355 future = state_client.call_async(req)
356 rclpy.spin_until_future_complete(self, future)
357 if future.result()
is not None:
358 state = future.result().current_state.label
359 self.
info_msginfo_msg(f
'Result of get_state: {state}')
362 f
'Exception while calling service: {future.exception()!r}'
366 def shutdown(self) -> None:
367 self.
info_msginfo_msg(
'Shutting down')
371 transition_service =
'lifecycle_manager_navigation/manage_nodes'
372 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
373 while not mgr_client.wait_for_service(timeout_sec=1.0):
374 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
376 req = ManageLifecycleNodes.Request()
377 req.command = ManageLifecycleNodes.Request().SHUTDOWN
378 future = mgr_client.call_async(req)
380 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager...')
381 rclpy.spin_until_future_complete(self, future)
383 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
384 except Exception
as e:
385 self.
error_msgerror_msg(f
'Service call failed {e!r}')
386 transition_service =
'lifecycle_manager_localization/manage_nodes'
387 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
388 while not mgr_client.wait_for_service(timeout_sec=1.0):
389 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
391 req = ManageLifecycleNodes.Request()
392 req.command = ManageLifecycleNodes.Request().SHUTDOWN
393 future = mgr_client.call_async(req)
395 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager...')
396 rclpy.spin_until_future_complete(self, future)
398 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager complete')
399 except Exception
as e:
400 self.
error_msgerror_msg(f
'Service call failed {e!r}')
402 def wait_for_initial_pose(self) -> None:
405 self.
info_msginfo_msg(
'Setting initial pose')
407 self.
info_msginfo_msg(
'Waiting for amcl_pose to be received')
408 rclpy.spin_once(self, timeout_sec=1)
411 def run_all_tests(robot_tester: RouteTester) -> bool:
413 robot_tester.wait_for_node_active(
'amcl')
414 robot_tester.wait_for_initial_pose()
415 robot_tester.wait_for_node_active(
'bt_navigator')
416 result_poses = robot_tester.runComputeRouteTest(use_poses=
True)
417 result_node_ids = robot_tester.runComputeRouteTest(use_poses=
False)
418 result_same = robot_tester.runComputeRouteSamePoseTest()
419 result = result_poses
and result_node_ids
and result_same
and robot_tester.runTrackRouteTest()
422 robot_tester.info_msg(
'Test PASSED')
424 robot_tester.error_msg(
'Test FAILED')
428 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
429 initial_pose = Pose()
430 initial_pose.position.x = x
431 initial_pose.position.y = y
432 initial_pose.position.z = z
433 initial_pose.orientation.x = 0.0
434 initial_pose.orientation.y = 0.0
435 initial_pose.orientation.z = 0.0
436 initial_pose.orientation.w = 1.0
440 def main(argv: list[str] = sys.argv[1:]):
442 parser = argparse.ArgumentParser(description=
'Route server tester node')
443 group = parser.add_mutually_exclusive_group(required=
True)
449 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
450 help=
'The robot starting and final positions.',
452 args, unknown = parser.parse_known_args()
457 init_x, init_y, final_x, final_y = args.robot[0]
459 initial_pose=fwd_pose(float(init_x), float(init_y)),
460 goal_pose=fwd_pose(float(final_x), float(final_y)),
463 'Starting tester, robot going from '
471 +
' via route server.'
478 passed = run_all_tests(tester)
481 tester.info_msg(
'Done Shutting Down.')
484 tester.info_msg(
'Exiting failed')
487 tester.info_msg(
'Exiting passed')
491 if __name__ ==
'__main__':
None feedback_callback(self, ComputeAndTrackRoute.Feedback feedback_msg)
None info_msg(self, str msg)
None poseCallback(self, PoseWithCovarianceStamped msg)
compute_track_action_client
None error_msg(self, str msg)
None setInitialPose(self)
PoseStamped getStampedPoseMsg(self, Pose pose)
float distanceFromGoal(self)