19 from action_msgs.msg
import GoalStatus
20 from geographic_msgs.msg
import GeoPose
21 from nav2_msgs.action
import ComputePathToPose, FollowGPSWaypoints
22 from nav2_msgs.srv
import ManageLifecycleNodes
23 from rcl_interfaces.srv
import SetParameters
27 from rclpy.action
import ActionClient
28 from rclpy.node
import Node
29 from rclpy.parameter
import Parameter
35 super().__init__(node_name=
'nav2_gps_waypoint_tester', namespace=
'')
38 self, FollowGPSWaypoints,
'follow_gps_waypoints'
43 self.
param_cliparam_cli = self.create_client(
44 SetParameters,
'/waypoint_follower/set_parameters'
47 def setWaypoints(self, waypoints):
51 msg.position.latitude = wp[0]
52 msg.position.longitude = wp[1]
53 msg.orientation.w = 1.0
56 def run(self, block, cancel):
61 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
63 "'follow_gps_waypoints' action server not available, waiting..."
66 action_request = FollowGPSWaypoints.Goal()
67 action_request.gps_poses = self.
waypointswaypoints
69 self.
info_msginfo_msg(
'Sending goal request...')
70 send_goal_future = self.
action_clientaction_client.send_goal_async(action_request)
72 rclpy.spin_until_future_complete(self, send_goal_future)
73 self.
goal_handlegoal_handle = send_goal_future.result()
74 except Exception
as e:
75 self.
error_msgerror_msg(f
'Service call failed {e!r}')
81 self.
info_msginfo_msg(
'Goal accepted')
85 get_result_future = self.
goal_handlegoal_handle.get_result_async()
90 self.
info_msginfo_msg(
"Waiting for 'follow_gps_waypoints' action to complete")
92 rclpy.spin_until_future_complete(self, get_result_future)
93 status = get_result_future.result().status
94 result = get_result_future.result().result
96 except Exception
as e:
97 self.
error_msgerror_msg(f
'Service call failed {e!r}')
99 if status != GoalStatus.STATUS_SUCCEEDED:
100 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
102 if len(result.missed_waypoints) > 0:
104 'Goal failed to process all waypoints,'
105 ' missed {0} wps.'.format(len(result.missed_waypoints))
109 self.
info_msginfo_msg(
'Goal succeeded!')
112 def setStopFailureParam(self, value):
113 req = SetParameters.Request()
115 Parameter(
'stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
117 future = self.
param_cliparam_cli.call_async(req)
118 rclpy.spin_until_future_complete(self, future)
121 self.
info_msginfo_msg(
'Shutting down')
124 self.
info_msginfo_msg(
'Destroyed follow_gps_waypoints action client')
126 transition_service =
'lifecycle_manager_navigation/manage_nodes'
127 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
128 while not mgr_client.wait_for_service(timeout_sec=1.0):
129 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
131 req = ManageLifecycleNodes.Request()
132 req.command = ManageLifecycleNodes.Request().SHUTDOWN
133 future = mgr_client.call_async(req)
135 rclpy.spin_until_future_complete(self, future)
137 except Exception
as e:
138 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
140 self.
info_msginfo_msg(f
'{transition_service} finished')
142 def cancel_goal(self):
143 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
144 rclpy.spin_until_future_complete(self, cancel_future)
146 def info_msg(self, msg: str):
147 self.get_logger().info(msg)
149 def warn_msg(self, msg: str):
150 self.get_logger().warn(msg)
152 def error_msg(self, msg: str):
153 self.get_logger().error(msg)
156 def main(argv=sys.argv[1:]):
162 wps = [[55.944831, -3.186998], [55.944818, -3.187075], [55.944782, -3.187060]]
165 test.setWaypoints(wps)
169 result = test.run(
True,
False)
173 test.setWaypoints([wps[0]])
174 result = test.run(
False,
False)
176 test.setWaypoints([wps[1]])
177 result = test.run(
False,
False)
185 test.setWaypoints([[35.0, -118.0]])
186 result = test.run(
True,
False)
190 test.action_result.missed_waypoints[0].error_code
191 == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
195 test.setStopFailureParam(
True)
196 bwps = [[55.944831, -3.186998], [35.0, -118.0], [55.944782, -3.187060]]
197 test.setWaypoints(bwps)
198 result = test.run(
True,
False)
201 mwps = test.action_result.missed_waypoints
202 result = (len(mwps) == 1) & (mwps[0] == 1)
203 test.setStopFailureParam(
False)
206 test.setWaypoints([])
207 result = test.run(
True,
False)
210 test.setWaypoints(wps)
211 result = test.run(
True,
True)
216 test.info_msg(
'Done Shutting Down.')
219 test.info_msg(
'Exiting failed')
222 test.info_msg(
'Exiting passed')
226 if __name__ ==
'__main__':
def info_msg(self, str msg)
def error_msg(self, str msg)