Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
tester.py
1 #! /usr/bin/env python3
2 # Copyright 2019 Samsung Research America
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 
19 from action_msgs.msg import GoalStatus
20 from geographic_msgs.msg import GeoPose
21 from nav2_msgs.action import ComputePathToPose, FollowGPSWaypoints
22 from nav2_msgs.srv import ManageLifecycleNodes
23 from rcl_interfaces.srv import SetParameters
24 
25 import rclpy
26 
27 from rclpy.action import ActionClient
28 from rclpy.node import Node
29 from rclpy.parameter import Parameter
30 
31 
33 
34  def __init__(self):
35  super().__init__(node_name='nav2_gps_waypoint_tester', namespace='')
36  self.waypointswaypoints = None
37  self.action_clientaction_client = ActionClient(
38  self, FollowGPSWaypoints, 'follow_gps_waypoints'
39  )
40  self.goal_handlegoal_handle = None
41  self.action_resultaction_result = None
42 
43  self.param_cliparam_cli = self.create_client(
44  SetParameters, '/waypoint_follower/set_parameters'
45  )
46 
47  def setWaypoints(self, waypoints):
48  self.waypointswaypoints = []
49  for wp in waypoints:
50  msg = GeoPose()
51  msg.position.latitude = wp[0]
52  msg.position.longitude = wp[1]
53  msg.orientation.w = 1.0
54  self.waypointswaypoints.append(msg)
55 
56  def run(self, block, cancel):
57  # if not self.waypoints:
58  # rclpy.error_msg('Did not set valid waypoints before running test!')
59  # return False
60 
61  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
62  self.info_msginfo_msg(
63  "'follow_gps_waypoints' action server not available, waiting..."
64  )
65 
66  action_request = FollowGPSWaypoints.Goal()
67  action_request.gps_poses = self.waypointswaypoints
68 
69  self.info_msginfo_msg('Sending goal request...')
70  send_goal_future = self.action_clientaction_client.send_goal_async(action_request)
71  try:
72  rclpy.spin_until_future_complete(self, send_goal_future)
73  self.goal_handlegoal_handle = send_goal_future.result()
74  except Exception as e: # noqa: B902
75  self.error_msgerror_msg(f'Service call failed {e!r}')
76 
77  if not self.goal_handlegoal_handle.accepted:
78  self.error_msgerror_msg('Goal rejected')
79  return False
80 
81  self.info_msginfo_msg('Goal accepted')
82  if not block:
83  return True
84 
85  get_result_future = self.goal_handlegoal_handle.get_result_async()
86  if cancel:
87  time.sleep(2)
88  self.cancel_goalcancel_goal()
89 
90  self.info_msginfo_msg("Waiting for 'follow_gps_waypoints' action to complete")
91  try:
92  rclpy.spin_until_future_complete(self, get_result_future)
93  status = get_result_future.result().status
94  result = get_result_future.result().result
95  self.action_resultaction_result = result
96  except Exception as e: # noqa: B902
97  self.error_msgerror_msg(f'Service call failed {e!r}')
98 
99  if status != GoalStatus.STATUS_SUCCEEDED:
100  self.info_msginfo_msg(f'Goal failed with status code: {status}')
101  return False
102  if len(result.missed_waypoints) > 0:
103  self.info_msginfo_msg(
104  'Goal failed to process all waypoints,'
105  ' missed {0} wps.'.format(len(result.missed_waypoints))
106  )
107  return False
108 
109  self.info_msginfo_msg('Goal succeeded!')
110  return True
111 
112  def setStopFailureParam(self, value):
113  req = SetParameters.Request()
114  req.parameters = [
115  Parameter('stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
116  ]
117  future = self.param_cliparam_cli.call_async(req)
118  rclpy.spin_until_future_complete(self, future)
119 
120  def shutdown(self):
121  self.info_msginfo_msg('Shutting down')
122 
123  self.action_clientaction_client.destroy()
124  self.info_msginfo_msg('Destroyed follow_gps_waypoints action client')
125 
126  transition_service = 'lifecycle_manager_navigation/manage_nodes'
127  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
128  while not mgr_client.wait_for_service(timeout_sec=1.0):
129  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
130 
131  req = ManageLifecycleNodes.Request()
132  req.command = ManageLifecycleNodes.Request().SHUTDOWN
133  future = mgr_client.call_async(req)
134  try:
135  rclpy.spin_until_future_complete(self, future)
136  future.result()
137  except Exception as e: # noqa: B902
138  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
139 
140  self.info_msginfo_msg(f'{transition_service} finished')
141 
142  def cancel_goal(self):
143  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
144  rclpy.spin_until_future_complete(self, cancel_future)
145 
146  def info_msg(self, msg: str):
147  self.get_logger().info(msg)
148 
149  def warn_msg(self, msg: str):
150  self.get_logger().warn(msg)
151 
152  def error_msg(self, msg: str):
153  self.get_logger().error(msg)
154 
155 
156 def main(argv=sys.argv[1:]):
157  rclpy.init()
158 
159  # wait a few seconds to make sure entire stacks are up
160  time.sleep(10)
161 
162  wps = [[55.944831, -3.186998], [55.944818, -3.187075], [55.944782, -3.187060]]
163 
164  test = GpsWaypointFollowerTest()
165  test.setWaypoints(wps)
166 
167  # wait for poseCallback
168 
169  result = test.run(True, False)
170  assert result
171 
172  # preempt with new point
173  test.setWaypoints([wps[0]])
174  result = test.run(False, False)
175  time.sleep(2)
176  test.setWaypoints([wps[1]])
177  result = test.run(False, False)
178 
179  # cancel
180  time.sleep(2)
181  test.cancel_goal()
182 
183  # set waypoint outside of map
184  time.sleep(2)
185  test.setWaypoints([[35.0, -118.0]])
186  result = test.run(True, False)
187  assert not result
188  result = not result
189  assert (
190  test.action_result.missed_waypoints[0].error_code
191  == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
192  )
193 
194  # stop on failure test with bogous waypoint
195  test.setStopFailureParam(True)
196  bwps = [[55.944831, -3.186998], [35.0, -118.0], [55.944782, -3.187060]]
197  test.setWaypoints(bwps)
198  result = test.run(True, False)
199  assert not result
200  result = not result
201  mwps = test.action_result.missed_waypoints
202  result = (len(mwps) == 1) & (mwps[0] == 1)
203  test.setStopFailureParam(False)
204 
205  # Zero goal test
206  test.setWaypoints([])
207  result = test.run(True, False)
208 
209  # Cancel test
210  test.setWaypoints(wps)
211  result = test.run(True, True)
212  assert not result
213  result = not result
214 
215  test.shutdown()
216  test.info_msg('Done Shutting Down.')
217 
218  if not result:
219  test.info_msg('Exiting failed')
220  exit(1)
221  else:
222  test.info_msg('Exiting passed')
223  exit(0)
224 
225 
226 if __name__ == '__main__':
227  main()
def info_msg(self, str msg)
Definition: tester.py:146
def error_msg(self, str msg)
Definition: tester.py:152