18 from typing
import Optional
20 from action_msgs.msg
import GoalStatus
22 from nav2_msgs.action
import DriveOnHeading
24 from nav2_msgs.srv
import ManageLifecycleNodes
26 from rclpy.action
import ActionClient
27 from rclpy.action.client
import ClientGoalHandle
28 from rclpy.duration
import Duration
29 from rclpy.node
import Node
30 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
35 def __init__(self) -> None:
36 super().__init__(node_name=
'drive_tester', namespace=
'')
38 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
39 reliability=QoSReliabilityPolicy.RELIABLE,
40 history=QoSHistoryPolicy.KEEP_LAST,
43 self.
action_clientaction_client = ActionClient(self, DriveOnHeading,
'drive_on_heading')
44 self.
costmap_pubcostmap_pub = self.create_publisher(
45 Costmap,
'local_costmap/costmap_raw', self.
costmap_qoscostmap_qos)
47 PolygonStamped,
'local_costmap/published_footprint', 10)
48 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[
49 DriveOnHeading.Goal, DriveOnHeading.Result,
50 DriveOnHeading.Feedback]] =
None
53 def sendCommand(self, command: DriveOnHeading.Goal) -> bool:
54 self.
info_msginfo_msg(
'Sending goal request...')
57 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
59 except Exception
as e:
60 self.
error_msgerror_msg(f
'Service call failed {e!r}')
66 self.
info_msginfo_msg(
'Goal accepted')
69 self.
info_msginfo_msg(
"Waiting for 'DriveOnHeading' action to complete")
71 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
75 except Exception
as e:
76 self.
error_msgerror_msg(f
'Service call failed {e!r}')
78 if status != GoalStatus.STATUS_SUCCEEDED:
79 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
82 self.
info_msginfo_msg(
'DriveOnHeading was successful!')
84 self.
info_msginfo_msg(
'DriveOnHeading failed to meet target!')
87 def sendAndPreemptWithFasterCommand(self, command: DriveOnHeading.Goal) -> bool:
89 self.
info_msginfo_msg(
'Sending goal request...')
92 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
94 except Exception
as e:
95 self.
error_msgerror_msg(f
'Service call failed {e!r}')
101 self.
info_msginfo_msg(
'Goal accepted')
106 self.
info_msginfo_msg(
'Sending preemption request...')
110 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
112 except Exception
as e:
113 self.
error_msgerror_msg(f
'Service call failed {e!r}')
116 self.
error_msgerror_msg(
'Preemption rejected')
119 self.
info_msginfo_msg(
'Preemption accepted')
123 self.
info_msginfo_msg(
"Waiting for 'DriveOnHeading' action Preemption to complete")
125 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
129 except Exception
as e:
130 self.
error_msgerror_msg(f
'Service call failed {e!r}')
132 if status != GoalStatus.STATUS_SUCCEEDED:
133 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
136 self.
info_msginfo_msg(
'DriveOnHeading was successful!')
138 self.
info_msginfo_msg(
'DriveOnHeading failed to meet target!')
141 def sendAndCancelCommand(self, command: DriveOnHeading.Goal) -> bool:
142 self.
info_msginfo_msg(
'Sending goal request...')
145 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
147 except Exception
as e:
148 self.
error_msgerror_msg(f
'Service call failed {e!r}')
154 self.
info_msginfo_msg(
'Goal accepted')
159 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
160 rclpy.spin_until_future_complete(self, cancel_future)
161 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
163 if status != GoalStatus.STATUS_CANCELED:
164 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
167 self.
info_msginfo_msg(
'Goal was canceled successfully')
170 def sendFreeCostmap(self) -> None:
171 costmap_msg = Costmap()
172 costmap_msg.header.frame_id =
'odom'
173 costmap_msg.header.stamp = self.get_clock().now().to_msg()
174 costmap_msg.metadata.resolution = 0.05
175 costmap_msg.metadata.size_x = 100
176 costmap_msg.metadata.size_y = 100
177 costmap_msg.metadata.origin.position.x = -2.5
178 costmap_msg.metadata.origin.position.y = -2.5
179 costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
182 footprint_msg = PolygonStamped()
183 footprint_msg.header.frame_id =
'odom'
184 footprint_msg.header.stamp = self.get_clock().now().to_msg()
185 footprint_msg.polygon.points = [
186 Point32(x=0.1, y=0.1, z=0.0),
187 Point32(x=0.1, y=-0.1, z=0.0),
188 Point32(x=-0.1, y=-0.1, z=0.0),
189 Point32(x=-0.1, y=0.1, z=0.0)
193 def sendOccupiedCostmap(self) -> None:
194 costmap_msg = Costmap()
195 costmap_msg.header.frame_id =
'odom'
196 costmap_msg.header.stamp = self.get_clock().now().to_msg()
197 costmap_msg.metadata.resolution = 0.05
198 costmap_msg.metadata.size_x = 100
199 costmap_msg.metadata.size_y = 100
200 costmap_msg.metadata.origin.position.x = -2.5
201 costmap_msg.metadata.origin.position.y = -2.5
202 costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
205 footprint_msg = PolygonStamped()
206 footprint_msg.header.frame_id =
'odom'
207 footprint_msg.header.stamp = self.get_clock().now().to_msg()
208 footprint_msg.polygon.points = [
209 Point32(x=0.1, y=0.1, z=0.0),
210 Point32(x=0.1, y=-0.1, z=0.0),
211 Point32(x=-0.1, y=-0.1, z=0.0),
212 Point32(x=-0.1, y=0.1, z=0.0)
216 def run(self) -> bool:
217 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
218 self.
info_msginfo_msg(
"'DriveOnHeading' action server not available, waiting...")
221 action_request = DriveOnHeading.Goal()
222 action_request.speed = 0.15
223 action_request.target.x = 0.15
224 action_request.time_allowance = Duration(seconds=5).to_msg()
228 self.
error_msgerror_msg(
'Test A failed: Passed while costmap was not available!')
231 self.
info_msginfo_msg(
'Test A passed')
237 action_request.target.x = 0.1
240 if not cmd1
or not cmd2:
241 self.
error_msgerror_msg(
'Test B failed: Failed to DriveOnHeading with valid costmap!')
242 return not cmd1
or not cmd2
244 action_request.target.x = 0.5
247 self.
error_msgerror_msg(
'Test B failed: Failed to preempt and invert command!')
248 return not cmd_preempt
252 self.
error_msgerror_msg(
'Test B failed: Failed to cancel command!')
253 return not cmd_cancel
255 self.
info_msginfo_msg(
'Test B passed')
258 action_request.time_allowance = Duration(seconds=0.1).to_msg()
261 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible timing requested!')
264 action_request.target.y = 0.5
265 cmd_invalid_target = self.
sendCommandsendCommand(action_request)
266 if cmd_invalid_target:
267 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible target requested!')
268 return not cmd_invalid_target
270 action_request.target.y = 0.0
272 action_request.target.x = -0.5
273 cmd_invalid_sign = self.
sendCommandsendCommand(action_request)
275 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible target sign requested!')
276 return not cmd_invalid_sign
278 action_request.target.x = 0.5
279 self.
info_msginfo_msg(
'Test C passed')
282 action_request.time_allowance = Duration(seconds=5).to_msg()
287 self.
error_msgerror_msg(
'Test D failed: Passed while costmap was not lethal!')
290 self.
info_msginfo_msg(
'Test D passed')
293 def shutdown(self) -> None:
294 self.
info_msginfo_msg(
'Shutting down')
297 self.
info_msginfo_msg(
'Destroyed DriveOnHeading action client')
299 transition_service =
'lifecycle_manager_navigation/manage_nodes'
300 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
301 while not mgr_client.wait_for_service(timeout_sec=1.0):
302 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
304 req = ManageLifecycleNodes.Request()
305 req.command = ManageLifecycleNodes.Request().SHUTDOWN
306 future = mgr_client.call_async(req)
308 rclpy.spin_until_future_complete(self, future)
310 except Exception
as e:
311 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
313 self.
info_msginfo_msg(f
'{transition_service} finished')
315 def info_msg(self, msg: str) ->
None:
316 self.get_logger().info(msg)
318 def warn_msg(self, msg: str) ->
None:
319 self.get_logger().warn(msg)
321 def error_msg(self, msg: str) ->
None:
322 self.get_logger().error(msg)
325 def main(argv: list[str] = sys.argv[1:]):
333 test.info_msg(
'Exiting failed')
336 test.info_msg(
'Exiting passed')
340 if __name__ ==
'__main__':
None info_msg(self, str msg)
None sendFreeCostmap(self)
bool sendAndPreemptWithFasterCommand(self, DriveOnHeading.Goal command)
None error_msg(self, str msg)
bool sendCommand(self, DriveOnHeading.Goal command)
bool sendAndCancelCommand(self, DriveOnHeading.Goal command)
None sendOccupiedCostmap(self)