18 from typing
import Optional
20 from action_msgs.msg
import GoalStatus
22 from nav2_msgs.action
import ComputePathToPose, FollowWaypoints
23 from nav2_msgs.srv
import ManageLifecycleNodes
24 from rcl_interfaces.srv
import SetParameters
26 from rclpy.action
import ActionClient
27 from rclpy.action.client
import ClientGoalHandle
28 from rclpy.client
import Client
29 from rclpy.node
import Node
30 from rclpy.parameter
import Parameter
31 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
36 def __init__(self) -> None:
37 super().__init__(node_name=
'nav2_waypoint_tester', namespace=
'')
39 self.action_client: ActionClient[
41 FollowWaypoints.Result,
42 FollowWaypoints.Feedback
43 ] = ActionClient(self, FollowWaypoints,
'follow_waypoints')
45 PoseWithCovarianceStamped,
'initialpose', 10
48 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[
49 FollowWaypoints.Goal, FollowWaypoints.Result,
50 FollowWaypoints.Feedback]] =
None
53 pose_qos = QoSProfile(
54 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
55 reliability=QoSReliabilityPolicy.RELIABLE,
56 history=QoSHistoryPolicy.KEEP_LAST,
61 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
63 self.param_cli: Client[SetParameters.Request, SetParameters.Response] = \
65 SetParameters,
'/waypoint_follower/set_parameters'
68 def setInitialPose(self, pose: list[float]) ->
None:
69 self.
init_poseinit_pose = PoseWithCovarianceStamped()
70 self.
init_poseinit_pose.pose.pose.position.x = pose[0]
71 self.
init_poseinit_pose.pose.pose.position.y = pose[1]
72 self.
init_poseinit_pose.header.frame_id =
'map'
76 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
77 self.
info_msginfo_msg(
'Received amcl_pose')
80 def setWaypoints(self, waypoints: list[list[float]]) ->
None:
84 msg.header.frame_id =
'map'
85 msg.pose.position.x = wp[0]
86 msg.pose.position.y = wp[1]
87 msg.pose.orientation.w = 1.0
90 def run(self, block: bool, cancel: bool) -> bool:
95 while not self.action_client.wait_for_server(timeout_sec=1.0):
96 self.
info_msginfo_msg(
"'follow_waypoints' action server not available, waiting...")
98 action_request = FollowWaypoints.Goal()
99 action_request.poses = self.
waypointswaypoints
101 self.
info_msginfo_msg(
'Sending goal request...')
102 send_goal_future = self.action_client.send_goal_async(action_request)
104 rclpy.spin_until_future_complete(self, send_goal_future)
105 self.
goal_handlegoal_handle = send_goal_future.result()
106 except Exception
as e:
107 self.
error_msgerror_msg(f
'Service call failed {e!r}')
113 self.
info_msginfo_msg(
'Goal accepted')
117 get_result_future = self.
goal_handlegoal_handle.get_result_async()
122 self.
info_msginfo_msg(
"Waiting for 'follow_waypoints' action to complete")
124 rclpy.spin_until_future_complete(self, get_result_future)
125 status = get_result_future.result().status
126 result = get_result_future.result().result
128 except Exception
as e:
129 self.
error_msgerror_msg(f
'Service call failed {e!r}')
131 if status != GoalStatus.STATUS_SUCCEEDED:
132 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
134 if len(self.
action_resultaction_result.missed_waypoints) > 0:
136 'Goal failed to process all waypoints,'
137 f
' missed {len(self.action_result.missed_waypoints)} wps.'
141 self.
info_msginfo_msg(
'Goal succeeded!')
144 def publishInitialPose(self) -> None:
147 def setStopFailureParam(self, value: bool) ->
None:
148 req = SetParameters.Request()
150 Parameter(
'stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
152 future = self.param_cli.call_async(req)
153 rclpy.spin_until_future_complete(self, future)
155 def shutdown(self) -> None:
156 self.
info_msginfo_msg(
'Shutting down')
158 self.action_client.destroy()
159 self.
info_msginfo_msg(
'Destroyed follow_waypoints action client')
161 transition_service =
'lifecycle_manager_navigation/manage_nodes'
162 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
163 self.create_client(ManageLifecycleNodes, transition_service)
164 while not mgr_client.wait_for_service(timeout_sec=1.0):
165 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
167 req = ManageLifecycleNodes.Request()
168 req.command = ManageLifecycleNodes.Request().SHUTDOWN
169 future = mgr_client.call_async(req)
171 rclpy.spin_until_future_complete(self, future)
173 except Exception
as e:
174 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
176 self.
info_msginfo_msg(f
'{transition_service} finished')
178 transition_service =
'lifecycle_manager_localization/manage_nodes'
179 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
180 while not mgr_client.wait_for_service(timeout_sec=1.0):
181 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
183 req = ManageLifecycleNodes.Request()
184 req.command = ManageLifecycleNodes.Request().SHUTDOWN
185 future = mgr_client.call_async(req)
187 rclpy.spin_until_future_complete(self, future)
189 except Exception
as e:
190 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
192 self.
info_msginfo_msg(f
'{transition_service} finished')
194 def cancel_goal(self) -> None:
195 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
196 rclpy.spin_until_future_complete(self, cancel_future)
198 def info_msg(self, msg: str) ->
None:
199 self.get_logger().info(msg)
201 def warn_msg(self, msg: str) ->
None:
202 self.get_logger().warning(msg)
204 def error_msg(self, msg: str) ->
None:
205 self.get_logger().error(msg)
208 def main(argv: list[str] = sys.argv[1:]):
214 wps = [[-0.52, -0.54], [0.58, -0.55], [1.78, -0.57]]
215 starting_pose = [-2.0, -0.5]
218 test.setWaypoints(wps)
222 while not test.initial_pose_received
and retry_count <= retries:
224 test.info_msg(
'Setting initial pose')
225 test.setInitialPose(starting_pose)
226 test.info_msg(
'Waiting for amcl_pose to be received')
227 rclpy.spin_once(test, timeout_sec=1.0)
229 result = test.run(
True,
False)
233 test.setWaypoints([starting_pose])
234 result = test.run(
False,
False)
236 test.setWaypoints([wps[1]])
237 result = test.run(
False,
False)
245 test.setWaypoints([[100.0, 100.0]])
246 result = test.run(
True,
False)
250 test.action_result.missed_waypoints[0].error_code
251 == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
253 assert (test.action_result.missed_waypoints[0].error_msg !=
'')
256 test.setStopFailureParam(
True)
257 bwps = [[-0.52, -0.54], [100.0, 100.0], [0.58, 0.52]]
258 test.setWaypoints(bwps)
259 result = test.run(
True,
False)
262 mwps = test.action_result.missed_waypoints
263 result = (len(mwps) == 1) & (mwps[0] == 1)
264 test.setStopFailureParam(
False)
267 test.setWaypoints([])
268 result = test.run(
True,
False)
271 test.setWaypoints(wps)
272 result = test.run(
True,
True)
277 test.info_msg(
'Done Shutting Down.')
280 test.info_msg(
'Exiting failed')
283 test.info_msg(
'Exiting passed')
287 if __name__ ==
'__main__':
None poseCallback(self, PoseWithCovarianceStamped msg)
None info_msg(self, str msg)
None error_msg(self, str msg)
None publishInitialPose(self)