18 from typing
import Optional
20 from action_msgs.msg
import GoalStatus
21 from geographic_msgs.msg
import GeoPose
22 from nav2_msgs.action
import ComputePathToPose, FollowGPSWaypoints
23 from nav2_msgs.srv
import ManageLifecycleNodes
24 from rcl_interfaces.srv
import SetParameters
26 from rclpy.action
import ActionClient
27 from rclpy.action.client
import ClientGoalHandle
28 from rclpy.client
import Client
29 from rclpy.node
import Node
30 from rclpy.parameter
import Parameter
35 def __init__(self) -> None:
36 super().__init__(node_name=
'nav2_gps_waypoint_tester', namespace=
'')
38 self.action_client: ActionClient[
39 FollowGPSWaypoints.Goal,
40 FollowGPSWaypoints.Result,
41 FollowGPSWaypoints.Feedback
43 self, FollowGPSWaypoints,
'follow_gps_waypoints')
44 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[
45 FollowGPSWaypoints.Goal, FollowGPSWaypoints.Result,
46 FollowGPSWaypoints.Feedback]] =
None
49 self.param_cli: Client[SetParameters.Request, SetParameters.Response] \
51 SetParameters,
'/waypoint_follower/set_parameters'
54 def setWaypoints(self, waypoints: list[list[float]]) ->
None:
58 msg.position.latitude = wp[0]
59 msg.position.longitude = wp[1]
60 msg.orientation.w = 1.0
63 def run(self, block: bool, cancel: bool) -> bool:
68 while not self.action_client.wait_for_server(timeout_sec=1.0):
70 "'follow_gps_waypoints' action server not available, waiting..."
73 action_request = FollowGPSWaypoints.Goal()
74 action_request.gps_poses = self.
waypointswaypoints
76 self.
info_msginfo_msg(
'Sending goal request...')
77 send_goal_future = self.action_client.send_goal_async(action_request)
79 rclpy.spin_until_future_complete(self, send_goal_future)
80 self.
goal_handlegoal_handle = send_goal_future.result()
81 except Exception
as e:
82 self.
error_msgerror_msg(f
'Service call failed {e!r}')
88 self.
info_msginfo_msg(
'Goal accepted')
92 get_result_future = self.
goal_handlegoal_handle.get_result_async()
97 self.
info_msginfo_msg(
"Waiting for 'follow_gps_waypoints' action to complete")
99 rclpy.spin_until_future_complete(self, get_result_future)
100 status = get_result_future.result().status
101 result = get_result_future.result().result
103 except Exception
as e:
104 self.
error_msgerror_msg(f
'Service call failed {e!r}')
106 if status != GoalStatus.STATUS_SUCCEEDED:
107 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
109 if len(result.missed_waypoints) > 0:
111 'Goal failed to process all waypoints,'
112 f
' missed {len(result.missed_waypoints)} wps.'
116 self.
info_msginfo_msg(
'Goal succeeded!')
119 def setStopFailureParam(self, value: bool) ->
None:
120 req = SetParameters.Request()
122 Parameter(
'stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
124 future = self.param_cli.call_async(req)
125 rclpy.spin_until_future_complete(self, future)
127 def shutdown(self) -> None:
128 self.
info_msginfo_msg(
'Shutting down')
130 self.action_client.destroy()
131 self.
info_msginfo_msg(
'Destroyed follow_gps_waypoints action client')
133 transition_service =
'lifecycle_manager_navigation/manage_nodes'
134 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
135 = self.create_client(ManageLifecycleNodes, transition_service)
136 while not mgr_client.wait_for_service(timeout_sec=1.0):
137 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
139 req = ManageLifecycleNodes.Request()
140 req.command = ManageLifecycleNodes.Request().SHUTDOWN
141 future = mgr_client.call_async(req)
143 rclpy.spin_until_future_complete(self, future)
145 except Exception
as e:
146 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
148 self.
info_msginfo_msg(f
'{transition_service} finished')
150 def cancel_goal(self) -> None:
151 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
152 rclpy.spin_until_future_complete(self, cancel_future)
154 def info_msg(self, msg: str) ->
None:
155 self.get_logger().info(msg)
157 def warn_msg(self, msg: str) ->
None:
158 self.get_logger().warning(msg)
160 def error_msg(self, msg: str) ->
None:
161 self.get_logger().error(msg)
164 def main(argv: list[str] = sys.argv[1:]):
170 wps = [[55.944831, -3.186998], [55.944818, -3.187075], [55.944782, -3.187060]]
173 test.setWaypoints(wps)
177 result = test.run(
True,
False)
181 test.setWaypoints([wps[0]])
182 result = test.run(
False,
False)
184 test.setWaypoints([wps[1]])
185 result = test.run(
False,
False)
193 test.setWaypoints([[55.929834, -3.184343]])
194 result = test.run(
True,
False)
198 test.action_result.missed_waypoints[0].error_code
199 == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
201 assert (test.action_result.missed_waypoints[0].error_msg !=
'')
204 test.setStopFailureParam(
True)
205 bwps = [[55.944831, -3.186998], [55.929834, -3.184343], [55.944782, -3.187060]]
206 test.setWaypoints(bwps)
207 result = test.run(
True,
False)
210 mwps = test.action_result.missed_waypoints
211 result = (len(mwps) == 1) & (mwps[0] == 1)
212 test.setStopFailureParam(
False)
215 test.setWaypoints([])
216 result = test.run(
True,
False)
219 test.setWaypoints(wps)
220 result = test.run(
True,
True)
225 test.info_msg(
'Done Shutting Down.')
228 test.info_msg(
'Exiting failed')
231 test.info_msg(
'Exiting passed')
235 if __name__ ==
'__main__':
None info_msg(self, str msg)
None error_msg(self, str msg)