19 from action_msgs.msg
import GoalStatus
20 from geometry_msgs.msg
import PoseStamped, PoseWithCovarianceStamped
21 from nav2_msgs.action
import ComputePathToPose, FollowWaypoints
22 from nav2_msgs.srv
import ManageLifecycleNodes
23 from rcl_interfaces.srv
import SetParameters
26 from rclpy.action
import ActionClient
27 from rclpy.node
import Node
28 from rclpy.parameter
import Parameter
29 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
30 from rclpy.qos
import QoSProfile
36 super().__init__(node_name=
'nav2_waypoint_tester', namespace=
'')
38 self.
action_clientaction_client = ActionClient(self, FollowWaypoints,
'follow_waypoints')
40 PoseWithCovarianceStamped,
'initialpose', 10
46 pose_qos = QoSProfile(
47 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
48 reliability=QoSReliabilityPolicy.RELIABLE,
49 history=QoSHistoryPolicy.KEEP_LAST,
54 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
56 self.
param_cliparam_cli = self.create_client(
57 SetParameters,
'/waypoint_follower/set_parameters'
60 def setInitialPose(self, pose):
61 self.
init_poseinit_pose = PoseWithCovarianceStamped()
62 self.
init_poseinit_pose.pose.pose.position.x = pose[0]
63 self.
init_poseinit_pose.pose.pose.position.y = pose[1]
64 self.
init_poseinit_pose.header.frame_id =
'map'
68 def poseCallback(self, msg):
69 self.
info_msginfo_msg(
'Received amcl_pose')
72 def setWaypoints(self, waypoints):
76 msg.header.frame_id =
'map'
77 msg.pose.position.x = wp[0]
78 msg.pose.position.y = wp[1]
79 msg.pose.orientation.w = 1.0
82 def run(self, block, cancel):
87 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
88 self.
info_msginfo_msg(
"'follow_waypoints' action server not available, waiting...")
90 action_request = FollowWaypoints.Goal()
91 action_request.poses = self.
waypointswaypoints
93 self.
info_msginfo_msg(
'Sending goal request...')
94 send_goal_future = self.
action_clientaction_client.send_goal_async(action_request)
96 rclpy.spin_until_future_complete(self, send_goal_future)
97 self.
goal_handlegoal_handle = send_goal_future.result()
98 except Exception
as e:
99 self.
error_msgerror_msg(f
'Service call failed {e!r}')
105 self.
info_msginfo_msg(
'Goal accepted')
109 get_result_future = self.
goal_handlegoal_handle.get_result_async()
114 self.
info_msginfo_msg(
"Waiting for 'follow_waypoints' action to complete")
116 rclpy.spin_until_future_complete(self, get_result_future)
117 status = get_result_future.result().status
118 result = get_result_future.result().result
120 except Exception
as e:
121 self.
error_msgerror_msg(f
'Service call failed {e!r}')
123 if status != GoalStatus.STATUS_SUCCEEDED:
124 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
126 if len(self.
action_resultaction_result.missed_waypoints) > 0:
128 'Goal failed to process all waypoints,'
129 ' missed {0} wps.'.format(len(self.
action_resultaction_result.missed_waypoints))
133 self.
info_msginfo_msg(
'Goal succeeded!')
136 def publishInitialPose(self):
139 def setStopFailureParam(self, value):
140 req = SetParameters.Request()
142 Parameter(
'stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
144 future = self.
param_cliparam_cli.call_async(req)
145 rclpy.spin_until_future_complete(self, future)
148 self.
info_msginfo_msg(
'Shutting down')
151 self.
info_msginfo_msg(
'Destroyed follow_waypoints action client')
153 transition_service =
'lifecycle_manager_navigation/manage_nodes'
154 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
155 while not mgr_client.wait_for_service(timeout_sec=1.0):
156 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
158 req = ManageLifecycleNodes.Request()
159 req.command = ManageLifecycleNodes.Request().SHUTDOWN
160 future = mgr_client.call_async(req)
162 rclpy.spin_until_future_complete(self, future)
164 except Exception
as e:
165 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
167 self.
info_msginfo_msg(f
'{transition_service} finished')
169 transition_service =
'lifecycle_manager_localization/manage_nodes'
170 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
171 while not mgr_client.wait_for_service(timeout_sec=1.0):
172 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
174 req = ManageLifecycleNodes.Request()
175 req.command = ManageLifecycleNodes.Request().SHUTDOWN
176 future = mgr_client.call_async(req)
178 rclpy.spin_until_future_complete(self, future)
180 except Exception
as e:
181 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
183 self.
info_msginfo_msg(f
'{transition_service} finished')
185 def cancel_goal(self):
186 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
187 rclpy.spin_until_future_complete(self, cancel_future)
189 def info_msg(self, msg: str):
190 self.get_logger().info(msg)
192 def warn_msg(self, msg: str):
193 self.get_logger().warn(msg)
195 def error_msg(self, msg: str):
196 self.get_logger().error(msg)
199 def main(argv=sys.argv[1:]):
205 wps = [[-0.52, -0.54], [0.58, -0.55], [1.78, -0.57]]
206 starting_pose = [-2.0, -0.5]
209 test.setWaypoints(wps)
213 while not test.initial_pose_received
and retry_count <= retries:
215 test.info_msg(
'Setting initial pose')
216 test.setInitialPose(starting_pose)
217 test.info_msg(
'Waiting for amcl_pose to be received')
218 rclpy.spin_once(test, timeout_sec=1.0)
220 result = test.run(
True,
False)
224 test.setWaypoints([starting_pose])
225 result = test.run(
False,
False)
227 test.setWaypoints([wps[1]])
228 result = test.run(
False,
False)
236 test.setWaypoints([[100.0, 100.0]])
237 result = test.run(
True,
False)
241 test.action_result.missed_waypoints[0].error_code
242 == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
246 test.setStopFailureParam(
True)
247 bwps = [[-0.52, -0.54], [100.0, 100.0], [0.58, 0.52]]
248 test.setWaypoints(bwps)
249 result = test.run(
True,
False)
252 mwps = test.action_result.missed_waypoints
253 result = (len(mwps) == 1) & (mwps[0] == 1)
254 test.setStopFailureParam(
False)
257 test.setWaypoints([])
258 result = test.run(
True,
False)
261 test.setWaypoints(wps)
262 result = test.run(
True,
True)
267 test.info_msg(
'Done Shutting Down.')
270 test.info_msg(
'Exiting failed')
273 test.info_msg(
'Exiting passed')
277 if __name__ ==
'__main__':
def publishInitialPose(self)
def info_msg(self, str msg)
def poseCallback(self, msg)
def error_msg(self, str msg)