Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
backup_tester.py
1 #! /usr/bin/env python3
2 # Copyright 2024 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 
19 from action_msgs.msg import GoalStatus
20 from geometry_msgs.msg import Point32, PolygonStamped
21 from nav2_msgs.action import BackUp
22 from nav2_msgs.msg import Costmap
23 from nav2_msgs.srv import ManageLifecycleNodes
24 
25 import rclpy
26 
27 from rclpy.action import ActionClient
28 from rclpy.duration import Duration
29 from rclpy.node import Node
30 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
31 from rclpy.qos import QoSProfile
32 
33 
34 class BackupTest(Node):
35 
36  def __init__(self):
37  super().__init__(node_name='backup_tester', namespace='')
38  self.costmap_qoscostmap_qos = QoSProfile(
39  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
40  reliability=QoSReliabilityPolicy.RELIABLE,
41  history=QoSHistoryPolicy.KEEP_LAST,
42  depth=1,
43  )
44  self.action_clientaction_client = ActionClient(self, BackUp, 'backup')
45  self.costmap_pubcostmap_pub = self.create_publisher(
46  Costmap, 'local_costmap/costmap_raw', self.costmap_qoscostmap_qos)
47  self.footprint_pubfootprint_pub = self.create_publisher(
48  PolygonStamped, 'local_costmap/published_footprint', 10)
49  self.goal_handlegoal_handle = None
50  self.action_resultaction_result = None
51 
52  def sendCommand(self, command):
53  self.info_msginfo_msg('Sending goal request...')
54  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
55  try:
56  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
57  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
58  except Exception as e: # noqa: B902
59  self.error_msgerror_msg(f'Service call failed {e!r}')
60 
61  if not self.goal_handlegoal_handle.accepted:
62  self.error_msgerror_msg('Goal rejected')
63  return False
64 
65  self.info_msginfo_msg('Goal accepted')
66  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
67 
68  self.info_msginfo_msg("Waiting for 'Backup' action to complete")
69  try:
70  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
71  status = self.result_futureresult_future.result().status
72  result = self.result_futureresult_future.result().result
73  self.action_resultaction_result = result
74  except Exception as e: # noqa: B902
75  self.error_msgerror_msg(f'Service call failed {e!r}')
76 
77  if status != GoalStatus.STATUS_SUCCEEDED:
78  self.info_msginfo_msg(f'Goal failed with status code: {status}')
79  return False
80  if self.action_resultaction_result.error_code == 0:
81  self.info_msginfo_msg('Backup was successful!')
82  return True
83  self.info_msginfo_msg('Backup failed to meet target!')
84  return False
85 
86  def sendAndPreemptWithFasterCommand(self, command):
87  # Send initial goal
88  self.info_msginfo_msg('Sending goal request...')
89  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
90  try:
91  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
92  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
93  except Exception as e: # noqa: B902
94  self.error_msgerror_msg(f'Service call failed {e!r}')
95 
96  if not self.goal_handlegoal_handle.accepted:
97  self.error_msgerror_msg('Goal rejected')
98  return False
99 
100  self.info_msginfo_msg('Goal accepted')
101  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
102 
103  # Now preempt it
104  time.sleep(0.5)
105  self.info_msginfo_msg('Sending preemption request...')
106  command.speed = 0.2
107  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
108  try:
109  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
110  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
111  except Exception as e: # noqa: B902
112  self.error_msgerror_msg(f'Service call failed {e!r}')
113 
114  if not self.goal_handlegoal_handle.accepted:
115  self.error_msgerror_msg('Preemption rejected')
116  return False
117 
118  self.info_msginfo_msg('Preemption accepted')
119  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
120 
121  # Wait for new goal to complete
122  self.info_msginfo_msg("Waiting for 'backup' action Preemption to complete")
123  try:
124  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
125  status = self.result_futureresult_future.result().status
126  result = self.result_futureresult_future.result().result
127  self.action_resultaction_result = result
128  except Exception as e: # noqa: B902
129  self.error_msgerror_msg(f'Service call failed {e!r}')
130 
131  if status != GoalStatus.STATUS_SUCCEEDED:
132  self.info_msginfo_msg(f'Goal failed with status code: {status}')
133  return False
134  if self.action_resultaction_result.error_code == 0:
135  self.info_msginfo_msg('Backup was successful!')
136  return True
137  self.info_msginfo_msg('Backup failed to meet target!')
138  return False
139 
140  def sendAndCancelCommand(self, command):
141  self.info_msginfo_msg('Sending goal request...')
142  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
143  try:
144  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
145  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
146  except Exception as e: # noqa: B902
147  self.error_msgerror_msg(f'Service call failed {e!r}')
148 
149  if not self.goal_handlegoal_handle.accepted:
150  self.error_msgerror_msg('Goal rejected')
151  return False
152 
153  self.info_msginfo_msg('Goal accepted')
154  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
155 
156  # Now cancel it
157  time.sleep(0.5)
158  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
159  rclpy.spin_until_future_complete(self, cancel_future)
160  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
161  status = self.result_futureresult_future.result().status
162  if status != GoalStatus.STATUS_CANCELED:
163  self.info_msginfo_msg(f'Goal failed with status code: {status}')
164  return False
165  else:
166  self.info_msginfo_msg('Goal was canceled successfully')
167  return True
168 
169  def sendFreeCostmap(self):
170  costmap_msg = Costmap()
171  costmap_msg.header.frame_id = 'odom'
172  costmap_msg.header.stamp = self.get_clock().now().to_msg()
173  costmap_msg.metadata.resolution = 0.05
174  costmap_msg.metadata.size_x = 100
175  costmap_msg.metadata.size_y = 100
176  costmap_msg.metadata.origin.position.x = -2.5
177  costmap_msg.metadata.origin.position.y = -2.5
178  costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
179  self.costmap_pubcostmap_pub.publish(costmap_msg)
180 
181  footprint_msg = PolygonStamped()
182  footprint_msg.header.frame_id = 'odom'
183  footprint_msg.header.stamp = self.get_clock().now().to_msg()
184  footprint_msg.polygon.points = [
185  Point32(x=0.1, y=0.1, z=0.0),
186  Point32(x=0.1, y=-0.1, z=0.0),
187  Point32(x=-0.1, y=-0.1, z=0.0),
188  Point32(x=-0.1, y=0.1, z=0.0)
189  ]
190  self.footprint_pubfootprint_pub.publish(footprint_msg)
191 
192  def sendOccupiedCostmap(self):
193  costmap_msg = Costmap()
194  costmap_msg.header.frame_id = 'odom'
195  costmap_msg.header.stamp = self.get_clock().now().to_msg()
196  costmap_msg.metadata.resolution = 0.05
197  costmap_msg.metadata.size_x = 100
198  costmap_msg.metadata.size_y = 100
199  costmap_msg.metadata.origin.position.x = -2.5
200  costmap_msg.metadata.origin.position.y = -2.5
201  costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
202  self.costmap_pubcostmap_pub.publish(costmap_msg)
203 
204  footprint_msg = PolygonStamped()
205  footprint_msg.header.frame_id = 'odom'
206  footprint_msg.header.stamp = self.get_clock().now().to_msg()
207  footprint_msg.polygon.points = [
208  Point32(x=0.1, y=0.1, z=0.0),
209  Point32(x=0.1, y=-0.1, z=0.0),
210  Point32(x=-0.1, y=-0.1, z=0.0),
211  Point32(x=-0.1, y=0.1, z=0.0)
212  ]
213  self.footprint_pubfootprint_pub.publish(footprint_msg)
214 
215  def run(self):
216  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
217  self.info_msginfo_msg("'Backup' action server not available, waiting...")
218 
219  # Test A: Send without valid costmap
220  action_request = BackUp.Goal()
221  action_request.speed = 0.15
222  action_request.target.x = 0.15
223  action_request.time_allowance = Duration(seconds=5).to_msg()
224  cmd1 = self.sendCommandsendCommand(action_request)
225 
226  if cmd1:
227  self.error_msgerror_msg('Test A failed: Passed while costmap was not available!')
228  return not cmd1
229  else:
230  self.info_msginfo_msg('Test A passed')
231 
232  # Test B: Send with valid costmap and Backup a couple of times
233  self.sendFreeCostmapsendFreeCostmap()
234  time.sleep(1)
235  cmd1 = self.sendCommandsendCommand(action_request)
236  action_request.target.x = 0.1
237  cmd2 = self.sendCommandsendCommand(action_request)
238 
239  if not cmd1 or not cmd2:
240  self.error_msgerror_msg('Test B failed: Failed to Backup with valid costmap!')
241  return not cmd1 or not cmd2
242 
243  action_request.target.x = 0.5
244  cmd_preempt = self.sendAndPreemptWithFasterCommandsendAndPreemptWithFasterCommand(action_request)
245  if not cmd_preempt:
246  self.error_msgerror_msg('Test B failed: Failed to preempt and invert command!')
247  return not cmd_preempt
248 
249  cmd_cancel = self.sendAndCancelCommandsendAndCancelCommand(action_request)
250  if not cmd_cancel:
251  self.error_msgerror_msg('Test B failed: Failed to cancel command!')
252  return not cmd_cancel
253  else:
254  self.info_msginfo_msg('Test B passed')
255 
256  # Test C: Send with impossible command in time allowance & target
257  action_request.time_allowance = Duration(seconds=0.1).to_msg()
258  cmd3 = self.sendCommandsendCommand(action_request)
259  if cmd3:
260  self.error_msgerror_msg('Test C failed: Passed while impoossible timing requested!')
261  return not cmd3
262 
263  action_request.target.y = 0.5
264  cmd_invalid_target = self.sendCommandsendCommand(action_request)
265  if cmd_invalid_target:
266  self.error_msgerror_msg('Test C failed: Passed while impoossible target requested!')
267  return not cmd_invalid_target
268  else:
269  action_request.target.y = 0.0
270  self.info_msginfo_msg('Test C passed')
271 
272  # Test D: Send with lethal costmap and Backup
273  action_request.time_allowance = Duration(seconds=5).to_msg()
274  self.sendOccupiedCostmapsendOccupiedCostmap()
275  time.sleep(1)
276  cmd4 = self.sendCommandsendCommand(action_request)
277  if cmd4:
278  self.error_msgerror_msg('Test D failed: Passed while costmap was not lethal!')
279  return not cmd4
280  else:
281  self.info_msginfo_msg('Test D passed')
282  return True
283 
284  def shutdown(self):
285  self.info_msginfo_msg('Shutting down')
286 
287  self.action_clientaction_client.destroy()
288  self.info_msginfo_msg('Destroyed backup action client')
289 
290  transition_service = 'lifecycle_manager_navigation/manage_nodes'
291  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
292  while not mgr_client.wait_for_service(timeout_sec=1.0):
293  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
294 
295  req = ManageLifecycleNodes.Request()
296  req.command = ManageLifecycleNodes.Request().SHUTDOWN
297  future = mgr_client.call_async(req)
298  try:
299  rclpy.spin_until_future_complete(self, future)
300  future.result()
301  except Exception as e: # noqa: B902
302  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
303 
304  self.info_msginfo_msg(f'{transition_service} finished')
305 
306  def info_msg(self, msg: str):
307  self.get_logger().info(msg)
308 
309  def warn_msg(self, msg: str):
310  self.get_logger().warn(msg)
311 
312  def error_msg(self, msg: str):
313  self.get_logger().error(msg)
314 
315 
316 def main(argv=sys.argv[1:]):
317  rclpy.init()
318  time.sleep(10)
319  test = BackupTest()
320  result = test.run()
321  test.shutdown()
322 
323  if not result:
324  test.info_msg('Exiting failed')
325  exit(1)
326  else:
327  test.info_msg('Exiting passed')
328  exit(0)
329 
330 
331 if __name__ == '__main__':
332  main()
def sendAndPreemptWithFasterCommand(self, command)
def error_msg(self, str msg)
def sendCommand(self, command)
def sendAndCancelCommand(self, command)
def info_msg(self, str msg)