Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
tester.py
1 #! /usr/bin/env python3
2 # Copyright 2019 Samsung Research America
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 from typing import Optional
19 
20 from action_msgs.msg import GoalStatus
21 from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped
22 from nav2_msgs.action import ComputePathToPose, FollowWaypoints
23 from nav2_msgs.srv import ManageLifecycleNodes
24 from rcl_interfaces.srv import SetParameters
25 import rclpy
26 from rclpy.action import ActionClient
27 from rclpy.action.client import ClientGoalHandle
28 from rclpy.client import Client
29 from rclpy.node import Node
30 from rclpy.parameter import Parameter
31 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
32 
33 
35 
36  def __init__(self) -> None:
37  super().__init__(node_name='nav2_waypoint_tester', namespace='')
38  self.waypointswaypoints: list[float] = []
39  self.action_client: ActionClient[
40  FollowWaypoints.Goal,
41  FollowWaypoints.Result,
42  FollowWaypoints.Feedback
43  ] = ActionClient(self, FollowWaypoints, 'follow_waypoints')
44  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
45  PoseWithCovarianceStamped, 'initialpose', 10
46  )
47  self.initial_pose_receivedinitial_pose_received = False
48  self.goal_handlegoal_handle: Optional[ClientGoalHandle[
49  FollowWaypoints.Goal, FollowWaypoints.Result,
50  FollowWaypoints.Feedback]] = None
51  self.action_resultaction_result = FollowWaypoints.Result()
52 
53  pose_qos = QoSProfile(
54  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
55  reliability=QoSReliabilityPolicy.RELIABLE,
56  history=QoSHistoryPolicy.KEEP_LAST,
57  depth=1,
58  )
59 
60  self.model_pose_submodel_pose_sub = self.create_subscription(
61  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
62  )
63  self.param_cli: Client[SetParameters.Request, SetParameters.Response] = \
64  self.create_client(
65  SetParameters, '/waypoint_follower/set_parameters'
66  )
67 
68  def setInitialPose(self, pose: list[float]) -> None:
69  self.init_poseinit_pose = PoseWithCovarianceStamped()
70  self.init_poseinit_pose.pose.pose.position.x = pose[0]
71  self.init_poseinit_pose.pose.pose.position.y = pose[1]
72  self.init_poseinit_pose.header.frame_id = 'map'
73  self.publishInitialPosepublishInitialPose()
74  time.sleep(5)
75 
76  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
77  self.info_msginfo_msg('Received amcl_pose')
78  self.initial_pose_receivedinitial_pose_received = True
79 
80  def setWaypoints(self, waypoints: list[list[float]]) -> None:
81  self.waypointswaypoints = []
82  for wp in waypoints:
83  msg = PoseStamped()
84  msg.header.frame_id = 'map'
85  msg.pose.position.x = wp[0]
86  msg.pose.position.y = wp[1]
87  msg.pose.orientation.w = 1.0
88  self.waypointswaypoints.append(msg)
89 
90  def run(self, block: bool, cancel: bool) -> bool:
91  # if not self.waypoints:
92  # rclpy.error_msg('Did not set valid waypoints before running test!')
93  # return False
94 
95  while not self.action_client.wait_for_server(timeout_sec=1.0):
96  self.info_msginfo_msg("'follow_waypoints' action server not available, waiting...")
97 
98  action_request = FollowWaypoints.Goal()
99  action_request.poses = self.waypointswaypoints
100 
101  self.info_msginfo_msg('Sending goal request...')
102  send_goal_future = self.action_client.send_goal_async(action_request)
103  try:
104  rclpy.spin_until_future_complete(self, send_goal_future)
105  self.goal_handlegoal_handle = send_goal_future.result()
106  except Exception as e: # noqa: B902
107  self.error_msgerror_msg(f'Service call failed {e!r}')
108 
109  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
110  self.error_msgerror_msg('Goal rejected')
111  return False
112 
113  self.info_msginfo_msg('Goal accepted')
114  if not block:
115  return True
116 
117  get_result_future = self.goal_handlegoal_handle.get_result_async()
118  if cancel:
119  time.sleep(2)
120  self.cancel_goalcancel_goal()
121 
122  self.info_msginfo_msg("Waiting for 'follow_waypoints' action to complete")
123  try:
124  rclpy.spin_until_future_complete(self, get_result_future)
125  status = get_result_future.result().status # type: ignore[union-attr]
126  result = get_result_future.result().result # type: ignore[union-attr]
127  self.action_resultaction_result = result
128  except Exception as e: # noqa: B902
129  self.error_msgerror_msg(f'Service call failed {e!r}')
130 
131  if status != GoalStatus.STATUS_SUCCEEDED:
132  self.info_msginfo_msg(f'Goal failed with status code: {status}')
133  return False
134  if len(self.action_resultaction_result.missed_waypoints) > 0:
135  self.info_msginfo_msg(
136  'Goal failed to process all waypoints,'
137  f' missed {len(self.action_result.missed_waypoints)} wps.'
138  )
139  return False
140 
141  self.info_msginfo_msg('Goal succeeded!')
142  return True
143 
144  def publishInitialPose(self) -> None:
145  self.initial_pose_pubinitial_pose_pub.publish(self.init_poseinit_pose)
146 
147  def setStopFailureParam(self, value: bool) -> None:
148  req = SetParameters.Request()
149  req.parameters = [
150  Parameter('stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
151  ]
152  future = self.param_cli.call_async(req)
153  rclpy.spin_until_future_complete(self, future)
154 
155  def shutdown(self) -> None:
156  self.info_msginfo_msg('Shutting down')
157 
158  self.action_client.destroy()
159  self.info_msginfo_msg('Destroyed follow_waypoints action client')
160 
161  transition_service = 'lifecycle_manager_navigation/manage_nodes'
162  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
163  self.create_client(ManageLifecycleNodes, transition_service)
164  while not mgr_client.wait_for_service(timeout_sec=1.0):
165  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
166 
167  req = ManageLifecycleNodes.Request()
168  req.command = ManageLifecycleNodes.Request().SHUTDOWN
169  future = mgr_client.call_async(req)
170  try:
171  rclpy.spin_until_future_complete(self, future)
172  future.result()
173  except Exception as e: # noqa: B902
174  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
175 
176  self.info_msginfo_msg(f'{transition_service} finished')
177 
178  transition_service = 'lifecycle_manager_localization/manage_nodes'
179  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
180  while not mgr_client.wait_for_service(timeout_sec=1.0):
181  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
182 
183  req = ManageLifecycleNodes.Request()
184  req.command = ManageLifecycleNodes.Request().SHUTDOWN
185  future = mgr_client.call_async(req)
186  try:
187  rclpy.spin_until_future_complete(self, future)
188  future.result()
189  except Exception as e: # noqa: B902
190  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
191 
192  self.info_msginfo_msg(f'{transition_service} finished')
193 
194  def cancel_goal(self) -> None:
195  cancel_future = self.goal_handlegoal_handle.cancel_goal_async() # type: ignore[union-attr]
196  rclpy.spin_until_future_complete(self, cancel_future)
197 
198  def info_msg(self, msg: str) -> None:
199  self.get_logger().info(msg)
200 
201  def warn_msg(self, msg: str) -> None:
202  self.get_logger().warning(msg)
203 
204  def error_msg(self, msg: str) -> None:
205  self.get_logger().error(msg)
206 
207 
208 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
209  rclpy.init()
210 
211  # wait a few seconds to make sure entire stacks are up
212  time.sleep(10)
213 
214  wps = [[-0.52, -0.54], [0.58, -0.55], [1.78, -0.57]]
215  starting_pose = [-2.0, -0.5]
216 
217  test = WaypointFollowerTest()
218  test.setWaypoints(wps)
219 
220  retry_count = 0
221  retries = 2
222  while not test.initial_pose_received and retry_count <= retries:
223  retry_count += 1
224  test.info_msg('Setting initial pose')
225  test.setInitialPose(starting_pose)
226  test.info_msg('Waiting for amcl_pose to be received')
227  rclpy.spin_once(test, timeout_sec=1.0) # wait for poseCallback
228 
229  result = test.run(True, False)
230  assert result
231 
232  # preempt with new point
233  test.setWaypoints([starting_pose])
234  result = test.run(False, False)
235  time.sleep(2)
236  test.setWaypoints([wps[1]])
237  result = test.run(False, False)
238 
239  # cancel
240  time.sleep(2)
241  test.cancel_goal()
242 
243  # set waypoint outside of map
244  time.sleep(2)
245  test.setWaypoints([[100.0, 100.0]])
246  result = test.run(True, False)
247  assert not result
248  result = not result
249  assert (
250  test.action_result.missed_waypoints[0].error_code
251  == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
252  )
253  assert (test.action_result.missed_waypoints[0].error_msg != '')
254 
255  # stop on failure test with bogus waypoint
256  test.setStopFailureParam(True)
257  bwps = [[-0.52, -0.54], [100.0, 100.0], [0.58, 0.52]]
258  test.setWaypoints(bwps)
259  result = test.run(True, False)
260  assert not result
261  result = not result
262  mwps = test.action_result.missed_waypoints
263  result = (len(mwps) == 1) & (mwps[0] == 1)
264  test.setStopFailureParam(False)
265 
266  # Zero goal test
267  test.setWaypoints([])
268  result = test.run(True, False)
269 
270  # Cancel test
271  test.setWaypoints(wps)
272  result = test.run(True, True)
273  assert not result
274  result = not result
275 
276  test.shutdown()
277  test.info_msg('Done Shutting Down.')
278 
279  if not result:
280  test.info_msg('Exiting failed')
281  exit(1)
282  else:
283  test.info_msg('Exiting passed')
284  exit(0)
285 
286 
287 if __name__ == '__main__':
288  main()
None poseCallback(self, PoseWithCovarianceStamped msg)
Definition: tester.py:76
None info_msg(self, str msg)
Definition: tester.py:198
None error_msg(self, str msg)
Definition: tester.py:204
None publishInitialPose(self)
Definition: tester.py:144