Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
spin_tester.py
1 #! /usr/bin/env python3
2 # Copyright 2024 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 from typing import Optional
19 
20 from action_msgs.msg import GoalStatus
21 from geometry_msgs.msg import Point32, PolygonStamped
22 from nav2_msgs.action import Spin
23 from nav2_msgs.msg import Costmap
24 from nav2_msgs.srv import ManageLifecycleNodes
25 import rclpy
26 from rclpy.action import ActionClient
27 from rclpy.action.client import ClientGoalHandle
28 from rclpy.client import Client
29 from rclpy.duration import Duration
30 from rclpy.node import Node
31 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
32 
33 
34 class SpinTest(Node):
35 
36  def __init__(self) -> None:
37  super().__init__(node_name='spin_tester', namespace='')
38  self.costmap_qoscostmap_qos = QoSProfile(
39  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
40  reliability=QoSReliabilityPolicy.RELIABLE,
41  history=QoSHistoryPolicy.KEEP_LAST,
42  depth=1,
43  )
44  self.action_client: ActionClient[Spin.Goal, Spin.Result, Spin.Feedback] \
45  = ActionClient(self, Spin, 'spin')
46 
47  self.costmap_pubcostmap_pub = self.create_publisher(
48  Costmap, 'local_costmap/costmap_raw', self.costmap_qoscostmap_qos)
49  self.footprint_pubfootprint_pub = self.create_publisher(
50  PolygonStamped, 'local_costmap/published_footprint', 10)
51  self.goal_handlegoal_handle: Optional[ClientGoalHandle[
52  Spin.Goal, Spin.Result, Spin.Feedback]] = None
53  self.action_resultaction_result = Spin.Result()
54 
55  def sendCommand(self, command: Spin.Goal) -> bool:
56  self.info_msginfo_msg('Sending goal request...')
57  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
58  try:
59  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
60  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
61  except Exception as e: # noqa: B902
62  self.error_msgerror_msg(f'Service call failed {e!r}')
63 
64  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
65  self.error_msgerror_msg('Goal rejected')
66  return False
67 
68  self.info_msginfo_msg('Goal accepted')
69  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
70 
71  self.info_msginfo_msg("Waiting for 'spin' action to complete")
72  try:
73  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
74  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
75  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
76  self.action_resultaction_result = result
77  except Exception as e: # noqa: B902
78  self.error_msgerror_msg(f'Service call failed {e!r}')
79 
80  if status != GoalStatus.STATUS_SUCCEEDED:
81  self.info_msginfo_msg(f'Goal failed with status code: {status}')
82  return False
83  if self.action_resultaction_result.error_code == 0:
84  self.info_msginfo_msg('Spin was successful!')
85  return True
86  self.info_msginfo_msg('Spin failed to meet target!')
87  return False
88 
89  def sendAndPreemptWithInvertedCommand(self, command: Spin.Goal) -> bool:
90  # Send initial goal
91  self.info_msginfo_msg('Sending goal request...')
92  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
93  try:
94  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
95  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
96  except Exception as e: # noqa: B902
97  self.error_msgerror_msg(f'Service call failed {e!r}')
98 
99  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
100  self.error_msgerror_msg('Goal rejected')
101  return False
102 
103  self.info_msginfo_msg('Goal accepted')
104  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
105 
106  # Now preempt it
107  time.sleep(0.5)
108  self.info_msginfo_msg('Sending preemption request...')
109  command.target_yaw = -command.target_yaw
110  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
111  try:
112  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
113  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
114  except Exception as e: # noqa: B902
115  self.error_msgerror_msg(f'Service call failed {e!r}')
116 
117  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
118  self.error_msgerror_msg('Preemption rejected')
119  return False
120 
121  self.info_msginfo_msg('Preemption accepted')
122  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
123 
124  # Wait for new goal to complete
125  self.info_msginfo_msg("Waiting for 'spin' action Preemption to complete")
126  try:
127  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
128  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
129  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
130  self.action_resultaction_result = result
131  except Exception as e: # noqa: B902
132  self.error_msgerror_msg(f'Service call failed {e!r}')
133 
134  if status != GoalStatus.STATUS_SUCCEEDED:
135  self.info_msginfo_msg(f'Goal failed with status code: {status}')
136  return False
137  if self.action_resultaction_result.error_code == 0:
138  self.info_msginfo_msg('Spin was successful!')
139  return True
140  self.info_msginfo_msg('Spin failed to meet target!')
141  return False
142 
143  def sendAndCancelCommand(self, command: Spin.Goal) -> bool:
144  self.info_msginfo_msg('Sending goal request...')
145  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
146  try:
147  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
148  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
149  except Exception as e: # noqa: B902
150  self.error_msgerror_msg(f'Service call failed {e!r}')
151 
152  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
153  self.error_msgerror_msg('Goal rejected')
154  return False
155 
156  self.info_msginfo_msg('Goal accepted')
157  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
158 
159  # Now cancel it
160  time.sleep(0.5)
161  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
162  rclpy.spin_until_future_complete(self, cancel_future)
163  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
164  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
165  if status != GoalStatus.STATUS_CANCELED:
166  self.info_msginfo_msg(f'Goal failed with status code: {status}')
167  return False
168  else:
169  self.info_msginfo_msg('Goal was canceled successfully')
170  return True
171 
172  def sendFreeCostmap(self) -> None:
173  costmap_msg = Costmap()
174  costmap_msg.header.frame_id = 'odom'
175  costmap_msg.header.stamp = self.get_clock().now().to_msg()
176  costmap_msg.metadata.resolution = 0.05
177  costmap_msg.metadata.size_x = 100
178  costmap_msg.metadata.size_y = 100
179  costmap_msg.metadata.origin.position.x = -2.5
180  costmap_msg.metadata.origin.position.y = -2.5
181  costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
182  self.costmap_pubcostmap_pub.publish(costmap_msg)
183 
184  footprint_msg = PolygonStamped()
185  footprint_msg.header.frame_id = 'odom'
186  footprint_msg.header.stamp = self.get_clock().now().to_msg()
187  footprint_msg.polygon.points = [
188  Point32(x=0.1, y=0.1, z=0.0),
189  Point32(x=0.1, y=-0.1, z=0.0),
190  Point32(x=-0.1, y=-0.1, z=0.0),
191  Point32(x=-0.1, y=0.1, z=0.0)
192  ]
193  self.footprint_pubfootprint_pub.publish(footprint_msg)
194 
195  def sendOccupiedCostmap(self) -> None:
196  costmap_msg = Costmap()
197  costmap_msg.header.frame_id = 'odom'
198  costmap_msg.header.stamp = self.get_clock().now().to_msg()
199  costmap_msg.metadata.resolution = 0.05
200  costmap_msg.metadata.size_x = 100
201  costmap_msg.metadata.size_y = 100
202  costmap_msg.metadata.origin.position.x = -2.5
203  costmap_msg.metadata.origin.position.y = -2.5
204  costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
205  self.costmap_pubcostmap_pub.publish(costmap_msg)
206 
207  footprint_msg = PolygonStamped()
208  footprint_msg.header.frame_id = 'odom'
209  footprint_msg.header.stamp = self.get_clock().now().to_msg()
210  footprint_msg.polygon.points = [
211  Point32(x=0.1, y=0.1, z=0.0),
212  Point32(x=0.1, y=-0.1, z=0.0),
213  Point32(x=-0.1, y=-0.1, z=0.0),
214  Point32(x=-0.1, y=0.1, z=0.0)
215  ]
216  self.footprint_pubfootprint_pub.publish(footprint_msg)
217 
218  def run(self) -> bool:
219  while not self.action_client.wait_for_server(timeout_sec=1.0):
220  self.info_msginfo_msg("'spin' action server not available, waiting...")
221 
222  # Test A: Send without valid costmap
223  action_request = Spin.Goal()
224  action_request.target_yaw = 0.349066 # 20 deg
225  action_request.time_allowance = Duration(seconds=5).to_msg()
226  cmd1 = self.sendCommandsendCommand(action_request)
227 
228  if cmd1:
229  self.error_msgerror_msg('Test A failed: Passed while costmap was not available!')
230  return not cmd1
231  else:
232  self.info_msginfo_msg('Test A passed')
233 
234  # Test B: Send with valid costmap and spin a couple of times
235  self.sendFreeCostmapsendFreeCostmap()
236  time.sleep(1)
237  cmd1 = self.sendCommandsendCommand(action_request)
238  action_request.target_yaw = -3.49066 # -200 deg
239  cmd2 = self.sendCommandsendCommand(action_request)
240 
241  if not cmd1 or not cmd2:
242  self.error_msgerror_msg('Test B failed: Failed to spin with valid costmap!')
243  return not cmd1 or not cmd2
244 
245  action_request.target_yaw = 0.349066 # 20 deg
246  cmd_preempt = self.sendAndPreemptWithInvertedCommandsendAndPreemptWithInvertedCommand(action_request)
247  if not cmd_preempt:
248  self.error_msgerror_msg('Test B failed: Failed to preempt and invert command!')
249  return not cmd_preempt
250 
251  cmd_cancel = self.sendAndCancelCommandsendAndCancelCommand(action_request)
252  if not cmd_cancel:
253  self.error_msgerror_msg('Test B failed: Failed to cancel command!')
254  return not cmd_cancel
255  else:
256  self.info_msginfo_msg('Test B passed')
257 
258  # Test C: Send with impossible command in time allowance
259  action_request.time_allowance = Duration(seconds=0.1).to_msg()
260  cmd3 = self.sendCommandsendCommand(action_request)
261  if cmd3:
262  self.error_msgerror_msg('Test C failed: Passed while impoossible timing requested!')
263  return not cmd3
264  else:
265  self.info_msginfo_msg('Test C passed')
266 
267  # Test D: Send with lethal costmap and spin
268  action_request.time_allowance = Duration(seconds=5).to_msg()
269  self.sendOccupiedCostmapsendOccupiedCostmap()
270  time.sleep(1)
271  cmd4 = self.sendCommandsendCommand(action_request)
272  if cmd4:
273  self.error_msgerror_msg('Test D failed: Passed while costmap was not lethal!')
274  return not cmd4
275  else:
276  self.info_msginfo_msg('Test D passed')
277  return True
278 
279  def shutdown(self) -> None:
280  self.info_msginfo_msg('Shutting down')
281 
282  self.action_client.destroy()
283  self.info_msginfo_msg('Destroyed backup action client')
284 
285  transition_service = 'lifecycle_manager_navigation/manage_nodes'
286  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
287  = self.create_client(ManageLifecycleNodes, transition_service)
288  while not mgr_client.wait_for_service(timeout_sec=1.0):
289  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
290 
291  req = ManageLifecycleNodes.Request()
292  req.command = ManageLifecycleNodes.Request().SHUTDOWN
293  future = mgr_client.call_async(req)
294  try:
295  rclpy.spin_until_future_complete(self, future)
296  future.result()
297  except Exception as e: # noqa: B902
298  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
299 
300  self.info_msginfo_msg(f'{transition_service} finished')
301 
302  def info_msg(self, msg: str) -> None:
303  self.get_logger().info(msg)
304 
305  def warn_msg(self, msg: str) -> None:
306  self.get_logger().warning(msg)
307 
308  def error_msg(self, msg: str) -> None:
309  self.get_logger().error(msg)
310 
311 
312 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
313  rclpy.init()
314  time.sleep(10)
315  test = SpinTest()
316  result = test.run()
317  test.shutdown()
318 
319  if not result:
320  test.info_msg('Exiting failed')
321  exit(1)
322  else:
323  test.info_msg('Exiting passed')
324  exit(0)
325 
326 
327 if __name__ == '__main__':
328  main()
None sendOccupiedCostmap(self)
Definition: spin_tester.py:195
bool sendAndCancelCommand(self, Spin.Goal command)
Definition: spin_tester.py:143
None error_msg(self, str msg)
Definition: spin_tester.py:308
None sendFreeCostmap(self)
Definition: spin_tester.py:172
bool sendCommand(self, Spin.Goal command)
Definition: spin_tester.py:55
None info_msg(self, str msg)
Definition: spin_tester.py:302
bool sendAndPreemptWithInvertedCommand(self, Spin.Goal command)
Definition: spin_tester.py:89