18 from typing
import Optional
20 from action_msgs.msg
import GoalStatus
22 from nav2_msgs.action
import ComputePathToPose, FollowWaypoints
23 from nav2_msgs.srv
import ManageLifecycleNodes
24 from rcl_interfaces.srv
import SetParameters
26 from rclpy.action
import ActionClient
27 from rclpy.action.client
import ClientGoalHandle
28 from rclpy.node
import Node
29 from rclpy.parameter
import Parameter
30 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
35 def __init__(self) -> None:
36 super().__init__(node_name=
'nav2_waypoint_tester', namespace=
'')
38 self.
action_clientaction_client = ActionClient(self, FollowWaypoints,
'follow_waypoints')
40 PoseWithCovarianceStamped,
'initialpose', 10
43 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[
44 FollowWaypoints.Goal, FollowWaypoints.Result,
45 FollowWaypoints.Feedback]] =
None
48 pose_qos = QoSProfile(
49 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
50 reliability=QoSReliabilityPolicy.RELIABLE,
51 history=QoSHistoryPolicy.KEEP_LAST,
56 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
58 self.
param_cliparam_cli = self.create_client(
59 SetParameters,
'/waypoint_follower/set_parameters'
62 def setInitialPose(self, pose: list[float]) ->
None:
63 self.
init_poseinit_pose = PoseWithCovarianceStamped()
64 self.
init_poseinit_pose.pose.pose.position.x = pose[0]
65 self.
init_poseinit_pose.pose.pose.position.y = pose[1]
66 self.
init_poseinit_pose.header.frame_id =
'map'
70 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
71 self.
info_msginfo_msg(
'Received amcl_pose')
74 def setWaypoints(self, waypoints: list[list[float]]) ->
None:
78 msg.header.frame_id =
'map'
79 msg.pose.position.x = wp[0]
80 msg.pose.position.y = wp[1]
81 msg.pose.orientation.w = 1.0
84 def run(self, block: bool, cancel: bool) -> bool:
89 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
90 self.
info_msginfo_msg(
"'follow_waypoints' action server not available, waiting...")
92 action_request = FollowWaypoints.Goal()
93 action_request.poses = self.
waypointswaypoints
95 self.
info_msginfo_msg(
'Sending goal request...')
96 send_goal_future = self.
action_clientaction_client.send_goal_async(action_request)
98 rclpy.spin_until_future_complete(self, send_goal_future)
99 self.
goal_handlegoal_handle = send_goal_future.result()
100 except Exception
as e:
101 self.
error_msgerror_msg(f
'Service call failed {e!r}')
107 self.
info_msginfo_msg(
'Goal accepted')
111 get_result_future = self.
goal_handlegoal_handle.get_result_async()
116 self.
info_msginfo_msg(
"Waiting for 'follow_waypoints' action to complete")
118 rclpy.spin_until_future_complete(self, get_result_future)
119 status = get_result_future.result().status
120 result = get_result_future.result().result
122 except Exception
as e:
123 self.
error_msgerror_msg(f
'Service call failed {e!r}')
125 if status != GoalStatus.STATUS_SUCCEEDED:
126 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
128 if len(self.
action_resultaction_result.missed_waypoints) > 0:
130 'Goal failed to process all waypoints,'
131 f
' missed {len(self.action_result.missed_waypoints)} wps.'
135 self.
info_msginfo_msg(
'Goal succeeded!')
138 def publishInitialPose(self) -> None:
141 def setStopFailureParam(self, value: bool) ->
None:
142 req = SetParameters.Request()
144 Parameter(
'stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
146 future = self.
param_cliparam_cli.call_async(req)
147 rclpy.spin_until_future_complete(self, future)
149 def shutdown(self) -> None:
150 self.
info_msginfo_msg(
'Shutting down')
153 self.
info_msginfo_msg(
'Destroyed follow_waypoints action client')
155 transition_service =
'lifecycle_manager_navigation/manage_nodes'
156 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
157 while not mgr_client.wait_for_service(timeout_sec=1.0):
158 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
160 req = ManageLifecycleNodes.Request()
161 req.command = ManageLifecycleNodes.Request().SHUTDOWN
162 future = mgr_client.call_async(req)
164 rclpy.spin_until_future_complete(self, future)
166 except Exception
as e:
167 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
169 self.
info_msginfo_msg(f
'{transition_service} finished')
171 transition_service =
'lifecycle_manager_localization/manage_nodes'
172 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
173 while not mgr_client.wait_for_service(timeout_sec=1.0):
174 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
176 req = ManageLifecycleNodes.Request()
177 req.command = ManageLifecycleNodes.Request().SHUTDOWN
178 future = mgr_client.call_async(req)
180 rclpy.spin_until_future_complete(self, future)
182 except Exception
as e:
183 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
185 self.
info_msginfo_msg(f
'{transition_service} finished')
187 def cancel_goal(self) -> None:
188 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
189 rclpy.spin_until_future_complete(self, cancel_future)
191 def info_msg(self, msg: str) ->
None:
192 self.get_logger().info(msg)
194 def warn_msg(self, msg: str) ->
None:
195 self.get_logger().warn(msg)
197 def error_msg(self, msg: str) ->
None:
198 self.get_logger().error(msg)
201 def main(argv: list[str] = sys.argv[1:]):
207 wps = [[-0.52, -0.54], [0.58, -0.55], [1.78, -0.57]]
208 starting_pose = [-2.0, -0.5]
211 test.setWaypoints(wps)
215 while not test.initial_pose_received
and retry_count <= retries:
217 test.info_msg(
'Setting initial pose')
218 test.setInitialPose(starting_pose)
219 test.info_msg(
'Waiting for amcl_pose to be received')
220 rclpy.spin_once(test, timeout_sec=1.0)
222 result = test.run(
True,
False)
226 test.setWaypoints([starting_pose])
227 result = test.run(
False,
False)
229 test.setWaypoints([wps[1]])
230 result = test.run(
False,
False)
238 test.setWaypoints([[100.0, 100.0]])
239 result = test.run(
True,
False)
243 test.action_result.missed_waypoints[0].error_code
244 == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
246 assert (test.action_result.missed_waypoints[0].error_msg !=
'')
249 test.setStopFailureParam(
True)
250 bwps = [[-0.52, -0.54], [100.0, 100.0], [0.58, 0.52]]
251 test.setWaypoints(bwps)
252 result = test.run(
True,
False)
255 mwps = test.action_result.missed_waypoints
256 result = (len(mwps) == 1) & (mwps[0] == 1)
257 test.setStopFailureParam(
False)
260 test.setWaypoints([])
261 result = test.run(
True,
False)
264 test.setWaypoints(wps)
265 result = test.run(
True,
True)
270 test.info_msg(
'Done Shutting Down.')
273 test.info_msg(
'Exiting failed')
276 test.info_msg(
'Exiting passed')
280 if __name__ ==
'__main__':
None poseCallback(self, PoseWithCovarianceStamped msg)
None info_msg(self, str msg)
None error_msg(self, str msg)
None publishInitialPose(self)