Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
backup_tester.py
1 #! /usr/bin/env python3
2 # Copyright 2024 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 from typing import Optional
19 
20 from action_msgs.msg import GoalStatus
21 from geometry_msgs.msg import Point32, PolygonStamped
22 from nav2_msgs.action import BackUp
23 from nav2_msgs.msg import Costmap
24 from nav2_msgs.srv import ManageLifecycleNodes
25 import rclpy
26 from rclpy.action import ActionClient # type: ignore[attr-defined]
27 from rclpy.action.client import ClientGoalHandle
28 from rclpy.duration import Duration
29 from rclpy.node import Node
30 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
31 
32 
33 class BackupTest(Node):
34 
35  def __init__(self) -> None:
36  super().__init__(node_name='backup_tester', namespace='')
37  self.costmap_qoscostmap_qos = QoSProfile(
38  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
39  reliability=QoSReliabilityPolicy.RELIABLE,
40  history=QoSHistoryPolicy.KEEP_LAST,
41  depth=1,
42  )
43  self.action_clientaction_client = ActionClient(self, BackUp, 'backup')
44  self.costmap_pubcostmap_pub = self.create_publisher(
45  Costmap, 'local_costmap/costmap_raw', self.costmap_qoscostmap_qos)
46  self.footprint_pubfootprint_pub = self.create_publisher(
47  PolygonStamped, 'local_costmap/published_footprint', 10)
48  self.goal_handlegoal_handle: Optional[ClientGoalHandle[
49  BackUp.Goal, BackUp.Result, BackUp.Feedback]] = None
50  self.action_resultaction_result = BackUp.Result()
51 
52  def sendCommand(self, command: BackUp.Goal) -> bool:
53  self.info_msginfo_msg('Sending goal request...')
54  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
55  try:
56  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
57  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
58  except Exception as e: # noqa: B902
59  self.error_msgerror_msg(f'Service call failed {e!r}')
60 
61  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
62  self.error_msgerror_msg('Goal rejected')
63  return False
64 
65  self.info_msginfo_msg('Goal accepted')
66  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
67 
68  self.info_msginfo_msg("Waiting for 'Backup' action to complete")
69  try:
70  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
71  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
72  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
73  self.action_resultaction_result = result
74  except Exception as e: # noqa: B902
75  self.error_msgerror_msg(f'Service call failed {e!r}')
76 
77  if status != GoalStatus.STATUS_SUCCEEDED:
78  self.info_msginfo_msg(f'Goal failed with status code: {status}')
79  return False
80  if self.action_resultaction_result.error_code == 0:
81  self.info_msginfo_msg('Backup was successful!')
82  return True
83  self.info_msginfo_msg('Backup failed to meet target!')
84  return False
85 
86  def sendAndPreemptWithFasterCommand(self, command: BackUp.Goal) -> bool:
87  # Send initial goal
88  self.info_msginfo_msg('Sending goal request...')
89  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
90  try:
91  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
92  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
93  except Exception as e: # noqa: B902
94  self.error_msgerror_msg(f'Service call failed {e!r}')
95 
96  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
97  self.error_msgerror_msg('Goal rejected')
98  return False
99 
100  self.info_msginfo_msg('Goal accepted')
101  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
102 
103  # Now preempt it
104  time.sleep(0.5)
105  self.info_msginfo_msg('Sending preemption request...')
106  command.speed = 0.2
107  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
108  try:
109  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
110  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
111  except Exception as e: # noqa: B902
112  self.error_msgerror_msg(f'Service call failed {e!r}')
113 
114  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
115  self.error_msgerror_msg('Preemption rejected')
116  return False
117 
118  self.info_msginfo_msg('Preemption accepted')
119  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
120 
121  # Wait for new goal to complete
122  self.info_msginfo_msg("Waiting for 'backup' action Preemption to complete")
123  try:
124  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
125  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
126  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
127  self.action_resultaction_result = result
128  except Exception as e: # noqa: B902
129  self.error_msgerror_msg(f'Service call failed {e!r}')
130 
131  if status != GoalStatus.STATUS_SUCCEEDED:
132  self.info_msginfo_msg(f'Goal failed with status code: {status}')
133  return False
134  if self.action_resultaction_result.error_code == 0:
135  self.info_msginfo_msg('Backup was successful!')
136  return True
137  self.info_msginfo_msg('Backup failed to meet target!')
138  return False
139 
140  def sendAndCancelCommand(self, command: BackUp.Goal) -> bool:
141  self.info_msginfo_msg('Sending goal request...')
142  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
143  try:
144  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
145  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
146  except Exception as e: # noqa: B902
147  self.error_msgerror_msg(f'Service call failed {e!r}')
148 
149  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
150  self.error_msgerror_msg('Goal rejected')
151  return False
152 
153  self.info_msginfo_msg('Goal accepted')
154  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
155 
156  # Now cancel it
157  time.sleep(0.5)
158  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
159  rclpy.spin_until_future_complete(self, cancel_future)
160  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
161  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
162  if status != GoalStatus.STATUS_CANCELED:
163  self.info_msginfo_msg(f'Goal failed with status code: {status}')
164  return False
165  else:
166  self.info_msginfo_msg('Goal was canceled successfully')
167  return True
168 
169  def sendFreeCostmap(self) -> None:
170  costmap_msg = Costmap()
171  costmap_msg.header.frame_id = 'odom'
172  costmap_msg.header.stamp = self.get_clock().now().to_msg()
173  costmap_msg.metadata.resolution = 0.05
174  costmap_msg.metadata.size_x = 100
175  costmap_msg.metadata.size_y = 100
176  costmap_msg.metadata.origin.position.x = -2.5
177  costmap_msg.metadata.origin.position.y = -2.5
178  costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
179  self.costmap_pubcostmap_pub.publish(costmap_msg)
180 
181  footprint_msg = PolygonStamped()
182  footprint_msg.header.frame_id = 'odom'
183  footprint_msg.header.stamp = self.get_clock().now().to_msg()
184  footprint_msg.polygon.points = [
185  Point32(x=0.1, y=0.1, z=0.0),
186  Point32(x=0.1, y=-0.1, z=0.0),
187  Point32(x=-0.1, y=-0.1, z=0.0),
188  Point32(x=-0.1, y=0.1, z=0.0)
189  ]
190  self.footprint_pubfootprint_pub.publish(footprint_msg)
191 
192  def sendOccupiedCostmap(self) -> None:
193  costmap_msg = Costmap()
194  costmap_msg.header.frame_id = 'odom'
195  costmap_msg.header.stamp = self.get_clock().now().to_msg()
196  costmap_msg.metadata.resolution = 0.05
197  costmap_msg.metadata.size_x = 100
198  costmap_msg.metadata.size_y = 100
199  costmap_msg.metadata.origin.position.x = -2.5
200  costmap_msg.metadata.origin.position.y = -2.5
201  costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
202  self.costmap_pubcostmap_pub.publish(costmap_msg)
203 
204  footprint_msg = PolygonStamped()
205  footprint_msg.header.frame_id = 'odom'
206  footprint_msg.header.stamp = self.get_clock().now().to_msg()
207  footprint_msg.polygon.points = [
208  Point32(x=0.1, y=0.1, z=0.0),
209  Point32(x=0.1, y=-0.1, z=0.0),
210  Point32(x=-0.1, y=-0.1, z=0.0),
211  Point32(x=-0.1, y=0.1, z=0.0)
212  ]
213  self.footprint_pubfootprint_pub.publish(footprint_msg)
214 
215  def run(self) -> bool:
216  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
217  self.info_msginfo_msg("'Backup' action server not available, waiting...")
218 
219  # Test A: Send without valid costmap
220  action_request = BackUp.Goal()
221  action_request.speed = 0.15
222  action_request.target.x = 0.15
223  action_request.time_allowance = Duration(seconds=5).to_msg()
224  cmd1 = self.sendCommandsendCommand(action_request)
225 
226  if cmd1:
227  self.error_msgerror_msg('Test A failed: Passed while costmap was not available!')
228  return not cmd1
229  else:
230  self.info_msginfo_msg('Test A passed')
231 
232  # Test B: Send with valid costmap and Backup a couple of times
233  self.sendFreeCostmapsendFreeCostmap()
234  time.sleep(1)
235  cmd1 = self.sendCommandsendCommand(action_request)
236  action_request.target.x = 0.1
237  cmd2 = self.sendCommandsendCommand(action_request)
238 
239  if not cmd1 or not cmd2:
240  self.error_msgerror_msg('Test B failed: Failed to Backup with valid costmap!')
241  return not cmd1 or not cmd2
242 
243  action_request.target.x = 0.5
244  cmd_preempt = self.sendAndPreemptWithFasterCommandsendAndPreemptWithFasterCommand(action_request)
245  if not cmd_preempt:
246  self.error_msgerror_msg('Test B failed: Failed to preempt and invert command!')
247  return not cmd_preempt
248 
249  cmd_cancel = self.sendAndCancelCommandsendAndCancelCommand(action_request)
250  if not cmd_cancel:
251  self.error_msgerror_msg('Test B failed: Failed to cancel command!')
252  return not cmd_cancel
253  else:
254  self.info_msginfo_msg('Test B passed')
255 
256  # Test C: Send with impossible command in time allowance & target
257  action_request.time_allowance = Duration(seconds=0.1).to_msg()
258  cmd3 = self.sendCommandsendCommand(action_request)
259  if cmd3:
260  self.error_msgerror_msg('Test C failed: Passed while impoossible timing requested!')
261  return not cmd3
262 
263  action_request.target.y = 0.5
264  cmd_invalid_target = self.sendCommandsendCommand(action_request)
265  if cmd_invalid_target:
266  self.error_msgerror_msg('Test C failed: Passed while impoossible target requested!')
267  return not cmd_invalid_target
268  else:
269  action_request.target.y = 0.0
270  self.info_msginfo_msg('Test C passed')
271 
272  # Test D: Send with lethal costmap and Backup
273  action_request.time_allowance = Duration(seconds=5).to_msg()
274  self.sendOccupiedCostmapsendOccupiedCostmap()
275  time.sleep(1)
276  cmd4 = self.sendCommandsendCommand(action_request)
277  if cmd4:
278  self.error_msgerror_msg('Test D failed: Passed while costmap was not lethal!')
279  return not cmd4
280  else:
281  self.info_msginfo_msg('Test D passed')
282  return True
283 
284  def shutdown(self) -> None:
285  self.info_msginfo_msg('Shutting down')
286 
287  self.action_clientaction_client.destroy()
288  self.info_msginfo_msg('Destroyed backup action client')
289 
290  transition_service = 'lifecycle_manager_navigation/manage_nodes'
291  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
292  while not mgr_client.wait_for_service(timeout_sec=1.0):
293  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
294 
295  req = ManageLifecycleNodes.Request()
296  req.command = ManageLifecycleNodes.Request().SHUTDOWN
297  future = mgr_client.call_async(req)
298  try:
299  rclpy.spin_until_future_complete(self, future)
300  future.result()
301  except Exception as e: # noqa: B902
302  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
303 
304  self.info_msginfo_msg(f'{transition_service} finished')
305 
306  def info_msg(self, msg: str) -> None:
307  self.get_logger().info(msg)
308 
309  def warn_msg(self, msg: str) -> None:
310  self.get_logger().warn(msg)
311 
312  def error_msg(self, msg: str) -> None:
313  self.get_logger().error(msg)
314 
315 
316 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
317  rclpy.init()
318  time.sleep(10)
319  test = BackupTest()
320  result = test.run()
321  test.shutdown()
322 
323  if not result:
324  test.info_msg('Exiting failed')
325  exit(1)
326  else:
327  test.info_msg('Exiting passed')
328  exit(0)
329 
330 
331 if __name__ == '__main__':
332  main()
bool sendAndPreemptWithFasterCommand(self, BackUp.Goal command)
bool sendAndCancelCommand(self, BackUp.Goal command)
bool sendCommand(self, BackUp.Goal command)
None error_msg(self, str msg)
None info_msg(self, str msg)