Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
drive_tester.py
1 #! /usr/bin/env python3
2 # Copyright 2024 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 from typing import Optional
19 
20 from action_msgs.msg import GoalStatus
21 from geometry_msgs.msg import Point32, PolygonStamped
22 from nav2_msgs.action import DriveOnHeading
23 from nav2_msgs.msg import Costmap
24 from nav2_msgs.srv import ManageLifecycleNodes
25 import rclpy
26 from rclpy.action import ActionClient # type: ignore[attr-defined]
27 from rclpy.action.client import ClientGoalHandle
28 from rclpy.duration import Duration
29 from rclpy.node import Node
30 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
31 
32 
33 class DriveTest(Node):
34 
35  def __init__(self) -> None:
36  super().__init__(node_name='drive_tester', namespace='')
37  self.costmap_qoscostmap_qos = QoSProfile(
38  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
39  reliability=QoSReliabilityPolicy.RELIABLE,
40  history=QoSHistoryPolicy.KEEP_LAST,
41  depth=1,
42  )
43  self.action_clientaction_client = ActionClient(self, DriveOnHeading, 'drive_on_heading')
44  self.costmap_pubcostmap_pub = self.create_publisher(
45  Costmap, 'local_costmap/costmap_raw', self.costmap_qoscostmap_qos)
46  self.footprint_pubfootprint_pub = self.create_publisher(
47  PolygonStamped, 'local_costmap/published_footprint', 10)
48  self.goal_handlegoal_handle: Optional[ClientGoalHandle[
49  DriveOnHeading.Goal, DriveOnHeading.Result,
50  DriveOnHeading.Feedback]] = None
51  self.action_resultaction_result = DriveOnHeading.Result()
52 
53  def sendCommand(self, command: DriveOnHeading.Goal) -> bool:
54  self.info_msginfo_msg('Sending goal request...')
55  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
56  try:
57  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
58  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
59  except Exception as e: # noqa: B902
60  self.error_msgerror_msg(f'Service call failed {e!r}')
61 
62  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
63  self.error_msgerror_msg('Goal rejected')
64  return False
65 
66  self.info_msginfo_msg('Goal accepted')
67  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
68 
69  self.info_msginfo_msg("Waiting for 'DriveOnHeading' action to complete")
70  try:
71  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
72  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
73  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
74  self.action_resultaction_result = result
75  except Exception as e: # noqa: B902
76  self.error_msgerror_msg(f'Service call failed {e!r}')
77 
78  if status != GoalStatus.STATUS_SUCCEEDED:
79  self.info_msginfo_msg(f'Goal failed with status code: {status}')
80  return False
81  if self.action_resultaction_result.error_code == 0:
82  self.info_msginfo_msg('DriveOnHeading was successful!')
83  return True
84  self.info_msginfo_msg('DriveOnHeading failed to meet target!')
85  return False
86 
87  def sendAndPreemptWithFasterCommand(self, command: DriveOnHeading.Goal) -> bool:
88  # Send initial goal
89  self.info_msginfo_msg('Sending goal request...')
90  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
91  try:
92  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
93  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
94  except Exception as e: # noqa: B902
95  self.error_msgerror_msg(f'Service call failed {e!r}')
96 
97  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
98  self.error_msgerror_msg('Goal rejected')
99  return False
100 
101  self.info_msginfo_msg('Goal accepted')
102  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
103 
104  # Now preempt it
105  time.sleep(0.5)
106  self.info_msginfo_msg('Sending preemption request...')
107  command.speed = 0.2
108  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
109  try:
110  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
111  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
112  except Exception as e: # noqa: B902
113  self.error_msgerror_msg(f'Service call failed {e!r}')
114 
115  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
116  self.error_msgerror_msg('Preemption rejected')
117  return False
118 
119  self.info_msginfo_msg('Preemption accepted')
120  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
121 
122  # Wait for new goal to complete
123  self.info_msginfo_msg("Waiting for 'DriveOnHeading' action Preemption to complete")
124  try:
125  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
126  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
127  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
128  self.action_resultaction_result = result
129  except Exception as e: # noqa: B902
130  self.error_msgerror_msg(f'Service call failed {e!r}')
131 
132  if status != GoalStatus.STATUS_SUCCEEDED:
133  self.info_msginfo_msg(f'Goal failed with status code: {status}')
134  return False
135  if self.action_resultaction_result.error_code == 0:
136  self.info_msginfo_msg('DriveOnHeading was successful!')
137  return True
138  self.info_msginfo_msg('DriveOnHeading failed to meet target!')
139  return False
140 
141  def sendAndCancelCommand(self, command: DriveOnHeading.Goal) -> bool:
142  self.info_msginfo_msg('Sending goal request...')
143  self.goal_futuregoal_future = self.action_clientaction_client.send_goal_async(command)
144  try:
145  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
146  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
147  except Exception as e: # noqa: B902
148  self.error_msgerror_msg(f'Service call failed {e!r}')
149 
150  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
151  self.error_msgerror_msg('Goal rejected')
152  return False
153 
154  self.info_msginfo_msg('Goal accepted')
155  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
156 
157  # Now cancel it
158  time.sleep(0.5)
159  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
160  rclpy.spin_until_future_complete(self, cancel_future)
161  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
162  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
163  if status != GoalStatus.STATUS_CANCELED:
164  self.info_msginfo_msg(f'Goal failed with status code: {status}')
165  return False
166  else:
167  self.info_msginfo_msg('Goal was canceled successfully')
168  return True
169 
170  def sendFreeCostmap(self) -> None:
171  costmap_msg = Costmap()
172  costmap_msg.header.frame_id = 'odom'
173  costmap_msg.header.stamp = self.get_clock().now().to_msg()
174  costmap_msg.metadata.resolution = 0.05
175  costmap_msg.metadata.size_x = 100
176  costmap_msg.metadata.size_y = 100
177  costmap_msg.metadata.origin.position.x = -2.5
178  costmap_msg.metadata.origin.position.y = -2.5
179  costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
180  self.costmap_pubcostmap_pub.publish(costmap_msg)
181 
182  footprint_msg = PolygonStamped()
183  footprint_msg.header.frame_id = 'odom'
184  footprint_msg.header.stamp = self.get_clock().now().to_msg()
185  footprint_msg.polygon.points = [
186  Point32(x=0.1, y=0.1, z=0.0),
187  Point32(x=0.1, y=-0.1, z=0.0),
188  Point32(x=-0.1, y=-0.1, z=0.0),
189  Point32(x=-0.1, y=0.1, z=0.0)
190  ]
191  self.footprint_pubfootprint_pub.publish(footprint_msg)
192 
193  def sendOccupiedCostmap(self) -> None:
194  costmap_msg = Costmap()
195  costmap_msg.header.frame_id = 'odom'
196  costmap_msg.header.stamp = self.get_clock().now().to_msg()
197  costmap_msg.metadata.resolution = 0.05
198  costmap_msg.metadata.size_x = 100
199  costmap_msg.metadata.size_y = 100
200  costmap_msg.metadata.origin.position.x = -2.5
201  costmap_msg.metadata.origin.position.y = -2.5
202  costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
203  self.costmap_pubcostmap_pub.publish(costmap_msg)
204 
205  footprint_msg = PolygonStamped()
206  footprint_msg.header.frame_id = 'odom'
207  footprint_msg.header.stamp = self.get_clock().now().to_msg()
208  footprint_msg.polygon.points = [
209  Point32(x=0.1, y=0.1, z=0.0),
210  Point32(x=0.1, y=-0.1, z=0.0),
211  Point32(x=-0.1, y=-0.1, z=0.0),
212  Point32(x=-0.1, y=0.1, z=0.0)
213  ]
214  self.footprint_pubfootprint_pub.publish(footprint_msg)
215 
216  def run(self) -> bool:
217  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
218  self.info_msginfo_msg("'DriveOnHeading' action server not available, waiting...")
219 
220  # Test A: Send without valid costmap
221  action_request = DriveOnHeading.Goal()
222  action_request.speed = 0.15
223  action_request.target.x = 0.15
224  action_request.time_allowance = Duration(seconds=5).to_msg()
225  cmd1 = self.sendCommandsendCommand(action_request)
226 
227  if cmd1:
228  self.error_msgerror_msg('Test A failed: Passed while costmap was not available!')
229  return not cmd1
230  else:
231  self.info_msginfo_msg('Test A passed')
232 
233  # Test B: Send with valid costmap and DriveOnHeading a couple of times
234  self.sendFreeCostmapsendFreeCostmap()
235  time.sleep(1)
236  cmd1 = self.sendCommandsendCommand(action_request)
237  action_request.target.x = 0.1
238  cmd2 = self.sendCommandsendCommand(action_request)
239 
240  if not cmd1 or not cmd2:
241  self.error_msgerror_msg('Test B failed: Failed to DriveOnHeading with valid costmap!')
242  return not cmd1 or not cmd2
243 
244  action_request.target.x = 0.5
245  cmd_preempt = self.sendAndPreemptWithFasterCommandsendAndPreemptWithFasterCommand(action_request)
246  if not cmd_preempt:
247  self.error_msgerror_msg('Test B failed: Failed to preempt and invert command!')
248  return not cmd_preempt
249 
250  cmd_cancel = self.sendAndCancelCommandsendAndCancelCommand(action_request)
251  if not cmd_cancel:
252  self.error_msgerror_msg('Test B failed: Failed to cancel command!')
253  return not cmd_cancel
254  else:
255  self.info_msginfo_msg('Test B passed')
256 
257  # Test C: Send with impossible command in time allowance & target * signs
258  action_request.time_allowance = Duration(seconds=0.1).to_msg()
259  cmd3 = self.sendCommandsendCommand(action_request)
260  if cmd3:
261  self.error_msgerror_msg('Test C failed: Passed while impoossible timing requested!')
262  return not cmd3
263 
264  action_request.target.y = 0.5
265  cmd_invalid_target = self.sendCommandsendCommand(action_request)
266  if cmd_invalid_target:
267  self.error_msgerror_msg('Test C failed: Passed while impoossible target requested!')
268  return not cmd_invalid_target
269  else:
270  action_request.target.y = 0.0
271 
272  action_request.target.x = -0.5
273  cmd_invalid_sign = self.sendCommandsendCommand(action_request)
274  if cmd_invalid_sign:
275  self.error_msgerror_msg('Test C failed: Passed while impoossible target sign requested!')
276  return not cmd_invalid_sign
277  else:
278  action_request.target.x = 0.5
279  self.info_msginfo_msg('Test C passed')
280 
281  # Test D: Send with lethal costmap and DriveOnHeading
282  action_request.time_allowance = Duration(seconds=5).to_msg()
283  self.sendOccupiedCostmapsendOccupiedCostmap()
284  time.sleep(1)
285  cmd4 = self.sendCommandsendCommand(action_request)
286  if cmd4:
287  self.error_msgerror_msg('Test D failed: Passed while costmap was not lethal!')
288  return not cmd4
289  else:
290  self.info_msginfo_msg('Test D passed')
291  return True
292 
293  def shutdown(self) -> None:
294  self.info_msginfo_msg('Shutting down')
295 
296  self.action_clientaction_client.destroy()
297  self.info_msginfo_msg('Destroyed DriveOnHeading action client')
298 
299  transition_service = 'lifecycle_manager_navigation/manage_nodes'
300  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
301  while not mgr_client.wait_for_service(timeout_sec=1.0):
302  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
303 
304  req = ManageLifecycleNodes.Request()
305  req.command = ManageLifecycleNodes.Request().SHUTDOWN
306  future = mgr_client.call_async(req)
307  try:
308  rclpy.spin_until_future_complete(self, future)
309  future.result()
310  except Exception as e: # noqa: B902
311  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
312 
313  self.info_msginfo_msg(f'{transition_service} finished')
314 
315  def info_msg(self, msg: str) -> None:
316  self.get_logger().info(msg)
317 
318  def warn_msg(self, msg: str) -> None:
319  self.get_logger().warn(msg)
320 
321  def error_msg(self, msg: str) -> None:
322  self.get_logger().error(msg)
323 
324 
325 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
326  rclpy.init()
327  time.sleep(10)
328  test = DriveTest()
329  result = test.run()
330  test.shutdown()
331 
332  if not result:
333  test.info_msg('Exiting failed')
334  exit(1)
335  else:
336  test.info_msg('Exiting passed')
337  exit(0)
338 
339 
340 if __name__ == '__main__':
341  main()
None info_msg(self, str msg)
None sendFreeCostmap(self)
bool sendAndPreemptWithFasterCommand(self, DriveOnHeading.Goal command)
Definition: drive_tester.py:87
None error_msg(self, str msg)
bool sendCommand(self, DriveOnHeading.Goal command)
Definition: drive_tester.py:53
bool sendAndCancelCommand(self, DriveOnHeading.Goal command)
None sendOccupiedCostmap(self)