Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
drive_tester.py
1 #! /usr/bin/env python3
2 # Copyright 2024 Open Navigation LLC
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import sys
17 import time
18 from typing import Optional
19 
20 from action_msgs.msg import GoalStatus
21 from geometry_msgs.msg import Point32, PolygonStamped
22 from nav2_msgs.action import DriveOnHeading
23 from nav2_msgs.msg import Costmap
24 from nav2_msgs.srv import ManageLifecycleNodes
25 import rclpy
26 from rclpy.action import ActionClient
27 from rclpy.action.client import ClientGoalHandle
28 from rclpy.client import Client
29 from rclpy.duration import Duration
30 from rclpy.node import Node
31 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
32 
33 
34 class DriveTest(Node):
35 
36  def __init__(self) -> None:
37  super().__init__(node_name='drive_tester', namespace='')
38  self.costmap_qoscostmap_qos = QoSProfile(
39  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
40  reliability=QoSReliabilityPolicy.RELIABLE,
41  history=QoSHistoryPolicy.KEEP_LAST,
42  depth=1,
43  )
44  self.action_client: ActionClient[
45  DriveOnHeading.Goal,
46  DriveOnHeading.Result,
47  DriveOnHeading.Feedback
48  ] = ActionClient(self, DriveOnHeading, 'drive_on_heading')
49  self.costmap_pubcostmap_pub = self.create_publisher(
50  Costmap, 'local_costmap/costmap_raw', self.costmap_qoscostmap_qos)
51  self.footprint_pubfootprint_pub = self.create_publisher(
52  PolygonStamped, 'local_costmap/published_footprint', 10)
53  self.goal_handlegoal_handle: Optional[ClientGoalHandle[
54  DriveOnHeading.Goal, DriveOnHeading.Result,
55  DriveOnHeading.Feedback]] = None
56  self.action_resultaction_result = DriveOnHeading.Result()
57 
58  def sendCommand(self, command: DriveOnHeading.Goal) -> bool:
59  self.info_msginfo_msg('Sending goal request...')
60  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
61  try:
62  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
63  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
64  except Exception as e: # noqa: B902
65  self.error_msgerror_msg(f'Service call failed {e!r}')
66 
67  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
68  self.error_msgerror_msg('Goal rejected')
69  return False
70 
71  self.info_msginfo_msg('Goal accepted')
72  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
73 
74  self.info_msginfo_msg("Waiting for 'DriveOnHeading' action to complete")
75  try:
76  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
77  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
78  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
79  self.action_resultaction_result = result
80  except Exception as e: # noqa: B902
81  self.error_msgerror_msg(f'Service call failed {e!r}')
82 
83  if status != GoalStatus.STATUS_SUCCEEDED:
84  self.info_msginfo_msg(f'Goal failed with status code: {status}')
85  return False
86  if self.action_resultaction_result.error_code == 0:
87  self.info_msginfo_msg('DriveOnHeading was successful!')
88  return True
89  self.info_msginfo_msg('DriveOnHeading failed to meet target!')
90  return False
91 
92  def sendAndPreemptWithFasterCommand(self, command: DriveOnHeading.Goal) -> bool:
93  # Send initial goal
94  self.info_msginfo_msg('Sending goal request...')
95  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
96  try:
97  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
98  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
99  except Exception as e: # noqa: B902
100  self.error_msgerror_msg(f'Service call failed {e!r}')
101 
102  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
103  self.error_msgerror_msg('Goal rejected')
104  return False
105 
106  self.info_msginfo_msg('Goal accepted')
107  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
108 
109  # Now preempt it
110  time.sleep(0.5)
111  self.info_msginfo_msg('Sending preemption request...')
112  command.speed = 0.2
113  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
114  try:
115  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
116  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
117  except Exception as e: # noqa: B902
118  self.error_msgerror_msg(f'Service call failed {e!r}')
119 
120  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
121  self.error_msgerror_msg('Preemption rejected')
122  return False
123 
124  self.info_msginfo_msg('Preemption accepted')
125  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
126 
127  # Wait for new goal to complete
128  self.info_msginfo_msg("Waiting for 'DriveOnHeading' action Preemption to complete")
129  try:
130  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
131  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
132  result = self.result_futureresult_future.result().result # type: ignore[union-attr]
133  self.action_resultaction_result = result
134  except Exception as e: # noqa: B902
135  self.error_msgerror_msg(f'Service call failed {e!r}')
136 
137  if status != GoalStatus.STATUS_SUCCEEDED:
138  self.info_msginfo_msg(f'Goal failed with status code: {status}')
139  return False
140  if self.action_resultaction_result.error_code == 0:
141  self.info_msginfo_msg('DriveOnHeading was successful!')
142  return True
143  self.info_msginfo_msg('DriveOnHeading failed to meet target!')
144  return False
145 
146  def sendAndCancelCommand(self, command: DriveOnHeading.Goal) -> bool:
147  self.info_msginfo_msg('Sending goal request...')
148  self.goal_futuregoal_future = self.action_client.send_goal_async(command)
149  try:
150  rclpy.spin_until_future_complete(self, self.goal_futuregoal_future)
151  self.goal_handlegoal_handle = self.goal_futuregoal_future.result()
152  except Exception as e: # noqa: B902
153  self.error_msgerror_msg(f'Service call failed {e!r}')
154 
155  if not self.goal_handlegoal_handle or not self.goal_handlegoal_handle.accepted:
156  self.error_msgerror_msg('Goal rejected')
157  return False
158 
159  self.info_msginfo_msg('Goal accepted')
160  self.result_futureresult_future = self.goal_handlegoal_handle.get_result_async()
161 
162  # Now cancel it
163  time.sleep(0.5)
164  cancel_future = self.goal_handlegoal_handle.cancel_goal_async()
165  rclpy.spin_until_future_complete(self, cancel_future)
166  rclpy.spin_until_future_complete(self, self.result_futureresult_future)
167  status = self.result_futureresult_future.result().status # type: ignore[union-attr]
168  if status != GoalStatus.STATUS_CANCELED:
169  self.info_msginfo_msg(f'Goal failed with status code: {status}')
170  return False
171  else:
172  self.info_msginfo_msg('Goal was canceled successfully')
173  return True
174 
175  def sendFreeCostmap(self) -> None:
176  costmap_msg = Costmap()
177  costmap_msg.header.frame_id = 'odom'
178  costmap_msg.header.stamp = self.get_clock().now().to_msg()
179  costmap_msg.metadata.resolution = 0.05
180  costmap_msg.metadata.size_x = 100
181  costmap_msg.metadata.size_y = 100
182  costmap_msg.metadata.origin.position.x = -2.5
183  costmap_msg.metadata.origin.position.y = -2.5
184  costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
185  self.costmap_pubcostmap_pub.publish(costmap_msg)
186 
187  footprint_msg = PolygonStamped()
188  footprint_msg.header.frame_id = 'odom'
189  footprint_msg.header.stamp = self.get_clock().now().to_msg()
190  footprint_msg.polygon.points = [
191  Point32(x=0.1, y=0.1, z=0.0),
192  Point32(x=0.1, y=-0.1, z=0.0),
193  Point32(x=-0.1, y=-0.1, z=0.0),
194  Point32(x=-0.1, y=0.1, z=0.0)
195  ]
196  self.footprint_pubfootprint_pub.publish(footprint_msg)
197 
198  def sendOccupiedCostmap(self) -> None:
199  costmap_msg = Costmap()
200  costmap_msg.header.frame_id = 'odom'
201  costmap_msg.header.stamp = self.get_clock().now().to_msg()
202  costmap_msg.metadata.resolution = 0.05
203  costmap_msg.metadata.size_x = 100
204  costmap_msg.metadata.size_y = 100
205  costmap_msg.metadata.origin.position.x = -2.5
206  costmap_msg.metadata.origin.position.y = -2.5
207  costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
208  self.costmap_pubcostmap_pub.publish(costmap_msg)
209 
210  footprint_msg = PolygonStamped()
211  footprint_msg.header.frame_id = 'odom'
212  footprint_msg.header.stamp = self.get_clock().now().to_msg()
213  footprint_msg.polygon.points = [
214  Point32(x=0.1, y=0.1, z=0.0),
215  Point32(x=0.1, y=-0.1, z=0.0),
216  Point32(x=-0.1, y=-0.1, z=0.0),
217  Point32(x=-0.1, y=0.1, z=0.0)
218  ]
219  self.footprint_pubfootprint_pub.publish(footprint_msg)
220 
221  def run(self) -> bool:
222  while not self.action_client.wait_for_server(timeout_sec=1.0):
223  self.info_msginfo_msg("'DriveOnHeading' action server not available, waiting...")
224 
225  # Test A: Send without valid costmap
226  action_request = DriveOnHeading.Goal()
227  action_request.speed = 0.15
228  action_request.target.x = 0.15
229  action_request.time_allowance = Duration(seconds=5).to_msg()
230  cmd1 = self.sendCommandsendCommand(action_request)
231 
232  if cmd1:
233  self.error_msgerror_msg('Test A failed: Passed while costmap was not available!')
234  return not cmd1
235  else:
236  self.info_msginfo_msg('Test A passed')
237 
238  # Test B: Send with valid costmap and DriveOnHeading a couple of times
239  self.sendFreeCostmapsendFreeCostmap()
240  time.sleep(1)
241  cmd1 = self.sendCommandsendCommand(action_request)
242  action_request.target.x = 0.1
243  cmd2 = self.sendCommandsendCommand(action_request)
244 
245  if not cmd1 or not cmd2:
246  self.error_msgerror_msg('Test B failed: Failed to DriveOnHeading with valid costmap!')
247  return not cmd1 or not cmd2
248 
249  action_request.target.x = 0.5
250  cmd_preempt = self.sendAndPreemptWithFasterCommandsendAndPreemptWithFasterCommand(action_request)
251  if not cmd_preempt:
252  self.error_msgerror_msg('Test B failed: Failed to preempt and invert command!')
253  return not cmd_preempt
254 
255  cmd_cancel = self.sendAndCancelCommandsendAndCancelCommand(action_request)
256  if not cmd_cancel:
257  self.error_msgerror_msg('Test B failed: Failed to cancel command!')
258  return not cmd_cancel
259  else:
260  self.info_msginfo_msg('Test B passed')
261 
262  # Test C: Send with impossible command in time allowance & target * signs
263  action_request.time_allowance = Duration(seconds=0.1).to_msg()
264  cmd3 = self.sendCommandsendCommand(action_request)
265  if cmd3:
266  self.error_msgerror_msg('Test C failed: Passed while impoossible timing requested!')
267  return not cmd3
268 
269  action_request.target.y = 0.5
270  cmd_invalid_target = self.sendCommandsendCommand(action_request)
271  if cmd_invalid_target:
272  self.error_msgerror_msg('Test C failed: Passed while impoossible target requested!')
273  return not cmd_invalid_target
274  else:
275  action_request.target.y = 0.0
276 
277  action_request.target.x = -0.5
278  cmd_invalid_sign = self.sendCommandsendCommand(action_request)
279  if cmd_invalid_sign:
280  self.error_msgerror_msg('Test C failed: Passed while impoossible target sign requested!')
281  return not cmd_invalid_sign
282  else:
283  action_request.target.x = 0.5
284  self.info_msginfo_msg('Test C passed')
285 
286  # Test D: Send with lethal costmap and DriveOnHeading
287  action_request.time_allowance = Duration(seconds=5).to_msg()
288  self.sendOccupiedCostmapsendOccupiedCostmap()
289  time.sleep(1)
290  cmd4 = self.sendCommandsendCommand(action_request)
291  if cmd4:
292  self.error_msgerror_msg('Test D failed: Passed while costmap was not lethal!')
293  return not cmd4
294  else:
295  self.info_msginfo_msg('Test D passed')
296  return True
297 
298  def shutdown(self) -> None:
299  self.info_msginfo_msg('Shutting down')
300 
301  self.action_client.destroy()
302  self.info_msginfo_msg('Destroyed DriveOnHeading action client')
303 
304  transition_service = 'lifecycle_manager_navigation/manage_nodes'
305  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
306  = self.create_client(ManageLifecycleNodes, transition_service)
307  while not mgr_client.wait_for_service(timeout_sec=1.0):
308  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
309 
310  req = ManageLifecycleNodes.Request()
311  req.command = ManageLifecycleNodes.Request().SHUTDOWN
312  future = mgr_client.call_async(req)
313  try:
314  rclpy.spin_until_future_complete(self, future)
315  future.result()
316  except Exception as e: # noqa: B902
317  self.error_msgerror_msg(f'{transition_service} service call failed {e!r}')
318 
319  self.info_msginfo_msg(f'{transition_service} finished')
320 
321  def info_msg(self, msg: str) -> None:
322  self.get_logger().info(msg)
323 
324  def warn_msg(self, msg: str) -> None:
325  self.get_logger().warning(msg)
326 
327  def error_msg(self, msg: str) -> None:
328  self.get_logger().error(msg)
329 
330 
331 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
332  rclpy.init()
333  time.sleep(10)
334  test = DriveTest()
335  result = test.run()
336  test.shutdown()
337 
338  if not result:
339  test.info_msg('Exiting failed')
340  exit(1)
341  else:
342  test.info_msg('Exiting passed')
343  exit(0)
344 
345 
346 if __name__ == '__main__':
347  main()
None info_msg(self, str msg)
None sendFreeCostmap(self)
bool sendAndPreemptWithFasterCommand(self, DriveOnHeading.Goal command)
Definition: drive_tester.py:92
None error_msg(self, str msg)
bool sendCommand(self, DriveOnHeading.Goal command)
Definition: drive_tester.py:58
bool sendAndCancelCommand(self, DriveOnHeading.Goal command)
None sendOccupiedCostmap(self)