18 from typing
import Optional
20 from action_msgs.msg
import GoalStatus
22 from nav2_msgs.action
import BackUp
24 from nav2_msgs.srv
import ManageLifecycleNodes
26 from rclpy.action
import ActionClient
27 from rclpy.action.client
import ClientGoalHandle
28 from rclpy.duration
import Duration
29 from rclpy.node
import Node
30 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
35 def __init__(self) -> None:
36 super().__init__(node_name=
'backup_tester', namespace=
'')
38 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
39 reliability=QoSReliabilityPolicy.RELIABLE,
40 history=QoSHistoryPolicy.KEEP_LAST,
43 self.
action_clientaction_client = ActionClient(self, BackUp,
'backup')
44 self.
costmap_pubcostmap_pub = self.create_publisher(
45 Costmap,
'local_costmap/costmap_raw', self.
costmap_qoscostmap_qos)
47 PolygonStamped,
'local_costmap/published_footprint', 10)
48 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[
49 BackUp.Goal, BackUp.Result, BackUp.Feedback]] =
None
52 def sendCommand(self, command: BackUp.Goal) -> bool:
53 self.
info_msginfo_msg(
'Sending goal request...')
56 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
58 except Exception
as e:
59 self.
error_msgerror_msg(f
'Service call failed {e!r}')
65 self.
info_msginfo_msg(
'Goal accepted')
68 self.
info_msginfo_msg(
"Waiting for 'Backup' action to complete")
70 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
74 except Exception
as e:
75 self.
error_msgerror_msg(f
'Service call failed {e!r}')
77 if status != GoalStatus.STATUS_SUCCEEDED:
78 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
81 self.
info_msginfo_msg(
'Backup was successful!')
83 self.
info_msginfo_msg(
'Backup failed to meet target!')
86 def sendAndPreemptWithFasterCommand(self, command: BackUp.Goal) -> bool:
88 self.
info_msginfo_msg(
'Sending goal request...')
91 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
93 except Exception
as e:
94 self.
error_msgerror_msg(f
'Service call failed {e!r}')
100 self.
info_msginfo_msg(
'Goal accepted')
105 self.
info_msginfo_msg(
'Sending preemption request...')
109 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
111 except Exception
as e:
112 self.
error_msgerror_msg(f
'Service call failed {e!r}')
115 self.
error_msgerror_msg(
'Preemption rejected')
118 self.
info_msginfo_msg(
'Preemption accepted')
122 self.
info_msginfo_msg(
"Waiting for 'backup' action Preemption to complete")
124 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
128 except Exception
as e:
129 self.
error_msgerror_msg(f
'Service call failed {e!r}')
131 if status != GoalStatus.STATUS_SUCCEEDED:
132 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
135 self.
info_msginfo_msg(
'Backup was successful!')
137 self.
info_msginfo_msg(
'Backup failed to meet target!')
140 def sendAndCancelCommand(self, command: BackUp.Goal) -> bool:
141 self.
info_msginfo_msg(
'Sending goal request...')
144 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
146 except Exception
as e:
147 self.
error_msgerror_msg(f
'Service call failed {e!r}')
153 self.
info_msginfo_msg(
'Goal accepted')
158 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
159 rclpy.spin_until_future_complete(self, cancel_future)
160 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
162 if status != GoalStatus.STATUS_CANCELED:
163 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
166 self.
info_msginfo_msg(
'Goal was canceled successfully')
169 def sendFreeCostmap(self) -> None:
170 costmap_msg = Costmap()
171 costmap_msg.header.frame_id =
'odom'
172 costmap_msg.header.stamp = self.get_clock().now().to_msg()
173 costmap_msg.metadata.resolution = 0.05
174 costmap_msg.metadata.size_x = 100
175 costmap_msg.metadata.size_y = 100
176 costmap_msg.metadata.origin.position.x = -2.5
177 costmap_msg.metadata.origin.position.y = -2.5
178 costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
181 footprint_msg = PolygonStamped()
182 footprint_msg.header.frame_id =
'odom'
183 footprint_msg.header.stamp = self.get_clock().now().to_msg()
184 footprint_msg.polygon.points = [
185 Point32(x=0.1, y=0.1, z=0.0),
186 Point32(x=0.1, y=-0.1, z=0.0),
187 Point32(x=-0.1, y=-0.1, z=0.0),
188 Point32(x=-0.1, y=0.1, z=0.0)
192 def sendOccupiedCostmap(self) -> None:
193 costmap_msg = Costmap()
194 costmap_msg.header.frame_id =
'odom'
195 costmap_msg.header.stamp = self.get_clock().now().to_msg()
196 costmap_msg.metadata.resolution = 0.05
197 costmap_msg.metadata.size_x = 100
198 costmap_msg.metadata.size_y = 100
199 costmap_msg.metadata.origin.position.x = -2.5
200 costmap_msg.metadata.origin.position.y = -2.5
201 costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
204 footprint_msg = PolygonStamped()
205 footprint_msg.header.frame_id =
'odom'
206 footprint_msg.header.stamp = self.get_clock().now().to_msg()
207 footprint_msg.polygon.points = [
208 Point32(x=0.1, y=0.1, z=0.0),
209 Point32(x=0.1, y=-0.1, z=0.0),
210 Point32(x=-0.1, y=-0.1, z=0.0),
211 Point32(x=-0.1, y=0.1, z=0.0)
215 def run(self) -> bool:
216 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
217 self.
info_msginfo_msg(
"'Backup' action server not available, waiting...")
220 action_request = BackUp.Goal()
221 action_request.speed = 0.15
222 action_request.target.x = 0.15
223 action_request.time_allowance = Duration(seconds=5).to_msg()
227 self.
error_msgerror_msg(
'Test A failed: Passed while costmap was not available!')
230 self.
info_msginfo_msg(
'Test A passed')
236 action_request.target.x = 0.1
239 if not cmd1
or not cmd2:
240 self.
error_msgerror_msg(
'Test B failed: Failed to Backup with valid costmap!')
241 return not cmd1
or not cmd2
243 action_request.target.x = 0.5
246 self.
error_msgerror_msg(
'Test B failed: Failed to preempt and invert command!')
247 return not cmd_preempt
251 self.
error_msgerror_msg(
'Test B failed: Failed to cancel command!')
252 return not cmd_cancel
254 self.
info_msginfo_msg(
'Test B passed')
257 action_request.time_allowance = Duration(seconds=0.1).to_msg()
260 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible timing requested!')
263 action_request.target.y = 0.5
264 cmd_invalid_target = self.
sendCommandsendCommand(action_request)
265 if cmd_invalid_target:
266 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible target requested!')
267 return not cmd_invalid_target
269 action_request.target.y = 0.0
270 self.
info_msginfo_msg(
'Test C passed')
273 action_request.time_allowance = Duration(seconds=5).to_msg()
278 self.
error_msgerror_msg(
'Test D failed: Passed while costmap was not lethal!')
281 self.
info_msginfo_msg(
'Test D passed')
284 def shutdown(self) -> None:
285 self.
info_msginfo_msg(
'Shutting down')
288 self.
info_msginfo_msg(
'Destroyed backup action client')
290 transition_service =
'lifecycle_manager_navigation/manage_nodes'
291 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
292 while not mgr_client.wait_for_service(timeout_sec=1.0):
293 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
295 req = ManageLifecycleNodes.Request()
296 req.command = ManageLifecycleNodes.Request().SHUTDOWN
297 future = mgr_client.call_async(req)
299 rclpy.spin_until_future_complete(self, future)
301 except Exception
as e:
302 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
304 self.
info_msginfo_msg(f
'{transition_service} finished')
306 def info_msg(self, msg: str) ->
None:
307 self.get_logger().info(msg)
309 def warn_msg(self, msg: str) ->
None:
310 self.get_logger().warn(msg)
312 def error_msg(self, msg: str) ->
None:
313 self.get_logger().error(msg)
316 def main(argv: list[str] = sys.argv[1:]):
324 test.info_msg(
'Exiting failed')
327 test.info_msg(
'Exiting passed')
331 if __name__ ==
'__main__':
bool sendAndPreemptWithFasterCommand(self, BackUp.Goal command)
bool sendAndCancelCommand(self, BackUp.Goal command)
bool sendCommand(self, BackUp.Goal command)
None error_msg(self, str msg)
None sendFreeCostmap(self)
None info_msg(self, str msg)
None sendOccupiedCostmap(self)