Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
backup_tester.py
1 #! /usr/bin/env python3
2 # Copyright 2024 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 from typing import Optional
19 
20 from action_msgs.msg import GoalStatus
21 from geometry_msgs.msg import Point32, PolygonStamped
22 from nav2_msgs.action import BackUp
23 from nav2_msgs.msg import Costmap
24 from nav2_msgs.srv import ManageLifecycleNodes
25 import rclpy
26 from rclpy.action import ActionClient
27 from rclpy.action.client import ClientGoalHandle
28 from rclpy.client import Client
29 from rclpy.duration import Duration
30 from rclpy.node import Node
31 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
32 
33 
34 class BackupTest(Node):
35 
36  def __init__(self) -> None:
37  super().__init__(node_name='backup_tester', namespace='')
38  self.costmap_qoscostmap_qos = QoSProfile(
39  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
40  reliability=QoSReliabilityPolicy.RELIABLE,
41  history=QoSHistoryPolicy.KEEP_LAST,
42  depth=1,
43  )
44  self.action_client: ActionClient[
45  BackUp.Goal,
46  BackUp.Result,
47  BackUp.Feedback
48  ] = ActionClient(self, BackUp, 'backup')
49  self.costmap_pubcostmap_pub = self.create_publisher(
50  Costmap, 'local_costmap/costmap_raw', self.costmap_qoscostmap_qos)
51  self.footprint_pubfootprint_pub = self.create_publisher(
52  PolygonStamped, 'local_costmap/published_footprint', 10)
53  self.goal_handlegoal_handle: Optional[ClientGoalHandle[
54  BackUp.Goal, BackUp.Result, BackUp.Feedback]] = None
55  self.action_resultaction_result = BackUp.Result()
56 
57  def sendCommand(self, command: BackUp.Goal) -> bool:
58  self.info_msginfo_msg('Sending goal request...')
59  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
60  try:
61  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
62  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
63  except Exception as e: # noqa: B902
64  self.error_msgerror_msg(f'Service call failed {e!r}')
65 
66  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
67  self.error_msgerror_msg('Goal rejected')
68  return False
69 
70  self.info_msginfo_msg('Goal accepted')
71  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
72 
73  self.info_msginfo_msg("Waiting for 'Backup' action to complete")
74  try:
75  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
76  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
77  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
78  self.action_resultaction_result = result
79  except Exception as e: # noqa: B902
80  self.error_msgerror_msg(f'Service call failed {e!r}')
81 
82  if status != GoalStatus.STATUS_SUCCEEDED:
83  self.info_msginfo_msg(f'Goal failed with status code: {status}')
84  return False
85  if self.action_resultaction_result.error_code == 0:
86  self.info_msginfo_msg('Backup was successful!')
87  return True
88  self.info_msginfo_msg('Backup failed to meet target!')
89  return False
90 
91  def sendAndPreemptWithFasterCommand(self, command: BackUp.Goal) -> bool:
92  # Send initial goal
93  self.info_msginfo_msg('Sending goal request...')
94  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
95  try:
96  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
97  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
98  except Exception as e: # noqa: B902
99  self.error_msgerror_msg(f'Service call failed {e!r}')
100 
101  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
102  self.error_msgerror_msg('Goal rejected')
103  return False
104 
105  self.info_msginfo_msg('Goal accepted')
106  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
107 
108  # Now preempt it
109  time.sleep(0.5)
110  self.info_msginfo_msg('Sending preemption request...')
111  command.speed = 0.2
112  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
113  try:
114  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
115  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
116  except Exception as e: # noqa: B902
117  self.error_msgerror_msg(f'Service call failed {e!r}')
118 
119  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
120  self.error_msgerror_msg('Preemption rejected')
121  return False
122 
123  self.info_msginfo_msg('Preemption accepted')
124  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
125 
126  # Wait for new goal to complete
127  self.info_msginfo_msg("Waiting for 'backup' action Preemption to complete")
128  try:
129  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
130  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
131  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
132  self.action_resultaction_result = result
133  except Exception as e: # noqa: B902
134  self.error_msgerror_msg(f'Service call failed {e!r}')
135 
136  if status != GoalStatus.STATUS_SUCCEEDED:
137  self.info_msginfo_msg(f'Goal failed with status code: {status}')
138  return False
139  if self.action_resultaction_result.error_code == 0:
140  self.info_msginfo_msg('Backup was successful!')
141  return True
142  self.info_msginfo_msg('Backup failed to meet target!')
143  return False
144 
145  def sendAndCancelCommand(self, command: BackUp.Goal) -> bool:
146  self.info_msginfo_msg('Sending goal request...')
147  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
148  try:
149  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
150  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
151  except Exception as e: # noqa: B902
152  self.error_msgerror_msg(f'Service call failed {e!r}')
153 
154  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
155  self.error_msgerror_msg('Goal rejected')
156  return False
157 
158  self.info_msginfo_msg('Goal accepted')
159  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
160 
161  # Now cancel it
162  time.sleep(0.5)
163  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
164  rclpy.spin_until_future_complete(self, cancel_future)
165  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
166  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
167  if status != GoalStatus.STATUS_CANCELED:
168  self.info_msginfo_msg(f'Goal failed with status code: {status}')
169  return False
170  else:
171  self.info_msginfo_msg('Goal was canceled successfully')
172  return True
173 
174  def sendFreeCostmap(self) -> None:
175  costmap_msg = Costmap()
176  costmap_msg.header.frame_id = 'odom'
177  costmap_msg.header.stamp = self.get_clock().now().to_msg()
178  costmap_msg.metadata.resolution = 0.05
179  costmap_msg.metadata.size_x = 100
180  costmap_msg.metadata.size_y = 100
181  costmap_msg.metadata.origin.position.x = -2.5
182  costmap_msg.metadata.origin.position.y = -2.5
183  costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
184  self.costmap_pubcostmap_pub.publish(costmap_msg)
185 
186  footprint_msg = PolygonStamped()
187  footprint_msg.header.frame_id = 'odom'
188  footprint_msg.header.stamp = self.get_clock().now().to_msg()
189  footprint_msg.polygon.points = [
190  Point32(x=0.1, y=0.1, z=0.0),
191  Point32(x=0.1, y=-0.1, z=0.0),
192  Point32(x=-0.1, y=-0.1, z=0.0),
193  Point32(x=-0.1, y=0.1, z=0.0)
194  ]
195  self.footprint_pubfootprint_pub.publish(footprint_msg)
196 
197  def sendOccupiedCostmap(self) -> None:
198  costmap_msg = Costmap()
199  costmap_msg.header.frame_id = 'odom'
200  costmap_msg.header.stamp = self.get_clock().now().to_msg()
201  costmap_msg.metadata.resolution = 0.05
202  costmap_msg.metadata.size_x = 100
203  costmap_msg.metadata.size_y = 100
204  costmap_msg.metadata.origin.position.x = -2.5
205  costmap_msg.metadata.origin.position.y = -2.5
206  costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
207  self.costmap_pubcostmap_pub.publish(costmap_msg)
208 
209  footprint_msg = PolygonStamped()
210  footprint_msg.header.frame_id = 'odom'
211  footprint_msg.header.stamp = self.get_clock().now().to_msg()
212  footprint_msg.polygon.points = [
213  Point32(x=0.1, y=0.1, z=0.0),
214  Point32(x=0.1, y=-0.1, z=0.0),
215  Point32(x=-0.1, y=-0.1, z=0.0),
216  Point32(x=-0.1, y=0.1, z=0.0)
217  ]
218  self.footprint_pubfootprint_pub.publish(footprint_msg)
219 
220  def run(self) -> bool:
221  while not self.action_client.wait_for_server(timeout_sec=1.0):
222  self.info_msginfo_msg("'Backup' action server not available, waiting...")
223 
224  # Test A: Send without valid costmap
225  action_request = BackUp.Goal()
226  action_request.speed = 0.15
227  action_request.target.x = 0.15
228  action_request.time_allowance = Duration(seconds=5).to_msg()
229  cmd1 = self.sendCommandsendCommand(action_request)
230 
231  if cmd1:
232  self.error_msgerror_msg('Test A failed: Passed while costmap was not available!')
233  return not cmd1
234  else:
235  self.info_msginfo_msg('Test A passed')
236 
237  # Test B: Send with valid costmap and Backup a couple of times
238  self.sendFreeCostmapsendFreeCostmap()
239  time.sleep(1)
240  cmd1 = self.sendCommandsendCommand(action_request)
241  action_request.target.x = 0.1
242  cmd2 = self.sendCommandsendCommand(action_request)
243 
244  if not cmd1 or not cmd2:
245  self.error_msgerror_msg('Test B failed: Failed to Backup with valid costmap!')
246  return not cmd1 or not cmd2
247 
248  action_request.target.x = 0.5
249  cmd_preempt = self.sendAndPreemptWithFasterCommandsendAndPreemptWithFasterCommand(action_request)
250  if not cmd_preempt:
251  self.error_msgerror_msg('Test B failed: Failed to preempt and invert command!')
252  return not cmd_preempt
253 
254  cmd_cancel = self.sendAndCancelCommandsendAndCancelCommand(action_request)
255  if not cmd_cancel:
256  self.error_msgerror_msg('Test B failed: Failed to cancel command!')
257  return not cmd_cancel
258  else:
259  self.info_msginfo_msg('Test B passed')
260 
261  # Test C: Send with impossible command in time allowance & target
262  action_request.time_allowance = Duration(seconds=0.1).to_msg()
263  cmd3 = self.sendCommandsendCommand(action_request)
264  if cmd3:
265  self.error_msgerror_msg('Test C failed: Passed while impoossible timing requested!')
266  return not cmd3
267 
268  action_request.target.y = 0.5
269  cmd_invalid_target = self.sendCommandsendCommand(action_request)
270  if cmd_invalid_target:
271  self.error_msgerror_msg('Test C failed: Passed while impoossible target requested!')
272  return not cmd_invalid_target
273  else:
274  action_request.target.y = 0.0
275  self.info_msginfo_msg('Test C passed')
276 
277  # Test D: Send with lethal costmap and Backup
278  action_request.time_allowance = Duration(seconds=5).to_msg()
279  self.sendOccupiedCostmapsendOccupiedCostmap()
280  time.sleep(1)
281  cmd4 = self.sendCommandsendCommand(action_request)
282  if cmd4:
283  self.error_msgerror_msg('Test D failed: Passed while costmap was not lethal!')
284  return not cmd4
285  else:
286  self.info_msginfo_msg('Test D passed')
287  return True
288 
289  def shutdown(self) -> None:
290  self.info_msginfo_msg('Shutting down')
291 
292  self.action_client.destroy()
293  self.info_msginfo_msg('Destroyed backup action client')
294 
295  transition_service = 'lifecycle_manager_navigation/manage_nodes'
296  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
297  = self.create_client(ManageLifecycleNodes, transition_service)
298  while not mgr_client.wait_for_service(timeout_sec=1.0):
299  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
300 
301  req = ManageLifecycleNodes.Request()
302  req.command = ManageLifecycleNodes.Request().SHUTDOWN
303  future = mgr_client.call_async(req)
304  try:
305  rclpy.spin_until_future_complete(self, future)
306  future.result()
307  except Exception as e: # noqa: B902
308  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
309 
310  self.info_msginfo_msg(f'{transition_service} finished')
311 
312  def info_msg(self, msg: str) -> None:
313  self.get_logger().info(msg)
314 
315  def warn_msg(self, msg: str) -> None:
316  self.get_logger().warning(msg)
317 
318  def error_msg(self, msg: str) -> None:
319  self.get_logger().error(msg)
320 
321 
322 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
323  rclpy.init()
324  time.sleep(10)
325  test = BackupTest()
326  result = test.run()
327  test.shutdown()
328 
329  if not result:
330  test.info_msg('Exiting failed')
331  exit(1)
332  else:
333  test.info_msg('Exiting passed')
334  exit(0)
335 
336 
337 if __name__ == '__main__':
338  main()
bool sendAndPreemptWithFasterCommand(self, BackUp.Goal command)
bool sendAndCancelCommand(self, BackUp.Goal command)
bool sendCommand(self, BackUp.Goal command)
None error_msg(self, str msg)
None info_msg(self, str msg)