Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
tester.py
1 #! /usr/bin/env python3
2 # Copyright 2019 Samsung Research America
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 from typing import Optional
19 
20 from action_msgs.msg import GoalStatus
21 from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped
22 from nav2_msgs.action import ComputePathToPose, FollowWaypoints
23 from nav2_msgs.srv import ManageLifecycleNodes
24 from rcl_interfaces.srv import SetParameters
25 import rclpy
26 from rclpy.action import ActionClient # type: ignore[attr-defined]
27 from rclpy.action.client import ClientGoalHandle
28 from rclpy.node import Node
29 from rclpy.parameter import Parameter
30 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
31 
32 
34 
35  def __init__(self) -> None:
36  super().__init__(node_name='nav2_waypoint_tester', namespace='')
37  self.waypointswaypoints: list[float] = []
38  self.action_clientaction_client = ActionClient(self, FollowWaypoints, 'follow_waypoints')
39  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
40  PoseWithCovarianceStamped, 'initialpose', 10
41  )
42  self.initial_pose_receivedinitial_pose_received = False
43  self.goal_handlegoal_handle: Optional[ClientGoalHandle[
44  FollowWaypoints.Goal, FollowWaypoints.Result,
45  FollowWaypoints.Feedback]] = None
46  self.action_resultaction_result = FollowWaypoints.Result()
47 
48  pose_qos = QoSProfile(
49  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
50  reliability=QoSReliabilityPolicy.RELIABLE,
51  history=QoSHistoryPolicy.KEEP_LAST,
52  depth=1,
53  )
54 
55  self.model_pose_submodel_pose_sub = self.create_subscription(
56  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
57  )
58  self.param_cliparam_cli = self.create_client(
59  SetParameters, '/waypoint_follower/set_parameters'
60  )
61 
62  def setInitialPose(self, pose: list[float]) -> None:
63  self.init_poseinit_pose = PoseWithCovarianceStamped()
64  self.init_poseinit_pose.pose.pose.position.x = pose[0]
65  self.init_poseinit_pose.pose.pose.position.y = pose[1]
66  self.init_poseinit_pose.header.frame_id = 'map'
67  self.publishInitialPosepublishInitialPose()
68  time.sleep(5)
69 
70  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
71  self.info_msginfo_msg('Received amcl_pose')
72  self.initial_pose_receivedinitial_pose_received = True
73 
74  def setWaypoints(self, waypoints: list[list[float]]) -> None:
75  self.waypointswaypoints = []
76  for wp in waypoints:
77  msg = PoseStamped()
78  msg.header.frame_id = 'map'
79  msg.pose.position.x = wp[0]
80  msg.pose.position.y = wp[1]
81  msg.pose.orientation.w = 1.0
82  self.waypointswaypoints.append(msg)
83 
84  def run(self, block: bool, cancel: bool) -> bool:
85  # if not self.waypoints:
86  # rclpy.error_msg('Did not set valid waypoints before running test!')
87  # return False
88 
89  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
90  self.info_msginfo_msg("'follow_waypoints' action server not available, waiting...")
91 
92  action_request = FollowWaypoints.Goal()
93  action_request.poses = self.waypointswaypoints
94 
95  self.info_msginfo_msg('Sending goal request...')
96  send_goal_future = self.action_clientaction_client.send_goal_async(action_request)
97  try:
98  rclpy.spin_until_future_complete(self, send_goal_future)
99  self.goal_handlegoal_handle = send_goal_future.result()
100  except Exception as e: # noqa: B902
101  self.error_msgerror_msg(f'Service call failed {e!r}')
102 
103  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
104  self.error_msgerror_msg('Goal rejected')
105  return False
106 
107  self.info_msginfo_msg('Goal accepted')
108  if not block:
109  return True
110 
111  get_result_future = self.goal_handlegoal_handle.get_result_async()
112  if cancel:
113  time.sleep(2)
114  self.cancel_goalcancel_goal()
115 
116  self.info_msginfo_msg("Waiting for 'follow_waypoints' action to complete")
117  try:
118  rclpy.spin_until_future_complete(self, get_result_future)
119  status = get_result_future.result().status # type: ignore[union-attr]
120  result = get_result_future.result().result # type: ignore[union-attr]
121  self.action_resultaction_result = result
122  except Exception as e: # noqa: B902
123  self.error_msgerror_msg(f'Service call failed {e!r}')
124 
125  if status != GoalStatus.STATUS_SUCCEEDED:
126  self.info_msginfo_msg(f'Goal failed with status code: {status}')
127  return False
128  if len(self.action_resultaction_result.missed_waypoints) > 0:
129  self.info_msginfo_msg(
130  'Goal failed to process all waypoints,'
131  f' missed {len(self.action_result.missed_waypoints)} wps.'
132  )
133  return False
134 
135  self.info_msginfo_msg('Goal succeeded!')
136  return True
137 
138  def publishInitialPose(self) -> None:
139  self.initial_pose_pubinitial_pose_pub.publish(self.init_poseinit_pose)
140 
141  def setStopFailureParam(self, value: bool) -> None:
142  req = SetParameters.Request()
143  req.parameters = [
144  Parameter('stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
145  ]
146  future = self.param_cliparam_cli.call_async(req)
147  rclpy.spin_until_future_complete(self, future)
148 
149  def shutdown(self) -> None:
150  self.info_msginfo_msg('Shutting down')
151 
152  self.action_clientaction_client.destroy()
153  self.info_msginfo_msg('Destroyed follow_waypoints action client')
154 
155  transition_service = 'lifecycle_manager_navigation/manage_nodes'
156  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
157  while not mgr_client.wait_for_service(timeout_sec=1.0):
158  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
159 
160  req = ManageLifecycleNodes.Request()
161  req.command = ManageLifecycleNodes.Request().SHUTDOWN
162  future = mgr_client.call_async(req)
163  try:
164  rclpy.spin_until_future_complete(self, future)
165  future.result()
166  except Exception as e: # noqa: B902
167  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
168 
169  self.info_msginfo_msg(f'{transition_service} finished')
170 
171  transition_service = 'lifecycle_manager_localization/manage_nodes'
172  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
173  while not mgr_client.wait_for_service(timeout_sec=1.0):
174  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
175 
176  req = ManageLifecycleNodes.Request()
177  req.command = ManageLifecycleNodes.Request().SHUTDOWN
178  future = mgr_client.call_async(req)
179  try:
180  rclpy.spin_until_future_complete(self, future)
181  future.result()
182  except Exception as e: # noqa: B902
183  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
184 
185  self.info_msginfo_msg(f'{transition_service} finished')
186 
187  def cancel_goal(self) -> None:
188  cancel_future = self.goal_handlegoal_handle.cancel_goal_async() # type: ignore[union-attr]
189  rclpy.spin_until_future_complete(self, cancel_future)
190 
191  def info_msg(self, msg: str) -> None:
192  self.get_logger().info(msg)
193 
194  def warn_msg(self, msg: str) -> None:
195  self.get_logger().warn(msg)
196 
197  def error_msg(self, msg: str) -> None:
198  self.get_logger().error(msg)
199 
200 
201 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
202  rclpy.init()
203 
204  # wait a few seconds to make sure entire stacks are up
205  time.sleep(10)
206 
207  wps = [[-0.52, -0.54], [0.58, -0.55], [1.78, -0.57]]
208  starting_pose = [-2.0, -0.5]
209 
210  test = WaypointFollowerTest()
211  test.setWaypoints(wps)
212 
213  retry_count = 0
214  retries = 2
215  while not test.initial_pose_received and retry_count <= retries:
216  retry_count += 1
217  test.info_msg('Setting initial pose')
218  test.setInitialPose(starting_pose)
219  test.info_msg('Waiting for amcl_pose to be received')
220  rclpy.spin_once(test, timeout_sec=1.0) # wait for poseCallback
221 
222  result = test.run(True, False)
223  assert result
224 
225  # preempt with new point
226  test.setWaypoints([starting_pose])
227  result = test.run(False, False)
228  time.sleep(2)
229  test.setWaypoints([wps[1]])
230  result = test.run(False, False)
231 
232  # cancel
233  time.sleep(2)
234  test.cancel_goal()
235 
236  # set waypoint outside of map
237  time.sleep(2)
238  test.setWaypoints([[100.0, 100.0]])
239  result = test.run(True, False)
240  assert not result
241  result = not result
242  assert (
243  test.action_result.missed_waypoints[0].error_code
244  == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
245  )
246  assert (test.action_result.missed_waypoints[0].error_msg != '')
247 
248  # stop on failure test with bogus waypoint
249  test.setStopFailureParam(True)
250  bwps = [[-0.52, -0.54], [100.0, 100.0], [0.58, 0.52]]
251  test.setWaypoints(bwps)
252  result = test.run(True, False)
253  assert not result
254  result = not result
255  mwps = test.action_result.missed_waypoints
256  result = (len(mwps) == 1) & (mwps[0] == 1)
257  test.setStopFailureParam(False)
258 
259  # Zero goal test
260  test.setWaypoints([])
261  result = test.run(True, False)
262 
263  # Cancel test
264  test.setWaypoints(wps)
265  result = test.run(True, True)
266  assert not result
267  result = not result
268 
269  test.shutdown()
270  test.info_msg('Done Shutting Down.')
271 
272  if not result:
273  test.info_msg('Exiting failed')
274  exit(1)
275  else:
276  test.info_msg('Exiting passed')
277  exit(0)
278 
279 
280 if __name__ == '__main__':
281  main()
None poseCallback(self, PoseWithCovarianceStamped msg)
Definition: tester.py:70
None info_msg(self, str msg)
Definition: tester.py:191
None error_msg(self, str msg)
Definition: tester.py:197
None publishInitialPose(self)
Definition: tester.py:138