Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
spin_tester.py
1 #! /usr/bin/env python3
2 # Copyright 2024 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 
19 from action_msgs.msg import GoalStatus
20 from geometry_msgs.msg import Point32, PolygonStamped
21 from nav2_msgs.action import Spin
22 from nav2_msgs.msg import Costmap
23 from nav2_msgs.srv import ManageLifecycleNodes
24 
25 import rclpy
26 
27 from rclpy.action import ActionClient
28 from rclpy.duration import Duration
29 from rclpy.node import Node
30 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
31 from rclpy.qos import QoSProfile
32 
33 
34 class SpinTest(Node):
35 
36  def __init__(self):
37  super().__init__(node_name='spin_tester', namespace='')
38  self.costmap_qoscostmap_qos = QoSProfile(
39  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
40  reliability=QoSReliabilityPolicy.RELIABLE,
41  history=QoSHistoryPolicy.KEEP_LAST,
42  depth=1,
43  )
44  self.action_clientaction_client = ActionClient(self, Spin, 'spin')
45  self.costmap_pubcostmap_pub = self.create_publisher(
46  Costmap, 'local_costmap/costmap_raw', self.costmap_qoscostmap_qos)
47  self.footprint_pubfootprint_pub = self.create_publisher(
48  PolygonStamped, 'local_costmap/published_footprint', 10)
49  self.goal_handlegoal_handle = None
50  self.action_resultaction_result = None
51 
52  def sendCommand(self, command):
53  self.info_msginfo_msg('Sending goal request...')
54  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
55  try:
56  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
57  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
58  except Exception as e: # noqa: B902
59  self.error_msgerror_msg(f'Service call failed {e!r}')
60 
61  if not self.goal_handlegoal_handle.accepted:
62  self.error_msgerror_msg('Goal rejected')
63  return False
64 
65  self.info_msginfo_msg('Goal accepted')
66  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
67 
68  self.info_msginfo_msg("Waiting for 'spin' action to complete")
69  try:
70  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
71  status = self.result_futureresult_future.result().status
72  result = self.result_futureresult_future.result().result
73  self.action_resultaction_result = result
74  except Exception as e: # noqa: B902
75  self.error_msgerror_msg(f'Service call failed {e!r}')
76 
77  if status != GoalStatus.STATUS_SUCCEEDED:
78  self.info_msginfo_msg(f'Goal failed with status code: {status}')
79  return False
80  if self.action_resultaction_result.error_code == 0:
81  self.info_msginfo_msg('Spin was successful!')
82  return True
83  self.info_msginfo_msg('Spin failed to meet target!')
84  return False
85 
86  def sendAndPreemptWithInvertedCommand(self, command):
87  # Send initial goal
88  self.info_msginfo_msg('Sending goal request...')
89  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
90  try:
91  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
92  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
93  except Exception as e: # noqa: B902
94  self.error_msgerror_msg(f'Service call failed {e!r}')
95 
96  if not self.goal_handlegoal_handle.accepted:
97  self.error_msgerror_msg('Goal rejected')
98  return False
99 
100  self.info_msginfo_msg('Goal accepted')
101  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
102 
103  # Now preempt it
104  time.sleep(0.5)
105  self.info_msginfo_msg('Sending preemption request...')
106  command.target_yaw = -command.target_yaw
107  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
108  try:
109  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
110  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
111  except Exception as e: # noqa: B902
112  self.error_msgerror_msg(f'Service call failed {e!r}')
113 
114  if not self.goal_handlegoal_handle.accepted:
115  self.error_msgerror_msg('Preemption rejected')
116  return False
117 
118  self.info_msginfo_msg('Preemption accepted')
119  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
120 
121  # Wait for new goal to complete
122  self.info_msginfo_msg("Waiting for 'spin' action Preemption to complete")
123  try:
124  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
125  status = self.result_futureresult_future.result().status
126  result = self.result_futureresult_future.result().result
127  self.action_resultaction_result = result
128  except Exception as e: # noqa: B902
129  self.error_msgerror_msg(f'Service call failed {e!r}')
130 
131  if status != GoalStatus.STATUS_SUCCEEDED:
132  self.info_msginfo_msg(f'Goal failed with status code: {status}')
133  return False
134  if self.action_resultaction_result.error_code == 0:
135  self.info_msginfo_msg('Spin was successful!')
136  return True
137  self.info_msginfo_msg('Spin failed to meet target!')
138  return False
139 
140  def sendAndCancelCommand(self, command):
141  self.info_msginfo_msg('Sending goal request...')
142  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
143  try:
144  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
145  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
146  except Exception as e: # noqa: B902
147  self.error_msgerror_msg(f'Service call failed {e!r}')
148 
149  if not self.goal_handlegoal_handle.accepted:
150  self.error_msgerror_msg('Goal rejected')
151  return False
152 
153  self.info_msginfo_msg('Goal accepted')
154  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
155 
156  # Now cancel it
157  time.sleep(0.5)
158  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
159  rclpy.spin_until_future_complete(self, cancel_future)
160  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
161  status = self.result_futureresult_future.result().status
162  if status != GoalStatus.STATUS_CANCELED:
163  self.info_msginfo_msg(f'Goal failed with status code: {status}')
164  return False
165  else:
166  self.info_msginfo_msg('Goal was canceled successfully')
167  return True
168 
169  def sendFreeCostmap(self):
170  costmap_msg = Costmap()
171  costmap_msg.header.frame_id = 'odom'
172  costmap_msg.header.stamp = self.get_clock().now().to_msg()
173  costmap_msg.metadata.resolution = 0.05
174  costmap_msg.metadata.size_x = 100
175  costmap_msg.metadata.size_y = 100
176  costmap_msg.metadata.origin.position.x = -2.5
177  costmap_msg.metadata.origin.position.y = -2.5
178  costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
179  self.costmap_pubcostmap_pub.publish(costmap_msg)
180 
181  footprint_msg = PolygonStamped()
182  footprint_msg.header.frame_id = 'odom'
183  footprint_msg.header.stamp = self.get_clock().now().to_msg()
184  footprint_msg.polygon.points = [
185  Point32(x=0.1, y=0.1, z=0.0),
186  Point32(x=0.1, y=-0.1, z=0.0),
187  Point32(x=-0.1, y=-0.1, z=0.0),
188  Point32(x=-0.1, y=0.1, z=0.0)
189  ]
190  self.footprint_pubfootprint_pub.publish(footprint_msg)
191 
192  def sendOccupiedCostmap(self):
193  costmap_msg = Costmap()
194  costmap_msg.header.frame_id = 'odom'
195  costmap_msg.header.stamp = self.get_clock().now().to_msg()
196  costmap_msg.metadata.resolution = 0.05
197  costmap_msg.metadata.size_x = 100
198  costmap_msg.metadata.size_y = 100
199  costmap_msg.metadata.origin.position.x = -2.5
200  costmap_msg.metadata.origin.position.y = -2.5
201  costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
202  self.costmap_pubcostmap_pub.publish(costmap_msg)
203 
204  footprint_msg = PolygonStamped()
205  footprint_msg.header.frame_id = 'odom'
206  footprint_msg.header.stamp = self.get_clock().now().to_msg()
207  footprint_msg.polygon.points = [
208  Point32(x=0.1, y=0.1, z=0.0),
209  Point32(x=0.1, y=-0.1, z=0.0),
210  Point32(x=-0.1, y=-0.1, z=0.0),
211  Point32(x=-0.1, y=0.1, z=0.0)
212  ]
213  self.footprint_pubfootprint_pub.publish(footprint_msg)
214 
215  def run(self):
216  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
217  self.info_msginfo_msg("'spin' action server not available, waiting...")
218 
219  # Test A: Send without valid costmap
220  action_request = Spin.Goal()
221  action_request.target_yaw = 0.349066 # 20 deg
222  action_request.time_allowance = Duration(seconds=5).to_msg()
223  cmd1 = self.sendCommandsendCommand(action_request)
224 
225  if cmd1:
226  self.error_msgerror_msg('Test A failed: Passed while costmap was not available!')
227  return not cmd1
228  else:
229  self.info_msginfo_msg('Test A passed')
230 
231  # Test B: Send with valid costmap and spin a couple of times
232  self.sendFreeCostmapsendFreeCostmap()
233  time.sleep(1)
234  cmd1 = self.sendCommandsendCommand(action_request)
235  action_request.target_yaw = -3.49066 # -200 deg
236  cmd2 = self.sendCommandsendCommand(action_request)
237 
238  if not cmd1 or not cmd2:
239  self.error_msgerror_msg('Test B failed: Failed to spin with valid costmap!')
240  return not cmd1 or not cmd2
241 
242  action_request.target_yaw = 0.349066 # 20 deg
243  cmd_preempt = self.sendAndPreemptWithInvertedCommandsendAndPreemptWithInvertedCommand(action_request)
244  if not cmd_preempt:
245  self.error_msgerror_msg('Test B failed: Failed to preempt and invert command!')
246  return not cmd_preempt
247 
248  cmd_cancel = self.sendAndCancelCommandsendAndCancelCommand(action_request)
249  if not cmd_cancel:
250  self.error_msgerror_msg('Test B failed: Failed to cancel command!')
251  return not cmd_cancel
252  else:
253  self.info_msginfo_msg('Test B passed')
254 
255  # Test C: Send with impossible command in time allowance
256  action_request.time_allowance = Duration(seconds=0.1).to_msg()
257  cmd3 = self.sendCommandsendCommand(action_request)
258  if cmd3:
259  self.error_msgerror_msg('Test C failed: Passed while impoossible timing requested!')
260  return not cmd3
261  else:
262  self.info_msginfo_msg('Test C passed')
263 
264  # Test D: Send with lethal costmap and spin
265  action_request.time_allowance = Duration(seconds=5).to_msg()
266  self.sendOccupiedCostmapsendOccupiedCostmap()
267  time.sleep(1)
268  cmd4 = self.sendCommandsendCommand(action_request)
269  if cmd4:
270  self.error_msgerror_msg('Test D failed: Passed while costmap was not lethal!')
271  return not cmd4
272  else:
273  self.info_msginfo_msg('Test D passed')
274  return True
275 
276  def shutdown(self):
277  self.info_msginfo_msg('Shutting down')
278 
279  self.action_clientaction_client.destroy()
280  self.info_msginfo_msg('Destroyed backup action client')
281 
282  transition_service = 'lifecycle_manager_navigation/manage_nodes'
283  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
284  while not mgr_client.wait_for_service(timeout_sec=1.0):
285  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
286 
287  req = ManageLifecycleNodes.Request()
288  req.command = ManageLifecycleNodes.Request().SHUTDOWN
289  future = mgr_client.call_async(req)
290  try:
291  rclpy.spin_until_future_complete(self, future)
292  future.result()
293  except Exception as e: # noqa: B902
294  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
295 
296  self.info_msginfo_msg(f'{transition_service} finished')
297 
298  def info_msg(self, msg: str):
299  self.get_logger().info(msg)
300 
301  def warn_msg(self, msg: str):
302  self.get_logger().warn(msg)
303 
304  def error_msg(self, msg: str):
305  self.get_logger().error(msg)
306 
307 
308 def main(argv=sys.argv[1:]):
309  rclpy.init()
310  time.sleep(10)
311  test = SpinTest()
312  result = test.run()
313  test.shutdown()
314 
315  if not result:
316  test.info_msg('Exiting failed')
317  exit(1)
318  else:
319  test.info_msg('Exiting passed')
320  exit(0)
321 
322 
323 if __name__ == '__main__':
324  main()
def sendFreeCostmap(self)
Definition: spin_tester.py:169
def sendCommand(self, command)
Definition: spin_tester.py:52
def info_msg(self, str msg)
Definition: spin_tester.py:298
def sendAndCancelCommand(self, command)
Definition: spin_tester.py:140
def error_msg(self, str msg)
Definition: spin_tester.py:304
def sendOccupiedCostmap(self)
Definition: spin_tester.py:192
def sendAndPreemptWithInvertedCommand(self, command)
Definition: spin_tester.py:86