20 from typing
import Optional
22 from action_msgs.msg
import GoalStatus
24 from lifecycle_msgs.srv
import GetState
25 from nav2_msgs.action
import NavigateThroughPoses
26 from nav2_msgs.srv
import ManageLifecycleNodes
28 from rclpy.action
import ActionClient
29 from rclpy.client
import Client
30 from rclpy.node
import Node
31 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
36 def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str =
''):
37 super().__init__(node_name=
'nav2_tester', namespace=namespace)
39 PoseWithCovarianceStamped,
'initialpose', 10
42 pose_qos = QoSProfile(
43 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
44 reliability=QoSReliabilityPolicy.RELIABLE,
45 history=QoSHistoryPolicy.KEEP_LAST,
50 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
55 self.action_client: ActionClient[
56 NavigateThroughPoses.Goal,
57 NavigateThroughPoses.Result,
58 NavigateThroughPoses.Feedback
59 ] = ActionClient(self, NavigateThroughPoses,
'navigate_through_poses')
61 def info_msg(self, msg: str) ->
None:
62 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
64 def warn_msg(self, msg: str) ->
None:
65 self.get_logger().warning(
'\033[1;37;43m' + msg +
'\033[0m')
67 def error_msg(self, msg: str) ->
None:
68 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
70 def setInitialPose(self) -> None:
71 msg = PoseWithCovarianceStamped()
73 msg.header.frame_id =
'map'
74 self.
info_msginfo_msg(
'Publishing Initial Pose')
78 def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
80 msg.header.frame_id =
'map'
84 def runNavigateAction(self, goal_pose: Optional[Pose] =
None) -> bool:
86 self.
info_msginfo_msg(
"Waiting for 'NavigateThroughPoses' action server")
87 while not self.action_client.wait_for_server(timeout_sec=1.0):
89 "'NavigateThroughPoses' action server not available, waiting..."
92 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
93 goal_msg = NavigateThroughPoses.Goal()
94 goal_msg.poses.header.frame_id =
'map'
95 goal_msg.poses.header.stamp = self.get_clock().now().to_msg()
96 goal_msg.poses.goals = [
101 self.
info_msginfo_msg(
'Sending goal request...')
102 send_goal_future = self.action_client.send_goal_async(goal_msg)
104 rclpy.spin_until_future_complete(self, send_goal_future)
105 goal_handle = send_goal_future.result()
107 if not goal_handle
or not goal_handle.accepted:
111 self.
info_msginfo_msg(
'Goal accepted')
112 get_result_future = goal_handle.get_result_async()
114 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
115 rclpy.spin_until_future_complete(self, get_result_future)
116 status = get_result_future.result().status
117 if status != GoalStatus.STATUS_SUCCEEDED:
118 result = get_result_future.result().result
119 self.
info_msginfo_msg(f
'Goal failed with status code: {status}'
120 f
' error code:{result.error_code}'
121 f
' error msg:{result.error_msg}')
124 self.
info_msginfo_msg(
'Goal succeeded!')
127 def runFakeNavigateAction(self) -> bool:
129 self.
info_msginfo_msg(
"Waiting for 'NavigateThroughPoses' action server")
130 while not self.action_client.wait_for_server(timeout_sec=1.0):
132 "'NavigateThroughPoses' action server not available, waiting..."
135 goal_msg = NavigateThroughPoses.Goal()
137 self.
info_msginfo_msg(
'Sending goal request...')
138 send_goal_future = self.action_client.send_goal_async(goal_msg)
140 rclpy.spin_until_future_complete(self, send_goal_future)
141 goal_handle = send_goal_future.result()
143 if not goal_handle
or not goal_handle.accepted:
147 self.
info_msginfo_msg(
'Goal accepted')
148 get_result_future = goal_handle.get_result_async()
150 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
151 rclpy.spin_until_future_complete(self, get_result_future)
152 status = get_result_future.result().status
153 if status != GoalStatus.STATUS_SUCCEEDED:
154 result = get_result_future.result().result
155 self.
info_msginfo_msg(f
'Goal failed with status code: {status}'
156 f
' error code:{result.error_code}'
157 f
' error msg:{result.error_msg}')
160 self.
info_msginfo_msg(
'Goal succeeded!')
163 def runNavigatePreemptionAction(self, block: bool) -> bool:
165 self.
info_msginfo_msg(
"Waiting for 'NavigateThroughPoses' action server")
166 while not self.action_client.wait_for_server(timeout_sec=1.0):
168 "'NavigateThroughPoses' action server not available, waiting..."
171 goal_msg = NavigateThroughPoses.Goal()
174 self.
info_msginfo_msg(
'Sending goal request...')
175 send_goal_future = self.action_client.send_goal_async(goal_msg)
177 rclpy.spin_until_future_complete(self, send_goal_future)
178 goal_handle = send_goal_future.result()
180 if not goal_handle
or not goal_handle.accepted:
187 self.
info_msginfo_msg(
'Goal accepted')
188 get_result_future = goal_handle.get_result_async()
190 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
191 rclpy.spin_until_future_complete(self, get_result_future)
192 status = get_result_future.result().status
193 if status != GoalStatus.STATUS_SUCCEEDED:
194 result = get_result_future.result().result
195 self.
info_msginfo_msg(f
'Goal failed with status code: {status}'
196 f
' error code:{result.error_code}'
197 f
' error msg:{result.error_msg}')
200 self.
info_msginfo_msg(
'Goal succeeded!')
203 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
204 self.
info_msginfo_msg(
'Received amcl_pose')
208 def wait_for_node_active(self, node_name: str) ->
None:
210 self.
info_msginfo_msg(f
'Waiting for {node_name} to become active')
211 node_service = f
'{node_name}/get_state'
212 state_client: Client[GetState.Request, GetState.Response] = \
213 self.create_client(GetState, node_service)
214 while not state_client.wait_for_service(timeout_sec=1.0):
215 self.
info_msginfo_msg(f
'{node_service} service not available, waiting...')
216 req = GetState.Request()
218 while state !=
'active':
219 self.
info_msginfo_msg(f
'Getting {node_name} state...')
220 future = state_client.call_async(req)
221 rclpy.spin_until_future_complete(self, future)
222 if future.result()
is not None:
223 state = future.result().current_state.label
224 self.
info_msginfo_msg(f
'Result of get_state: {state}')
227 f
'Exception while calling service: {future.exception()!r}'
231 def shutdown(self) -> None:
232 self.
info_msginfo_msg(
'Shutting down')
233 self.action_client.destroy()
235 transition_service =
'lifecycle_manager_navigation/manage_nodes'
236 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
237 self.create_client(ManageLifecycleNodes, transition_service)
238 while not mgr_client.wait_for_service(timeout_sec=1.0):
239 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
241 req = ManageLifecycleNodes.Request()
242 req.command = ManageLifecycleNodes.Request().SHUTDOWN
243 future = mgr_client.call_async(req)
245 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager...')
246 rclpy.spin_until_future_complete(self, future)
248 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
249 except Exception
as e:
250 self.
error_msgerror_msg(f
'Service call failed {e!r}')
251 transition_service =
'lifecycle_manager_localization/manage_nodes'
252 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
253 while not mgr_client.wait_for_service(timeout_sec=1.0):
254 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
256 req = ManageLifecycleNodes.Request()
257 req.command = ManageLifecycleNodes.Request().SHUTDOWN
258 future = mgr_client.call_async(req)
260 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager...')
261 rclpy.spin_until_future_complete(self, future)
263 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager complete')
264 except Exception
as e:
265 self.
error_msgerror_msg(f
'Service call failed {e!r}')
267 def wait_for_initial_pose(self) -> None:
270 self.
info_msginfo_msg(
'Setting initial pose')
272 self.
info_msginfo_msg(
'Waiting for amcl_pose to be received')
273 rclpy.spin_once(self, timeout_sec=1)
276 def run_all_tests(robot_tester: NavTester) -> bool:
280 robot_tester.wait_for_node_active(
'amcl')
281 robot_tester.wait_for_initial_pose()
282 robot_tester.wait_for_node_active(
'bt_navigator')
283 result = robot_tester.runNavigateAction()
285 result = result
and not robot_tester.runFakeNavigateAction()
287 result = result
and robot_tester.runNavigatePreemptionAction(
False)
288 result = result
and robot_tester.runNavigatePreemptionAction(
True)
293 robot_tester.info_msg(
'Test PASSED')
295 robot_tester.error_msg(
'Test FAILED')
300 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
301 initial_pose = Pose()
302 initial_pose.position.x = x
303 initial_pose.position.y = y
304 initial_pose.position.z = z
305 initial_pose.orientation.x = 0.0
306 initial_pose.orientation.y = 0.0
307 initial_pose.orientation.z = 0.0
308 initial_pose.orientation.w = 1.0
312 def get_testers(args: argparse.Namespace) -> list[NavTester]:
315 init_x, init_y, final_x, final_y = args.robot[0]
317 initial_pose=fwd_pose(float(init_x), float(init_y)),
318 goal_pose=fwd_pose(float(final_x), float(final_y)),
321 'Starting tester, robot going from '
331 testers.append(tester)
335 def main(argv: list[str] = sys.argv[1:]):
337 parser = argparse.ArgumentParser(description=
'System-level navigation tester node')
338 group = parser.add_mutually_exclusive_group(required=
True)
344 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
345 help=
'The robot starting and final positions.',
348 args, unknown = parser.parse_known_args()
353 testers = get_testers(args)
358 for tester
in testers:
359 passed = run_all_tests(tester)
363 for tester
in testers:
367 testers[0].info_msg(
'Done Shutting Down.')
370 testers[0].info_msg(
'Exiting failed')
373 testers[0].info_msg(
'Exiting passed')
377 if __name__ ==
'__main__':
None error_msg(self, str msg)
None poseCallback(self, PoseWithCovarianceStamped msg)
None setInitialPose(self)
PoseStamped getStampedPoseMsg(self, Pose pose)
None info_msg(self, str msg)