18 from typing
import Optional
20 from action_msgs.msg
import GoalStatus
21 from geographic_msgs.msg
import GeoPose
22 from nav2_msgs.action
import ComputePathToPose, FollowGPSWaypoints
23 from nav2_msgs.srv
import ManageLifecycleNodes
24 from rcl_interfaces.srv
import SetParameters
26 from rclpy.action
import ActionClient
27 from rclpy.action.client
import ClientGoalHandle
28 from rclpy.node
import Node
29 from rclpy.parameter
import Parameter
34 def __init__(self) -> None:
35 super().__init__(node_name=
'nav2_gps_waypoint_tester', namespace=
'')
38 self, FollowGPSWaypoints,
'follow_gps_waypoints'
40 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[
41 FollowGPSWaypoints.Goal, FollowGPSWaypoints.Result,
42 FollowGPSWaypoints.Feedback]] =
None
45 self.
param_cliparam_cli = self.create_client(
46 SetParameters,
'/waypoint_follower/set_parameters'
49 def setWaypoints(self, waypoints: list[list[float]]) ->
None:
53 msg.position.latitude = wp[0]
54 msg.position.longitude = wp[1]
55 msg.orientation.w = 1.0
58 def run(self, block: bool, cancel: bool) -> bool:
63 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
65 "'follow_gps_waypoints' action server not available, waiting..."
68 action_request = FollowGPSWaypoints.Goal()
69 action_request.gps_poses = self.
waypointswaypoints
71 self.
info_msginfo_msg(
'Sending goal request...')
72 send_goal_future = self.
action_clientaction_client.send_goal_async(action_request)
74 rclpy.spin_until_future_complete(self, send_goal_future)
75 self.
goal_handlegoal_handle = send_goal_future.result()
76 except Exception
as e:
77 self.
error_msgerror_msg(f
'Service call failed {e!r}')
83 self.
info_msginfo_msg(
'Goal accepted')
87 get_result_future = self.
goal_handlegoal_handle.get_result_async()
92 self.
info_msginfo_msg(
"Waiting for 'follow_gps_waypoints' action to complete")
94 rclpy.spin_until_future_complete(self, get_result_future)
95 status = get_result_future.result().status
96 result = get_result_future.result().result
98 except Exception
as e:
99 self.
error_msgerror_msg(f
'Service call failed {e!r}')
101 if status != GoalStatus.STATUS_SUCCEEDED:
102 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
104 if len(result.missed_waypoints) > 0:
106 'Goal failed to process all waypoints,'
107 f
' missed {len(result.missed_waypoints)} wps.'
111 self.
info_msginfo_msg(
'Goal succeeded!')
114 def setStopFailureParam(self, value: bool) ->
None:
115 req = SetParameters.Request()
117 Parameter(
'stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
119 future = self.
param_cliparam_cli.call_async(req)
120 rclpy.spin_until_future_complete(self, future)
122 def shutdown(self) -> None:
123 self.
info_msginfo_msg(
'Shutting down')
126 self.
info_msginfo_msg(
'Destroyed follow_gps_waypoints action client')
128 transition_service =
'lifecycle_manager_navigation/manage_nodes'
129 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
130 while not mgr_client.wait_for_service(timeout_sec=1.0):
131 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
133 req = ManageLifecycleNodes.Request()
134 req.command = ManageLifecycleNodes.Request().SHUTDOWN
135 future = mgr_client.call_async(req)
137 rclpy.spin_until_future_complete(self, future)
139 except Exception
as e:
140 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
142 self.
info_msginfo_msg(f
'{transition_service} finished')
144 def cancel_goal(self) -> None:
145 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
146 rclpy.spin_until_future_complete(self, cancel_future)
148 def info_msg(self, msg: str) ->
None:
149 self.get_logger().info(msg)
151 def warn_msg(self, msg: str) ->
None:
152 self.get_logger().warn(msg)
154 def error_msg(self, msg: str) ->
None:
155 self.get_logger().error(msg)
158 def main(argv: list[str] = sys.argv[1:]):
164 wps = [[55.944831, -3.186998], [55.944818, -3.187075], [55.944782, -3.187060]]
167 test.setWaypoints(wps)
171 result = test.run(
True,
False)
175 test.setWaypoints([wps[0]])
176 result = test.run(
False,
False)
178 test.setWaypoints([wps[1]])
179 result = test.run(
False,
False)
187 test.setWaypoints([[55.929834, -3.184343]])
188 result = test.run(
True,
False)
192 test.action_result.missed_waypoints[0].error_code
193 == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
195 assert (test.action_result.missed_waypoints[0].error_msg !=
'')
198 test.setStopFailureParam(
True)
199 bwps = [[55.944831, -3.186998], [55.929834, -3.184343], [55.944782, -3.187060]]
200 test.setWaypoints(bwps)
201 result = test.run(
True,
False)
204 mwps = test.action_result.missed_waypoints
205 result = (len(mwps) == 1) & (mwps[0] == 1)
206 test.setStopFailureParam(
False)
209 test.setWaypoints([])
210 result = test.run(
True,
False)
213 test.setWaypoints(wps)
214 result = test.run(
True,
True)
219 test.info_msg(
'Done Shutting Down.')
222 test.info_msg(
'Exiting failed')
225 test.info_msg(
'Exiting passed')
229 if __name__ ==
'__main__':
None info_msg(self, str msg)
None error_msg(self, str msg)