Nav2 Navigation Stack - humble  humble
ROS 2 Navigation Stack
tester.py
1 #! /usr/bin/env python3
2 # Copyright 2019 Samsung Research America
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 
19 from action_msgs.msg import GoalStatus
20 from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped
21 from nav2_msgs.action import FollowWaypoints
22 from nav2_msgs.srv import ManageLifecycleNodes
23 
24 import rclpy
25 from rclpy.action import ActionClient
26 from rclpy.node import Node
27 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
28 from rclpy.qos import QoSProfile
29 
30 
32 
33  def __init__(self):
34  super().__init__(node_name='nav2_waypoint_tester', namespace='')
35  self.waypointswaypoints = None
36  self.action_clientaction_client = ActionClient(self, FollowWaypoints, 'follow_waypoints')
37  self.initial_pose_pubinitial_pose_pub = self.create_publisher(PoseWithCovarianceStamped,
38  'initialpose', 10)
39  self.initial_pose_receivedinitial_pose_received = False
40  self.goal_handlegoal_handle = None
41 
42  pose_qos = QoSProfile(
43  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
44  reliability=QoSReliabilityPolicy.RELIABLE,
45  history=QoSHistoryPolicy.KEEP_LAST,
46  depth=1)
47 
48  self.model_pose_submodel_pose_sub = self.create_subscription(PoseWithCovarianceStamped,
49  'amcl_pose', self.poseCallbackposeCallback, pose_qos)
50 
51  def setInitialPose(self, pose):
52  self.init_poseinit_pose = PoseWithCovarianceStamped()
53  self.init_poseinit_pose.pose.pose.position.x = pose[0]
54  self.init_poseinit_pose.pose.pose.position.y = pose[1]
55  self.init_poseinit_pose.header.frame_id = 'map'
56  self.publishInitialPosepublishInitialPose()
57  time.sleep(5)
58 
59  def poseCallback(self, msg):
60  self.info_msginfo_msg('Received amcl_pose')
61  self.initial_pose_receivedinitial_pose_received = True
62 
63  def setWaypoints(self, waypoints):
64  self.waypointswaypoints = []
65  for wp in waypoints:
66  msg = PoseStamped()
67  msg.header.frame_id = 'map'
68  msg.pose.position.x = wp[0]
69  msg.pose.position.y = wp[1]
70  msg.pose.orientation.w = 1.0
71  self.waypointswaypoints.append(msg)
72 
73  def run(self, block):
74  if not self.waypointswaypoints:
75  rclpy.error_msg('Did not set valid waypoints before running test!')
76  return False
77 
78  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
79  self.info_msginfo_msg("'follow_waypoints' action server not available, waiting...")
80 
81  action_request = FollowWaypoints.Goal()
82  action_request.poses = self.waypointswaypoints
83 
84  self.info_msginfo_msg('Sending goal request...')
85  send_goal_future = self.action_clientaction_client.send_goal_async(action_request)
86  try:
87  rclpy.spin_until_future_complete(self, send_goal_future)
88  self.goal_handlegoal_handle = send_goal_future.result()
89  except Exception as e: # noqa: B902
90  self.error_msgerror_msg(f'Service call failed {e!r}')
91 
92  if not self.goal_handlegoal_handle.accepted:
93  self.error_msgerror_msg('Goal rejected')
94  return False
95 
96  self.info_msginfo_msg('Goal accepted')
97  if not block:
98  return True
99 
100  get_result_future = self.goal_handlegoal_handle.get_result_async()
101 
102  self.info_msginfo_msg("Waiting for 'follow_waypoints' action to complete")
103  try:
104  rclpy.spin_until_future_complete(self, get_result_future)
105  status = get_result_future.result().status
106  result = get_result_future.result().result
107  except Exception as e: # noqa: B902
108  self.error_msgerror_msg(f'Service call failed {e!r}')
109 
110  if status != GoalStatus.STATUS_SUCCEEDED:
111  self.info_msginfo_msg(f'Goal failed with status code: {status}')
112  return False
113  if len(result.missed_waypoints) > 0:
114  self.info_msginfo_msg('Goal failed to process all waypoints,'
115  ' missed {0} wps.'.format(len(result.missed_waypoints)))
116  return False
117 
118  self.info_msginfo_msg('Goal succeeded!')
119  return True
120 
121  def publishInitialPose(self):
122  self.initial_pose_pubinitial_pose_pub.publish(self.init_poseinit_pose)
123 
124  def shutdown(self):
125  self.info_msginfo_msg('Shutting down')
126 
127  self.action_clientaction_client.destroy()
128  self.info_msginfo_msg('Destroyed follow_waypoints action client')
129 
130  transition_service = 'lifecycle_manager_navigation/manage_nodes'
131  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
132  while not mgr_client.wait_for_service(timeout_sec=1.0):
133  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
134 
135  req = ManageLifecycleNodes.Request()
136  req.command = ManageLifecycleNodes.Request().SHUTDOWN
137  future = mgr_client.call_async(req)
138  try:
139  rclpy.spin_until_future_complete(self, future)
140  future.result()
141  except Exception as e: # noqa: B902
142  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
143 
144  self.info_msginfo_msg(f'{transition_service} finished')
145 
146  transition_service = 'lifecycle_manager_localization/manage_nodes'
147  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
148  while not mgr_client.wait_for_service(timeout_sec=1.0):
149  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
150 
151  req = ManageLifecycleNodes.Request()
152  req.command = ManageLifecycleNodes.Request().SHUTDOWN
153  future = mgr_client.call_async(req)
154  try:
155  rclpy.spin_until_future_complete(self, future)
156  future.result()
157  except Exception as e: # noqa: B902
158  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
159 
160  self.info_msginfo_msg(f'{transition_service} finished')
161 
162  def cancel_goal(self):
163  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
164  rclpy.spin_until_future_complete(self, cancel_future)
165 
166  def info_msg(self, msg: str):
167  self.get_logger().info(msg)
168 
169  def warn_msg(self, msg: str):
170  self.get_logger().warn(msg)
171 
172  def error_msg(self, msg: str):
173  self.get_logger().error(msg)
174 
175 
176 def main(argv=sys.argv[1:]):
177  rclpy.init()
178 
179  # wait a few seconds to make sure entire stacks are up
180  time.sleep(10)
181 
182  wps = [[-0.52, -0.54], [0.58, -0.55], [0.58, 0.52]]
183  starting_pose = [-2.0, -0.5]
184 
185  test = WaypointFollowerTest()
186  test.setWaypoints(wps)
187 
188  retry_count = 0
189  retries = 2
190  while not test.initial_pose_received and retry_count <= retries:
191  retry_count += 1
192  test.info_msg('Setting initial pose')
193  test.setInitialPose(starting_pose)
194  test.info_msg('Waiting for amcl_pose to be received')
195  rclpy.spin_once(test, timeout_sec=1.0) # wait for poseCallback
196 
197  result = test.run(True)
198  assert result
199 
200  # preempt with new point
201  test.setWaypoints([starting_pose])
202  result = test.run(False)
203  time.sleep(2)
204  test.setWaypoints([wps[1]])
205  result = test.run(False)
206 
207  # cancel
208  time.sleep(2)
209  test.cancel_goal()
210 
211  # a failure case
212  time.sleep(2)
213  test.setWaypoints([[100.0, 100.0]])
214  result = test.run(True)
215  assert not result
216  result = not result
217 
218  test.shutdown()
219  test.info_msg('Done Shutting Down.')
220 
221  if not result:
222  test.info_msg('Exiting failed')
223  exit(1)
224  else:
225  test.info_msg('Exiting passed')
226  exit(0)
227 
228 
229 if __name__ == '__main__':
230  main()
def publishInitialPose(self)
Definition: tester.py:121
def info_msg(self, str msg)
Definition: tester.py:166
def poseCallback(self, msg)
Definition: tester.py:59
def error_msg(self, str msg)
Definition: tester.py:172