18 from typing
import Optional
20 from action_msgs.msg
import GoalStatus
22 from nav2_msgs.action
import DriveOnHeading
24 from nav2_msgs.srv
import ManageLifecycleNodes
26 from rclpy.action
import ActionClient
27 from rclpy.action.client
import ClientGoalHandle
28 from rclpy.client
import Client
29 from rclpy.duration
import Duration
30 from rclpy.node
import Node
31 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
36 def __init__(self) -> None:
37 super().__init__(node_name=
'drive_tester', namespace=
'')
39 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
40 reliability=QoSReliabilityPolicy.RELIABLE,
41 history=QoSHistoryPolicy.KEEP_LAST,
44 self.action_client: ActionClient[
46 DriveOnHeading.Result,
47 DriveOnHeading.Feedback
48 ] = ActionClient(self, DriveOnHeading,
'drive_on_heading')
49 self.
costmap_pubcostmap_pub = self.create_publisher(
50 Costmap,
'local_costmap/costmap_raw', self.
costmap_qoscostmap_qos)
52 PolygonStamped,
'local_costmap/published_footprint', 10)
53 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[
54 DriveOnHeading.Goal, DriveOnHeading.Result,
55 DriveOnHeading.Feedback]] =
None
58 def sendCommand(self, command: DriveOnHeading.Goal) -> bool:
59 self.
info_msginfo_msg(
'Sending goal request...')
60 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
62 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
64 except Exception
as e:
65 self.
error_msgerror_msg(f
'Service call failed {e!r}')
71 self.
info_msginfo_msg(
'Goal accepted')
74 self.
info_msginfo_msg(
"Waiting for 'DriveOnHeading' action to complete")
76 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
80 except Exception
as e:
81 self.
error_msgerror_msg(f
'Service call failed {e!r}')
83 if status != GoalStatus.STATUS_SUCCEEDED:
84 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
87 self.
info_msginfo_msg(
'DriveOnHeading was successful!')
89 self.
info_msginfo_msg(
'DriveOnHeading failed to meet target!')
92 def sendAndPreemptWithFasterCommand(self, command: DriveOnHeading.Goal) -> bool:
94 self.
info_msginfo_msg(
'Sending goal request...')
95 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
97 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
99 except Exception
as e:
100 self.
error_msgerror_msg(f
'Service call failed {e!r}')
106 self.
info_msginfo_msg(
'Goal accepted')
111 self.
info_msginfo_msg(
'Sending preemption request...')
113 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
115 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
117 except Exception
as e:
118 self.
error_msgerror_msg(f
'Service call failed {e!r}')
121 self.
error_msgerror_msg(
'Preemption rejected')
124 self.
info_msginfo_msg(
'Preemption accepted')
128 self.
info_msginfo_msg(
"Waiting for 'DriveOnHeading' action Preemption to complete")
130 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
134 except Exception
as e:
135 self.
error_msgerror_msg(f
'Service call failed {e!r}')
137 if status != GoalStatus.STATUS_SUCCEEDED:
138 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
141 self.
info_msginfo_msg(
'DriveOnHeading was successful!')
143 self.
info_msginfo_msg(
'DriveOnHeading failed to meet target!')
146 def sendAndCancelCommand(self, command: DriveOnHeading.Goal) -> bool:
147 self.
info_msginfo_msg(
'Sending goal request...')
148 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
150 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
152 except Exception
as e:
153 self.
error_msgerror_msg(f
'Service call failed {e!r}')
159 self.
info_msginfo_msg(
'Goal accepted')
164 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
165 rclpy.spin_until_future_complete(self, cancel_future)
166 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
168 if status != GoalStatus.STATUS_CANCELED:
169 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
172 self.
info_msginfo_msg(
'Goal was canceled successfully')
175 def sendFreeCostmap(self) -> None:
176 costmap_msg = Costmap()
177 costmap_msg.header.frame_id =
'odom'
178 costmap_msg.header.stamp = self.get_clock().now().to_msg()
179 costmap_msg.metadata.resolution = 0.05
180 costmap_msg.metadata.size_x = 100
181 costmap_msg.metadata.size_y = 100
182 costmap_msg.metadata.origin.position.x = -2.5
183 costmap_msg.metadata.origin.position.y = -2.5
184 costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
187 footprint_msg = PolygonStamped()
188 footprint_msg.header.frame_id =
'odom'
189 footprint_msg.header.stamp = self.get_clock().now().to_msg()
190 footprint_msg.polygon.points = [
191 Point32(x=0.1, y=0.1, z=0.0),
192 Point32(x=0.1, y=-0.1, z=0.0),
193 Point32(x=-0.1, y=-0.1, z=0.0),
194 Point32(x=-0.1, y=0.1, z=0.0)
198 def sendOccupiedCostmap(self) -> None:
199 costmap_msg = Costmap()
200 costmap_msg.header.frame_id =
'odom'
201 costmap_msg.header.stamp = self.get_clock().now().to_msg()
202 costmap_msg.metadata.resolution = 0.05
203 costmap_msg.metadata.size_x = 100
204 costmap_msg.metadata.size_y = 100
205 costmap_msg.metadata.origin.position.x = -2.5
206 costmap_msg.metadata.origin.position.y = -2.5
207 costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
210 footprint_msg = PolygonStamped()
211 footprint_msg.header.frame_id =
'odom'
212 footprint_msg.header.stamp = self.get_clock().now().to_msg()
213 footprint_msg.polygon.points = [
214 Point32(x=0.1, y=0.1, z=0.0),
215 Point32(x=0.1, y=-0.1, z=0.0),
216 Point32(x=-0.1, y=-0.1, z=0.0),
217 Point32(x=-0.1, y=0.1, z=0.0)
221 def run(self) -> bool:
222 while not self.action_client.wait_for_server(timeout_sec=1.0):
223 self.
info_msginfo_msg(
"'DriveOnHeading' action server not available, waiting...")
226 action_request = DriveOnHeading.Goal()
227 action_request.speed = 0.15
228 action_request.target.x = 0.15
229 action_request.time_allowance = Duration(seconds=5).to_msg()
233 self.
error_msgerror_msg(
'Test A failed: Passed while costmap was not available!')
236 self.
info_msginfo_msg(
'Test A passed')
242 action_request.target.x = 0.1
245 if not cmd1
or not cmd2:
246 self.
error_msgerror_msg(
'Test B failed: Failed to DriveOnHeading with valid costmap!')
247 return not cmd1
or not cmd2
249 action_request.target.x = 0.5
252 self.
error_msgerror_msg(
'Test B failed: Failed to preempt and invert command!')
253 return not cmd_preempt
257 self.
error_msgerror_msg(
'Test B failed: Failed to cancel command!')
258 return not cmd_cancel
260 self.
info_msginfo_msg(
'Test B passed')
263 action_request.time_allowance = Duration(seconds=0.1).to_msg()
266 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible timing requested!')
269 action_request.target.y = 0.5
270 cmd_invalid_target = self.
sendCommandsendCommand(action_request)
271 if cmd_invalid_target:
272 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible target requested!')
273 return not cmd_invalid_target
275 action_request.target.y = 0.0
277 action_request.target.x = -0.5
278 cmd_invalid_sign = self.
sendCommandsendCommand(action_request)
280 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible target sign requested!')
281 return not cmd_invalid_sign
283 action_request.target.x = 0.5
284 self.
info_msginfo_msg(
'Test C passed')
287 action_request.time_allowance = Duration(seconds=5).to_msg()
292 self.
error_msgerror_msg(
'Test D failed: Passed while costmap was not lethal!')
295 self.
info_msginfo_msg(
'Test D passed')
298 def shutdown(self) -> None:
299 self.
info_msginfo_msg(
'Shutting down')
301 self.action_client.destroy()
302 self.
info_msginfo_msg(
'Destroyed DriveOnHeading action client')
304 transition_service =
'lifecycle_manager_navigation/manage_nodes'
305 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
306 = self.create_client(ManageLifecycleNodes, transition_service)
307 while not mgr_client.wait_for_service(timeout_sec=1.0):
308 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
310 req = ManageLifecycleNodes.Request()
311 req.command = ManageLifecycleNodes.Request().SHUTDOWN
312 future = mgr_client.call_async(req)
314 rclpy.spin_until_future_complete(self, future)
316 except Exception
as e:
317 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
319 self.
info_msginfo_msg(f
'{transition_service} finished')
321 def info_msg(self, msg: str) ->
None:
322 self.get_logger().info(msg)
324 def warn_msg(self, msg: str) ->
None:
325 self.get_logger().warning(msg)
327 def error_msg(self, msg: str) ->
None:
328 self.get_logger().error(msg)
331 def main(argv: list[str] = sys.argv[1:]):
339 test.info_msg(
'Exiting failed')
342 test.info_msg(
'Exiting passed')
346 if __name__ ==
'__main__':
None info_msg(self, str msg)
None sendFreeCostmap(self)
bool sendAndPreemptWithFasterCommand(self, DriveOnHeading.Goal command)
None error_msg(self, str msg)
bool sendCommand(self, DriveOnHeading.Goal command)
bool sendAndCancelCommand(self, DriveOnHeading.Goal command)
None sendOccupiedCostmap(self)