19 from action_msgs.msg
import GoalStatus
20 from geometry_msgs.msg
import PoseStamped, PoseWithCovarianceStamped
21 from nav2_msgs.action
import FollowWaypoints
22 from nav2_msgs.srv
import ManageLifecycleNodes
25 from rclpy.action
import ActionClient
26 from rclpy.node
import Node
27 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
28 from rclpy.qos
import QoSProfile
34 super().__init__(node_name=
'nav2_waypoint_tester', namespace=
'')
36 self.
action_clientaction_client = ActionClient(self, FollowWaypoints,
'follow_waypoints')
37 self.
initial_pose_pubinitial_pose_pub = self.create_publisher(PoseWithCovarianceStamped,
42 pose_qos = QoSProfile(
43 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
44 reliability=QoSReliabilityPolicy.RELIABLE,
45 history=QoSHistoryPolicy.KEEP_LAST,
48 self.
model_pose_submodel_pose_sub = self.create_subscription(PoseWithCovarianceStamped,
51 def setInitialPose(self, pose):
52 self.
init_poseinit_pose = PoseWithCovarianceStamped()
53 self.
init_poseinit_pose.pose.pose.position.x = pose[0]
54 self.
init_poseinit_pose.pose.pose.position.y = pose[1]
55 self.
init_poseinit_pose.header.frame_id =
'map'
59 def poseCallback(self, msg):
60 self.
info_msginfo_msg(
'Received amcl_pose')
63 def setWaypoints(self, waypoints):
67 msg.header.frame_id =
'map'
68 msg.pose.position.x = wp[0]
69 msg.pose.position.y = wp[1]
70 msg.pose.orientation.w = 1.0
75 rclpy.error_msg(
'Did not set valid waypoints before running test!')
78 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
79 self.
info_msginfo_msg(
"'follow_waypoints' action server not available, waiting...")
81 action_request = FollowWaypoints.Goal()
82 action_request.poses = self.
waypointswaypoints
84 self.
info_msginfo_msg(
'Sending goal request...')
85 send_goal_future = self.
action_clientaction_client.send_goal_async(action_request)
87 rclpy.spin_until_future_complete(self, send_goal_future)
88 self.
goal_handlegoal_handle = send_goal_future.result()
89 except Exception
as e:
90 self.
error_msgerror_msg(f
'Service call failed {e!r}')
96 self.
info_msginfo_msg(
'Goal accepted')
100 get_result_future = self.
goal_handlegoal_handle.get_result_async()
102 self.
info_msginfo_msg(
"Waiting for 'follow_waypoints' action to complete")
104 rclpy.spin_until_future_complete(self, get_result_future)
105 status = get_result_future.result().status
106 result = get_result_future.result().result
107 except Exception
as e:
108 self.
error_msgerror_msg(f
'Service call failed {e!r}')
110 if status != GoalStatus.STATUS_SUCCEEDED:
111 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
113 if len(result.missed_waypoints) > 0:
114 self.
info_msginfo_msg(
'Goal failed to process all waypoints,'
115 ' missed {0} wps.'.format(len(result.missed_waypoints)))
118 self.
info_msginfo_msg(
'Goal succeeded!')
121 def publishInitialPose(self):
125 self.
info_msginfo_msg(
'Shutting down')
128 self.
info_msginfo_msg(
'Destroyed follow_waypoints action client')
130 transition_service =
'lifecycle_manager_navigation/manage_nodes'
131 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
132 while not mgr_client.wait_for_service(timeout_sec=1.0):
133 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
135 req = ManageLifecycleNodes.Request()
136 req.command = ManageLifecycleNodes.Request().SHUTDOWN
137 future = mgr_client.call_async(req)
139 rclpy.spin_until_future_complete(self, future)
141 except Exception
as e:
142 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
144 self.
info_msginfo_msg(f
'{transition_service} finished')
146 transition_service =
'lifecycle_manager_localization/manage_nodes'
147 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
148 while not mgr_client.wait_for_service(timeout_sec=1.0):
149 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
151 req = ManageLifecycleNodes.Request()
152 req.command = ManageLifecycleNodes.Request().SHUTDOWN
153 future = mgr_client.call_async(req)
155 rclpy.spin_until_future_complete(self, future)
157 except Exception
as e:
158 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
160 self.
info_msginfo_msg(f
'{transition_service} finished')
162 def cancel_goal(self):
163 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
164 rclpy.spin_until_future_complete(self, cancel_future)
166 def info_msg(self, msg: str):
167 self.get_logger().info(msg)
169 def warn_msg(self, msg: str):
170 self.get_logger().warn(msg)
172 def error_msg(self, msg: str):
173 self.get_logger().error(msg)
176 def main(argv=sys.argv[1:]):
182 wps = [[-0.52, -0.54], [0.58, -0.55], [0.58, 0.52]]
183 starting_pose = [-2.0, -0.5]
186 test.setWaypoints(wps)
190 while not test.initial_pose_received
and retry_count <= retries:
192 test.info_msg(
'Setting initial pose')
193 test.setInitialPose(starting_pose)
194 test.info_msg(
'Waiting for amcl_pose to be received')
195 rclpy.spin_once(test, timeout_sec=1.0)
197 result = test.run(
True)
201 test.setWaypoints([starting_pose])
202 result = test.run(
False)
204 test.setWaypoints([wps[1]])
205 result = test.run(
False)
213 test.setWaypoints([[100.0, 100.0]])
214 result = test.run(
True)
219 test.info_msg(
'Done Shutting Down.')
222 test.info_msg(
'Exiting failed')
225 test.info_msg(
'Exiting passed')
229 if __name__ ==
'__main__':
def publishInitialPose(self)
def info_msg(self, str msg)
def poseCallback(self, msg)
def error_msg(self, str msg)