Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
tester.py
1 #! /usr/bin/env python3
2 # Copyright 2019 Samsung Research America
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 from typing import Optional
19 
20 from action_msgs.msg import GoalStatus
21 from geographic_msgs.msg import GeoPose
22 from nav2_msgs.action import ComputePathToPose, FollowGPSWaypoints
23 from nav2_msgs.srv import ManageLifecycleNodes
24 from rcl_interfaces.srv import SetParameters
25 import rclpy
26 from rclpy.action import ActionClient
27 from rclpy.action.client import ClientGoalHandle
28 from rclpy.client import Client
29 from rclpy.node import Node
30 from rclpy.parameter import Parameter
31 
32 
34 
35  def __init__(self) -> None:
36  super().__init__(node_name='nav2_gps_waypoint_tester', namespace='')
37  self.waypointswaypoints: list[float] = []
38  self.action_client: ActionClient[
39  FollowGPSWaypoints.Goal,
40  FollowGPSWaypoints.Result,
41  FollowGPSWaypoints.Feedback
42  ] = ActionClient(
43  self, FollowGPSWaypoints, 'follow_gps_waypoints')
44  self.goal_handlegoal_handle: Optional[ClientGoalHandle[
45  FollowGPSWaypoints.Goal, FollowGPSWaypoints.Result,
46  FollowGPSWaypoints.Feedback]] = None
47  self.action_resultaction_result = FollowGPSWaypoints.Result()
48 
49  self.param_cli: Client[SetParameters.Request, SetParameters.Response] \
50  = self.create_client(
51  SetParameters, '/waypoint_follower/set_parameters'
52  )
53 
54  def setWaypoints(self, waypoints: list[list[float]]) -> None:
55  self.waypointswaypoints = []
56  for wp in waypoints:
57  msg = GeoPose()
58  msg.position.latitude = wp[0]
59  msg.position.longitude = wp[1]
60  msg.orientation.w = 1.0
61  self.waypointswaypoints.append(msg)
62 
63  def run(self, block: bool, cancel: bool) -> bool:
64  # if not self.waypoints:
65  # rclpy.error_msg('Did not set valid waypoints before running test!')
66  # return False
67 
68  while not self.action_client.wait_for_server(timeout_sec=1.0):
69  self.info_msginfo_msg(
70  "'follow_gps_waypoints' action server not available, waiting..."
71  )
72 
73  action_request = FollowGPSWaypoints.Goal()
74  action_request.gps_poses = self.waypointswaypoints
75 
76  self.info_msginfo_msg('Sending goal request...')
77  send_goal_future = self.action_client.send_goal_async(action_request)
78  try:
79  rclpy.spin_until_future_complete(self, send_goal_future)
80  self.goal_handlegoal_handle = send_goal_future.result()
81  except Exception as e: # noqa: B902
82  self.error_msgerror_msg(f'Service call failed {e!r}')
83 
84  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
85  self.error_msgerror_msg('Goal rejected')
86  return False
87 
88  self.info_msginfo_msg('Goal accepted')
89  if not block:
90  return True
91 
92  get_result_future = self.goal_handlegoal_handle.get_result_async()
93  if cancel:
94  time.sleep(2)
95  self.cancel_goalcancel_goal()
96 
97  self.info_msginfo_msg("Waiting for 'follow_gps_waypoints' action to complete")
98  try:
99  rclpy.spin_until_future_complete(self, get_result_future)
100  status = get_result_future.result().status # type: ignore[union-attr]
101  result = get_result_future.result().result # type: ignore[union-attr]
102  self.action_resultaction_result = result
103  except Exception as e: # noqa: B902
104  self.error_msgerror_msg(f'Service call failed {e!r}')
105 
106  if status != GoalStatus.STATUS_SUCCEEDED:
107  self.info_msginfo_msg(f'Goal failed with status code: {status}')
108  return False
109  if len(result.missed_waypoints) > 0:
110  self.info_msginfo_msg(
111  'Goal failed to process all waypoints,'
112  f' missed {len(result.missed_waypoints)} wps.'
113  )
114  return False
115 
116  self.info_msginfo_msg('Goal succeeded!')
117  return True
118 
119  def setStopFailureParam(self, value: bool) -> None:
120  req = SetParameters.Request()
121  req.parameters = [
122  Parameter('stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
123  ]
124  future = self.param_cli.call_async(req)
125  rclpy.spin_until_future_complete(self, future)
126 
127  def shutdown(self) -> None:
128  self.info_msginfo_msg('Shutting down')
129 
130  self.action_client.destroy()
131  self.info_msginfo_msg('Destroyed follow_gps_waypoints action client')
132 
133  transition_service = 'lifecycle_manager_navigation/manage_nodes'
134  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
135  = self.create_client(ManageLifecycleNodes, transition_service)
136  while not mgr_client.wait_for_service(timeout_sec=1.0):
137  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
138 
139  req = ManageLifecycleNodes.Request()
140  req.command = ManageLifecycleNodes.Request().SHUTDOWN
141  future = mgr_client.call_async(req)
142  try:
143  rclpy.spin_until_future_complete(self, future)
144  future.result()
145  except Exception as e: # noqa: B902
146  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
147 
148  self.info_msginfo_msg(f'{transition_service} finished')
149 
150  def cancel_goal(self) -> None:
151  cancel_future = self.goal_handlegoal_handle.cancel_goal_async() # type: ignore[union-attr]
152  rclpy.spin_until_future_complete(self, cancel_future)
153 
154  def info_msg(self, msg: str) -> None:
155  self.get_logger().info(msg)
156 
157  def warn_msg(self, msg: str) -> None:
158  self.get_logger().warning(msg)
159 
160  def error_msg(self, msg: str) -> None:
161  self.get_logger().error(msg)
162 
163 
164 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
165  rclpy.init()
166 
167  # wait a few seconds to make sure entire stacks are up
168  time.sleep(10)
169 
170  wps = [[55.944831, -3.186998], [55.944818, -3.187075], [55.944782, -3.187060]]
171 
172  test = GpsWaypointFollowerTest()
173  test.setWaypoints(wps)
174 
175  # wait for poseCallback
176 
177  result = test.run(True, False)
178  assert result
179 
180  # preempt with new point
181  test.setWaypoints([wps[0]])
182  result = test.run(False, False)
183  time.sleep(2)
184  test.setWaypoints([wps[1]])
185  result = test.run(False, False)
186 
187  # cancel
188  time.sleep(2)
189  test.cancel_goal()
190 
191  # set waypoint outside of map but not outide the utm zone
192  time.sleep(2)
193  test.setWaypoints([[55.929834, -3.184343]])
194  result = test.run(True, False)
195  assert not result
196  result = not result
197  assert (
198  test.action_result.missed_waypoints[0].error_code
199  == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
200  )
201  assert (test.action_result.missed_waypoints[0].error_msg != '')
202 
203  # stop on failure test with bogus waypoint
204  test.setStopFailureParam(True)
205  bwps = [[55.944831, -3.186998], [55.929834, -3.184343], [55.944782, -3.187060]]
206  test.setWaypoints(bwps)
207  result = test.run(True, False)
208  assert not result
209  result = not result
210  mwps = test.action_result.missed_waypoints
211  result = (len(mwps) == 1) & (mwps[0] == 1)
212  test.setStopFailureParam(False)
213 
214  # Zero goal test
215  test.setWaypoints([])
216  result = test.run(True, False)
217 
218  # Cancel test
219  test.setWaypoints(wps)
220  result = test.run(True, True)
221  assert not result
222  result = not result
223 
224  test.shutdown()
225  test.info_msg('Done Shutting Down.')
226 
227  if not result:
228  test.info_msg('Exiting failed')
229  exit(1)
230  else:
231  test.info_msg('Exiting passed')
232  exit(0)
233 
234 
235 if __name__ == '__main__':
236  main()
None info_msg(self, str msg)
Definition: tester.py:154
None error_msg(self, str msg)
Definition: tester.py:160