Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
spin_tester.py
1 #! /usr/bin/env python3
2 # Copyright 2024 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 from typing import Optional
19 
20 from action_msgs.msg import GoalStatus
21 from geometry_msgs.msg import Point32, PolygonStamped
22 from nav2_msgs.action import Spin
23 from nav2_msgs.msg import Costmap
24 from nav2_msgs.srv import ManageLifecycleNodes
25 import rclpy
26 from rclpy.action import ActionClient # type: ignore[attr-defined]
27 from rclpy.action.client import ClientGoalHandle
28 from rclpy.duration import Duration
29 from rclpy.node import Node
30 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
31 
32 
33 class SpinTest(Node):
34 
35  def __init__(self) -> None:
36  super().__init__(node_name='spin_tester', namespace='')
37  self.costmap_qoscostmap_qos = QoSProfile(
38  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
39  reliability=QoSReliabilityPolicy.RELIABLE,
40  history=QoSHistoryPolicy.KEEP_LAST,
41  depth=1,
42  )
43  self.action_clientaction_client = ActionClient(self, Spin, 'spin')
44  self.costmap_pubcostmap_pub = self.create_publisher(
45  Costmap, 'local_costmap/costmap_raw', self.costmap_qoscostmap_qos)
46  self.footprint_pubfootprint_pub = self.create_publisher(
47  PolygonStamped, 'local_costmap/published_footprint', 10)
48  self.goal_handlegoal_handle: Optional[ClientGoalHandle[
49  Spin.Goal, Spin.Result, Spin.Feedback]] = None
50  self.action_resultaction_result = Spin.Result()
51 
52  def sendCommand(self, command: Spin.Goal) -> bool:
53  self.info_msginfo_msg('Sending goal request...')
54  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
55  try:
56  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
57  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
58  except Exception as e: # noqa: B902
59  self.error_msgerror_msg(f'Service call failed {e!r}')
60 
61  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
62  self.error_msgerror_msg('Goal rejected')
63  return False
64 
65  self.info_msginfo_msg('Goal accepted')
66  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
67 
68  self.info_msginfo_msg("Waiting for 'spin' action to complete")
69  try:
70  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
71  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
72  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
73  self.action_resultaction_result = result
74  except Exception as e: # noqa: B902
75  self.error_msgerror_msg(f'Service call failed {e!r}')
76 
77  if status != GoalStatus.STATUS_SUCCEEDED:
78  self.info_msginfo_msg(f'Goal failed with status code: {status}')
79  return False
80  if self.action_resultaction_result.error_code == 0:
81  self.info_msginfo_msg('Spin was successful!')
82  return True
83  self.info_msginfo_msg('Spin failed to meet target!')
84  return False
85 
86  def sendAndPreemptWithInvertedCommand(self, command: Spin.Goal) -> bool:
87  # Send initial goal
88  self.info_msginfo_msg('Sending goal request...')
89  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
90  try:
91  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
92  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
93  except Exception as e: # noqa: B902
94  self.error_msgerror_msg(f'Service call failed {e!r}')
95 
96  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
97  self.error_msgerror_msg('Goal rejected')
98  return False
99 
100  self.info_msginfo_msg('Goal accepted')
101  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
102 
103  # Now preempt it
104  time.sleep(0.5)
105  self.info_msginfo_msg('Sending preemption request...')
106  command.target_yaw = -command.target_yaw
107  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
108  try:
109  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
110  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
111  except Exception as e: # noqa: B902
112  self.error_msgerror_msg(f'Service call failed {e!r}')
113 
114  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
115  self.error_msgerror_msg('Preemption rejected')
116  return False
117 
118  self.info_msginfo_msg('Preemption accepted')
119  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
120 
121  # Wait for new goal to complete
122  self.info_msginfo_msg("Waiting for 'spin' action Preemption to complete")
123  try:
124  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
125  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
126  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
127  self.action_resultaction_result = result
128  except Exception as e: # noqa: B902
129  self.error_msgerror_msg(f'Service call failed {e!r}')
130 
131  if status != GoalStatus.STATUS_SUCCEEDED:
132  self.info_msginfo_msg(f'Goal failed with status code: {status}')
133  return False
134  if self.action_resultaction_result.error_code == 0:
135  self.info_msginfo_msg('Spin was successful!')
136  return True
137  self.info_msginfo_msg('Spin failed to meet target!')
138  return False
139 
140  def sendAndCancelCommand(self, command: Spin.Goal) -> bool:
141  self.info_msginfo_msg('Sending goal request...')
142  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
143  try:
144  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
145  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
146  except Exception as e: # noqa: B902
147  self.error_msgerror_msg(f'Service call failed {e!r}')
148 
149  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
150  self.error_msgerror_msg('Goal rejected')
151  return False
152 
153  self.info_msginfo_msg('Goal accepted')
154  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
155 
156  # Now cancel it
157  time.sleep(0.5)
158  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
159  rclpy.spin_until_future_complete(self, cancel_future)
160  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
161  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
162  if status != GoalStatus.STATUS_CANCELED:
163  self.info_msginfo_msg(f'Goal failed with status code: {status}')
164  return False
165  else:
166  self.info_msginfo_msg('Goal was canceled successfully')
167  return True
168 
169  def sendFreeCostmap(self) -> None:
170  costmap_msg = Costmap()
171  costmap_msg.header.frame_id = 'odom'
172  costmap_msg.header.stamp = self.get_clock().now().to_msg()
173  costmap_msg.metadata.resolution = 0.05
174  costmap_msg.metadata.size_x = 100
175  costmap_msg.metadata.size_y = 100
176  costmap_msg.metadata.origin.position.x = -2.5
177  costmap_msg.metadata.origin.position.y = -2.5
178  costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
179  self.costmap_pubcostmap_pub.publish(costmap_msg)
180 
181  footprint_msg = PolygonStamped()
182  footprint_msg.header.frame_id = 'odom'
183  footprint_msg.header.stamp = self.get_clock().now().to_msg()
184  footprint_msg.polygon.points = [
185  Point32(x=0.1, y=0.1, z=0.0),
186  Point32(x=0.1, y=-0.1, z=0.0),
187  Point32(x=-0.1, y=-0.1, z=0.0),
188  Point32(x=-0.1, y=0.1, z=0.0)
189  ]
190  self.footprint_pubfootprint_pub.publish(footprint_msg)
191 
192  def sendOccupiedCostmap(self) -> None:
193  costmap_msg = Costmap()
194  costmap_msg.header.frame_id = 'odom'
195  costmap_msg.header.stamp = self.get_clock().now().to_msg()
196  costmap_msg.metadata.resolution = 0.05
197  costmap_msg.metadata.size_x = 100
198  costmap_msg.metadata.size_y = 100
199  costmap_msg.metadata.origin.position.x = -2.5
200  costmap_msg.metadata.origin.position.y = -2.5
201  costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
202  self.costmap_pubcostmap_pub.publish(costmap_msg)
203 
204  footprint_msg = PolygonStamped()
205  footprint_msg.header.frame_id = 'odom'
206  footprint_msg.header.stamp = self.get_clock().now().to_msg()
207  footprint_msg.polygon.points = [
208  Point32(x=0.1, y=0.1, z=0.0),
209  Point32(x=0.1, y=-0.1, z=0.0),
210  Point32(x=-0.1, y=-0.1, z=0.0),
211  Point32(x=-0.1, y=0.1, z=0.0)
212  ]
213  self.footprint_pubfootprint_pub.publish(footprint_msg)
214 
215  def run(self) -> bool:
216  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
217  self.info_msginfo_msg("'spin' action server not available, waiting...")
218 
219  # Test A: Send without valid costmap
220  action_request = Spin.Goal()
221  action_request.target_yaw = 0.349066 # 20 deg
222  action_request.time_allowance = Duration(seconds=5).to_msg()
223  cmd1 = self.sendCommandsendCommand(action_request)
224 
225  if cmd1:
226  self.error_msgerror_msg('Test A failed: Passed while costmap was not available!')
227  return not cmd1
228  else:
229  self.info_msginfo_msg('Test A passed')
230 
231  # Test B: Send with valid costmap and spin a couple of times
232  self.sendFreeCostmapsendFreeCostmap()
233  time.sleep(1)
234  cmd1 = self.sendCommandsendCommand(action_request)
235  action_request.target_yaw = -3.49066 # -200 deg
236  cmd2 = self.sendCommandsendCommand(action_request)
237 
238  if not cmd1 or not cmd2:
239  self.error_msgerror_msg('Test B failed: Failed to spin with valid costmap!')
240  return not cmd1 or not cmd2
241 
242  action_request.target_yaw = 0.349066 # 20 deg
243  cmd_preempt = self.sendAndPreemptWithInvertedCommandsendAndPreemptWithInvertedCommand(action_request)
244  if not cmd_preempt:
245  self.error_msgerror_msg('Test B failed: Failed to preempt and invert command!')
246  return not cmd_preempt
247 
248  cmd_cancel = self.sendAndCancelCommandsendAndCancelCommand(action_request)
249  if not cmd_cancel:
250  self.error_msgerror_msg('Test B failed: Failed to cancel command!')
251  return not cmd_cancel
252  else:
253  self.info_msginfo_msg('Test B passed')
254 
255  # Test C: Send with impossible command in time allowance
256  action_request.time_allowance = Duration(seconds=0.1).to_msg()
257  cmd3 = self.sendCommandsendCommand(action_request)
258  if cmd3:
259  self.error_msgerror_msg('Test C failed: Passed while impoossible timing requested!')
260  return not cmd3
261  else:
262  self.info_msginfo_msg('Test C passed')
263 
264  # Test D: Send with lethal costmap and spin
265  action_request.time_allowance = Duration(seconds=5).to_msg()
266  self.sendOccupiedCostmapsendOccupiedCostmap()
267  time.sleep(1)
268  cmd4 = self.sendCommandsendCommand(action_request)
269  if cmd4:
270  self.error_msgerror_msg('Test D failed: Passed while costmap was not lethal!')
271  return not cmd4
272  else:
273  self.info_msginfo_msg('Test D passed')
274  return True
275 
276  def shutdown(self) -> None:
277  self.info_msginfo_msg('Shutting down')
278 
279  self.action_clientaction_client.destroy()
280  self.info_msginfo_msg('Destroyed backup action client')
281 
282  transition_service = 'lifecycle_manager_navigation/manage_nodes'
283  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
284  while not mgr_client.wait_for_service(timeout_sec=1.0):
285  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
286 
287  req = ManageLifecycleNodes.Request()
288  req.command = ManageLifecycleNodes.Request().SHUTDOWN
289  future = mgr_client.call_async(req)
290  try:
291  rclpy.spin_until_future_complete(self, future)
292  future.result()
293  except Exception as e: # noqa: B902
294  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
295 
296  self.info_msginfo_msg(f'{transition_service} finished')
297 
298  def info_msg(self, msg: str) -> None:
299  self.get_logger().info(msg)
300 
301  def warn_msg(self, msg: str) -> None:
302  self.get_logger().warn(msg)
303 
304  def error_msg(self, msg: str) -> None:
305  self.get_logger().error(msg)
306 
307 
308 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
309  rclpy.init()
310  time.sleep(10)
311  test = SpinTest()
312  result = test.run()
313  test.shutdown()
314 
315  if not result:
316  test.info_msg('Exiting failed')
317  exit(1)
318  else:
319  test.info_msg('Exiting passed')
320  exit(0)
321 
322 
323 if __name__ == '__main__':
324  main()
None sendOccupiedCostmap(self)
Definition: spin_tester.py:192
bool sendAndCancelCommand(self, Spin.Goal command)
Definition: spin_tester.py:140
None error_msg(self, str msg)
Definition: spin_tester.py:304
None sendFreeCostmap(self)
Definition: spin_tester.py:169
bool sendCommand(self, Spin.Goal command)
Definition: spin_tester.py:52
None info_msg(self, str msg)
Definition: spin_tester.py:298
bool sendAndPreemptWithInvertedCommand(self, Spin.Goal command)
Definition: spin_tester.py:86