Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
tester.py
1 #! /usr/bin/env python3
2 # Copyright 2019 Samsung Research America
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 
19 from action_msgs.msg import GoalStatus
20 from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped
21 from nav2_msgs.action import ComputePathToPose, FollowWaypoints
22 from nav2_msgs.srv import ManageLifecycleNodes
23 from rcl_interfaces.srv import SetParameters
24 
25 import rclpy
26 from rclpy.action import ActionClient
27 from rclpy.node import Node
28 from rclpy.parameter import Parameter
29 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
30 from rclpy.qos import QoSProfile
31 
32 
34 
35  def __init__(self):
36  super().__init__(node_name='nav2_waypoint_tester', namespace='')
37  self.waypointswaypoints = None
38  self.action_clientaction_client = ActionClient(self, FollowWaypoints, 'follow_waypoints')
39  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
40  PoseWithCovarianceStamped, 'initialpose', 10
41  )
42  self.initial_pose_receivedinitial_pose_received = False
43  self.goal_handlegoal_handle = None
44  self.action_resultaction_result = None
45 
46  pose_qos = QoSProfile(
47  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
48  reliability=QoSReliabilityPolicy.RELIABLE,
49  history=QoSHistoryPolicy.KEEP_LAST,
50  depth=1,
51  )
52 
53  self.model_pose_submodel_pose_sub = self.create_subscription(
54  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
55  )
56  self.param_cliparam_cli = self.create_client(
57  SetParameters, '/waypoint_follower/set_parameters'
58  )
59 
60  def setInitialPose(self, pose):
61  self.init_poseinit_pose = PoseWithCovarianceStamped()
62  self.init_poseinit_pose.pose.pose.position.x = pose[0]
63  self.init_poseinit_pose.pose.pose.position.y = pose[1]
64  self.init_poseinit_pose.header.frame_id = 'map'
65  self.publishInitialPosepublishInitialPose()
66  time.sleep(5)
67 
68  def poseCallback(self, msg):
69  self.info_msginfo_msg('Received amcl_pose')
70  self.initial_pose_receivedinitial_pose_received = True
71 
72  def setWaypoints(self, waypoints):
73  self.waypointswaypoints = []
74  for wp in waypoints:
75  msg = PoseStamped()
76  msg.header.frame_id = 'map'
77  msg.pose.position.x = wp[0]
78  msg.pose.position.y = wp[1]
79  msg.pose.orientation.w = 1.0
80  self.waypointswaypoints.append(msg)
81 
82  def run(self, block, cancel):
83  # if not self.waypoints:
84  # rclpy.error_msg('Did not set valid waypoints before running test!')
85  # return False
86 
87  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
88  self.info_msginfo_msg("'follow_waypoints' action server not available, waiting...")
89 
90  action_request = FollowWaypoints.Goal()
91  action_request.poses = self.waypointswaypoints
92 
93  self.info_msginfo_msg('Sending goal request...')
94  send_goal_future = self.action_clientaction_client.send_goal_async(action_request)
95  try:
96  rclpy.spin_until_future_complete(self, send_goal_future)
97  self.goal_handlegoal_handle = send_goal_future.result()
98  except Exception as e: # noqa: B902
99  self.error_msgerror_msg(f'Service call failed {e!r}')
100 
101  if not self.goal_handlegoal_handle.accepted:
102  self.error_msgerror_msg('Goal rejected')
103  return False
104 
105  self.info_msginfo_msg('Goal accepted')
106  if not block:
107  return True
108 
109  get_result_future = self.goal_handlegoal_handle.get_result_async()
110  if cancel:
111  time.sleep(2)
112  self.cancel_goalcancel_goal()
113 
114  self.info_msginfo_msg("Waiting for 'follow_waypoints' action to complete")
115  try:
116  rclpy.spin_until_future_complete(self, get_result_future)
117  status = get_result_future.result().status
118  result = get_result_future.result().result
119  self.action_resultaction_result = result
120  except Exception as e: # noqa: B902
121  self.error_msgerror_msg(f'Service call failed {e!r}')
122 
123  if status != GoalStatus.STATUS_SUCCEEDED:
124  self.info_msginfo_msg(f'Goal failed with status code: {status}')
125  return False
126  if len(self.action_resultaction_result.missed_waypoints) > 0:
127  self.info_msginfo_msg(
128  'Goal failed to process all waypoints,'
129  ' missed {0} wps.'.format(len(self.action_resultaction_result.missed_waypoints))
130  )
131  return False
132 
133  self.info_msginfo_msg('Goal succeeded!')
134  return True
135 
136  def publishInitialPose(self):
137  self.initial_pose_pubinitial_pose_pub.publish(self.init_poseinit_pose)
138 
139  def setStopFailureParam(self, value):
140  req = SetParameters.Request()
141  req.parameters = [
142  Parameter('stop_on_failure', Parameter.Type.BOOL, value).to_parameter_msg()
143  ]
144  future = self.param_cliparam_cli.call_async(req)
145  rclpy.spin_until_future_complete(self, future)
146 
147  def shutdown(self):
148  self.info_msginfo_msg('Shutting down')
149 
150  self.action_clientaction_client.destroy()
151  self.info_msginfo_msg('Destroyed follow_waypoints action client')
152 
153  transition_service = 'lifecycle_manager_navigation/manage_nodes'
154  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
155  while not mgr_client.wait_for_service(timeout_sec=1.0):
156  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
157 
158  req = ManageLifecycleNodes.Request()
159  req.command = ManageLifecycleNodes.Request().SHUTDOWN
160  future = mgr_client.call_async(req)
161  try:
162  rclpy.spin_until_future_complete(self, future)
163  future.result()
164  except Exception as e: # noqa: B902
165  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
166 
167  self.info_msginfo_msg(f'{transition_service} finished')
168 
169  transition_service = 'lifecycle_manager_localization/manage_nodes'
170  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
171  while not mgr_client.wait_for_service(timeout_sec=1.0):
172  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
173 
174  req = ManageLifecycleNodes.Request()
175  req.command = ManageLifecycleNodes.Request().SHUTDOWN
176  future = mgr_client.call_async(req)
177  try:
178  rclpy.spin_until_future_complete(self, future)
179  future.result()
180  except Exception as e: # noqa: B902
181  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
182 
183  self.info_msginfo_msg(f'{transition_service} finished')
184 
185  def cancel_goal(self):
186  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
187  rclpy.spin_until_future_complete(self, cancel_future)
188 
189  def info_msg(self, msg: str):
190  self.get_logger().info(msg)
191 
192  def warn_msg(self, msg: str):
193  self.get_logger().warn(msg)
194 
195  def error_msg(self, msg: str):
196  self.get_logger().error(msg)
197 
198 
199 def main(argv=sys.argv[1:]):
200  rclpy.init()
201 
202  # wait a few seconds to make sure entire stacks are up
203  time.sleep(10)
204 
205  wps = [[-0.52, -0.54], [0.58, -0.55], [1.78, -0.57]]
206  starting_pose = [-2.0, -0.5]
207 
208  test = WaypointFollowerTest()
209  test.setWaypoints(wps)
210 
211  retry_count = 0
212  retries = 2
213  while not test.initial_pose_received and retry_count <= retries:
214  retry_count += 1
215  test.info_msg('Setting initial pose')
216  test.setInitialPose(starting_pose)
217  test.info_msg('Waiting for amcl_pose to be received')
218  rclpy.spin_once(test, timeout_sec=1.0) # wait for poseCallback
219 
220  result = test.run(True, False)
221  assert result
222 
223  # preempt with new point
224  test.setWaypoints([starting_pose])
225  result = test.run(False, False)
226  time.sleep(2)
227  test.setWaypoints([wps[1]])
228  result = test.run(False, False)
229 
230  # cancel
231  time.sleep(2)
232  test.cancel_goal()
233 
234  # set waypoint outside of map
235  time.sleep(2)
236  test.setWaypoints([[100.0, 100.0]])
237  result = test.run(True, False)
238  assert not result
239  result = not result
240  assert (
241  test.action_result.missed_waypoints[0].error_code
242  == ComputePathToPose.Result().GOAL_OUTSIDE_MAP
243  )
244 
245  # stop on failure test with bogous waypoint
246  test.setStopFailureParam(True)
247  bwps = [[-0.52, -0.54], [100.0, 100.0], [0.58, 0.52]]
248  test.setWaypoints(bwps)
249  result = test.run(True, False)
250  assert not result
251  result = not result
252  mwps = test.action_result.missed_waypoints
253  result = (len(mwps) == 1) & (mwps[0] == 1)
254  test.setStopFailureParam(False)
255 
256  # Zero goal test
257  test.setWaypoints([])
258  result = test.run(True, False)
259 
260  # Cancel test
261  test.setWaypoints(wps)
262  result = test.run(True, True)
263  assert not result
264  result = not result
265 
266  test.shutdown()
267  test.info_msg('Done Shutting Down.')
268 
269  if not result:
270  test.info_msg('Exiting failed')
271  exit(1)
272  else:
273  test.info_msg('Exiting passed')
274  exit(0)
275 
276 
277 if __name__ == '__main__':
278  main()
def publishInitialPose(self)
Definition: tester.py:136
def info_msg(self, str msg)
Definition: tester.py:189
def poseCallback(self, msg)
Definition: tester.py:68
def error_msg(self, str msg)
Definition: tester.py:195