Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
tester.py
1 #! /usr/bin/env python3
2 # Copyright 2019 Samsung Research America
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 from typing import Optional
19 
20 from action_msgs.msg import GoalStatus
21 from geographic_msgs.msg import GeoPose
22 from nav2_msgs.action import ComputePathToPose, FollowGPSWaypoints
23 from nav2_msgs.srv import ManageLifecycleNodes
24 from rcl_interfaces.srv import SetParameters
25 import rclpy
26 from rclpy.action import ActionClient # type: ignore[attr-defined]
27 from rclpy.action.client import ClientGoalHandle
28 from rclpy.node import Node
29 from rclpy.parameter import Parameter
30 
31 
33 
34  def __init__(self) -> None:
35  super().__init__(node_name='nav2_gps_waypoint_tester', namespace='')
36  self.waypointswaypoints: list[float] = []
37  self.action_clientaction_client = ActionClient(
38  self, FollowGPSWaypoints, 'follow_gps_waypoints'
39  )
40  self.goal_handlegoal_handle: Optional[ClientGoalHandle[
41  FollowGPSWaypoints.Goal, FollowGPSWaypoints.Result,
42  FollowGPSWaypoints.Feedback]] = None
43  self.action_resultaction_result = FollowGPSWaypoints.Result()
44 
45  self.param_cliparam_cli = self.create_client(
46  SetParameters, '/waypoint_follower/set_parameters'
47  )
48 
49  def setWaypoints(self, waypoints: list[list[float]]) -> None:
50  self.waypointswaypoints = []
51  for wp in waypoints:
52  msg = GeoPose()
53  msg.position.latitude = wp[0]
54  msg.position.longitude = wp[1]
55  msg.orientation.w = 1.0
56  self.waypointswaypoints.append(msg)
57 
58  def run(self, block: bool, cancel: bool) -> bool:
59  # if not self.waypoints:
60  # rclpy.error_msg('Did not set valid waypoints before running test!')
61  # return False
62 
63  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
64  self.info_msginfo_msg(
65  "'follow_gps_waypoints' action server not available, waiting..."
66  )
67 
68  action_request = FollowGPSWaypoints.Goal()
69  action_request.gps_poses = self.waypointswaypoints
70 
71  self.info_msginfo_msg('Sending goal request...')
72  send_goal_future = self.action_clientaction_client.send_goal_async(action_request)
73  try:
74  rclpy.spin_until_future_complete(self, send_goal_future)
75  self.goal_handlegoal_handle = send_goal_future.result()
76  except Exception as e: # noqa: B902
77  self.error_msgerror_msg(f'Service call failed {e!r}')
78 
79  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
80  self.error_msgerror_msg('Goal rejected')
81  return False
82 
83  self.info_msginfo_msg('Goal accepted')
84  if not block:
85  return True
86 
87  get_result_future = self.goal_handlegoal_handle.get_result_async()
88  if cancel:
89  time.sleep(2)
90  self.cancel_goalcancel_goal()
91 
92  self.info_msginfo_msg("Waiting for 'follow_gps_waypoints' action to complete")
93  try:
94  rclpy.spin_until_future_complete(self, get_result_future)
95  status = get_result_future.result().status # type: ignore[union-attr]
96  result = get_result_future.result().result # type: ignore[union-attr]
97  self.action_resultaction_result = result
98  except Exception as e: # noqa: B902
99  self.error_msgerror_msg(f'Service call failed {e!r}')
100 
101  if status != GoalStatus.STATUS_SUCCEEDED:
102  self.info_msginfo_msg(f'Goal failed with status code: {status}')
103  return False
104  if len(result.missed_waypoints) > 0:
105  self.info_msginfo_msg(
106  'Goal failed to process all waypoints,'
107  f' missed {len(result.missed_waypoints)} wps.'
108  )
109  return False
110 
111  self.info_msginfo_msg('Goal succeeded!')
112  return True
113 
114  def setStopFailureParam(self, value: bool) -> None:
115  req = SetParameters.Request()
116  req.parameters = [
117  Parameter('stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
118  ]
119  future = self.param_cliparam_cli.call_async(req)
120  rclpy.spin_until_future_complete(self, future)
121 
122  def shutdown(self) -> None:
123  self.info_msginfo_msg('Shutting down')
124 
125  self.action_clientaction_client.destroy()
126  self.info_msginfo_msg('Destroyed follow_gps_waypoints action client')
127 
128  transition_service = 'lifecycle_manager_navigation/manage_nodes'
129  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
130  while not mgr_client.wait_for_service(timeout_sec=1.0):
131  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
132 
133  req = ManageLifecycleNodes.Request()
134  req.command = ManageLifecycleNodes.Request().SHUTDOWN
135  future = mgr_client.call_async(req)
136  try:
137  rclpy.spin_until_future_complete(self, future)
138  future.result()
139  except Exception as e: # noqa: B902
140  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
141 
142  self.info_msginfo_msg(f'{transition_service} finished')
143 
144  def cancel_goal(self) -> None:
145  cancel_future = self.goal_handlegoal_handle.cancel_goal_async() # type: ignore[union-attr]
146  rclpy.spin_until_future_complete(self, cancel_future)
147 
148  def info_msg(self, msg: str) -> None:
149  self.get_logger().info(msg)
150 
151  def warn_msg(self, msg: str) -> None:
152  self.get_logger().warn(msg)
153 
154  def error_msg(self, msg: str) -> None:
155  self.get_logger().error(msg)
156 
157 
158 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
159  rclpy.init()
160 
161  # wait a few seconds to make sure entire stacks are up
162  time.sleep(10)
163 
164  wps = [[55.944831, -3.186998], [55.944818, -3.187075], [55.944782, -3.187060]]
165 
166  test = GpsWaypointFollowerTest()
167  test.setWaypoints(wps)
168 
169  # wait for poseCallback
170 
171  result = test.run(True, False)
172  assert result
173 
174  # preempt with new point
175  test.setWaypoints([wps[0]])
176  result = test.run(False, False)
177  time.sleep(2)
178  test.setWaypoints([wps[1]])
179  result = test.run(False, False)
180 
181  # cancel
182  time.sleep(2)
183  test.cancel_goal()
184 
185  # set waypoint outside of map but not outide the utm zone
186  time.sleep(2)
187  test.setWaypoints([[55.929834, -3.184343]])
188  result = test.run(True, False)
189  assert not result
190  result = not result
191  assert (
192  test.action_result.missed_waypoints[0].error_code
193  == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
194  )
195  assert (test.action_result.missed_waypoints[0].error_msg != '')
196 
197  # stop on failure test with bogus waypoint
198  test.setStopFailureParam(True)
199  bwps = [[55.944831, -3.186998], [55.929834, -3.184343], [55.944782, -3.187060]]
200  test.setWaypoints(bwps)
201  result = test.run(True, False)
202  assert not result
203  result = not result
204  mwps = test.action_result.missed_waypoints
205  result = (len(mwps) == 1) & (mwps[0] == 1)
206  test.setStopFailureParam(False)
207 
208  # Zero goal test
209  test.setWaypoints([])
210  result = test.run(True, False)
211 
212  # Cancel test
213  test.setWaypoints(wps)
214  result = test.run(True, True)
215  assert not result
216  result = not result
217 
218  test.shutdown()
219  test.info_msg('Done Shutting Down.')
220 
221  if not result:
222  test.info_msg('Exiting failed')
223  exit(1)
224  else:
225  test.info_msg('Exiting passed')
226  exit(0)
227 
228 
229 if __name__ == '__main__':
230  main()
None info_msg(self, str msg)
Definition: tester.py:148
None error_msg(self, str msg)
Definition: tester.py:154