18 from typing
import Optional
20 from action_msgs.msg
import GoalStatus
22 from nav2_msgs.action
import BackUp
24 from nav2_msgs.srv
import ManageLifecycleNodes
26 from rclpy.action
import ActionClient
27 from rclpy.action.client
import ClientGoalHandle
28 from rclpy.client
import Client
29 from rclpy.duration
import Duration
30 from rclpy.node
import Node
31 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
36 def __init__(self) -> None:
37 super().__init__(node_name=
'backup_tester', namespace=
'')
39 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
40 reliability=QoSReliabilityPolicy.RELIABLE,
41 history=QoSHistoryPolicy.KEEP_LAST,
44 self.action_client: ActionClient[
48 ] = ActionClient(self, BackUp,
'backup')
49 self.
costmap_pubcostmap_pub = self.create_publisher(
50 Costmap,
'local_costmap/costmap_raw', self.
costmap_qoscostmap_qos)
52 PolygonStamped,
'local_costmap/published_footprint', 10)
53 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[
54 BackUp.Goal, BackUp.Result, BackUp.Feedback]] =
None
57 def sendCommand(self, command: BackUp.Goal) -> bool:
58 self.
info_msginfo_msg(
'Sending goal request...')
59 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
61 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
63 except Exception
as e:
64 self.
error_msgerror_msg(f
'Service call failed {e!r}')
70 self.
info_msginfo_msg(
'Goal accepted')
73 self.
info_msginfo_msg(
"Waiting for 'Backup' action to complete")
75 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
79 except Exception
as e:
80 self.
error_msgerror_msg(f
'Service call failed {e!r}')
82 if status != GoalStatus.STATUS_SUCCEEDED:
83 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
86 self.
info_msginfo_msg(
'Backup was successful!')
88 self.
info_msginfo_msg(
'Backup failed to meet target!')
91 def sendAndPreemptWithFasterCommand(self, command: BackUp.Goal) -> bool:
93 self.
info_msginfo_msg(
'Sending goal request...')
94 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
96 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
98 except Exception
as e:
99 self.
error_msgerror_msg(f
'Service call failed {e!r}')
105 self.
info_msginfo_msg(
'Goal accepted')
110 self.
info_msginfo_msg(
'Sending preemption request...')
112 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
114 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
116 except Exception
as e:
117 self.
error_msgerror_msg(f
'Service call failed {e!r}')
120 self.
error_msgerror_msg(
'Preemption rejected')
123 self.
info_msginfo_msg(
'Preemption accepted')
127 self.
info_msginfo_msg(
"Waiting for 'backup' action Preemption to complete")
129 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
133 except Exception
as e:
134 self.
error_msgerror_msg(f
'Service call failed {e!r}')
136 if status != GoalStatus.STATUS_SUCCEEDED:
137 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
140 self.
info_msginfo_msg(
'Backup was successful!')
142 self.
info_msginfo_msg(
'Backup failed to meet target!')
145 def sendAndCancelCommand(self, command: BackUp.Goal) -> bool:
146 self.
info_msginfo_msg(
'Sending goal request...')
147 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
149 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
151 except Exception
as e:
152 self.
error_msgerror_msg(f
'Service call failed {e!r}')
158 self.
info_msginfo_msg(
'Goal accepted')
163 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
164 rclpy.spin_until_future_complete(self, cancel_future)
165 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
167 if status != GoalStatus.STATUS_CANCELED:
168 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
171 self.
info_msginfo_msg(
'Goal was canceled successfully')
174 def sendFreeCostmap(self) -> None:
175 costmap_msg = Costmap()
176 costmap_msg.header.frame_id =
'odom'
177 costmap_msg.header.stamp = self.get_clock().now().to_msg()
178 costmap_msg.metadata.resolution = 0.05
179 costmap_msg.metadata.size_x = 100
180 costmap_msg.metadata.size_y = 100
181 costmap_msg.metadata.origin.position.x = -2.5
182 costmap_msg.metadata.origin.position.y = -2.5
183 costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
186 footprint_msg = PolygonStamped()
187 footprint_msg.header.frame_id =
'odom'
188 footprint_msg.header.stamp = self.get_clock().now().to_msg()
189 footprint_msg.polygon.points = [
190 Point32(x=0.1, y=0.1, z=0.0),
191 Point32(x=0.1, y=-0.1, z=0.0),
192 Point32(x=-0.1, y=-0.1, z=0.0),
193 Point32(x=-0.1, y=0.1, z=0.0)
197 def sendOccupiedCostmap(self) -> None:
198 costmap_msg = Costmap()
199 costmap_msg.header.frame_id =
'odom'
200 costmap_msg.header.stamp = self.get_clock().now().to_msg()
201 costmap_msg.metadata.resolution = 0.05
202 costmap_msg.metadata.size_x = 100
203 costmap_msg.metadata.size_y = 100
204 costmap_msg.metadata.origin.position.x = -2.5
205 costmap_msg.metadata.origin.position.y = -2.5
206 costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
209 footprint_msg = PolygonStamped()
210 footprint_msg.header.frame_id =
'odom'
211 footprint_msg.header.stamp = self.get_clock().now().to_msg()
212 footprint_msg.polygon.points = [
213 Point32(x=0.1, y=0.1, z=0.0),
214 Point32(x=0.1, y=-0.1, z=0.0),
215 Point32(x=-0.1, y=-0.1, z=0.0),
216 Point32(x=-0.1, y=0.1, z=0.0)
220 def run(self) -> bool:
221 while not self.action_client.wait_for_server(timeout_sec=1.0):
222 self.
info_msginfo_msg(
"'Backup' action server not available, waiting...")
225 action_request = BackUp.Goal()
226 action_request.speed = 0.15
227 action_request.target.x = 0.15
228 action_request.time_allowance = Duration(seconds=5).to_msg()
232 self.
error_msgerror_msg(
'Test A failed: Passed while costmap was not available!')
235 self.
info_msginfo_msg(
'Test A passed')
241 action_request.target.x = 0.1
244 if not cmd1
or not cmd2:
245 self.
error_msgerror_msg(
'Test B failed: Failed to Backup with valid costmap!')
246 return not cmd1
or not cmd2
248 action_request.target.x = 0.5
251 self.
error_msgerror_msg(
'Test B failed: Failed to preempt and invert command!')
252 return not cmd_preempt
256 self.
error_msgerror_msg(
'Test B failed: Failed to cancel command!')
257 return not cmd_cancel
259 self.
info_msginfo_msg(
'Test B passed')
262 action_request.time_allowance = Duration(seconds=0.1).to_msg()
265 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible timing requested!')
268 action_request.target.y = 0.5
269 cmd_invalid_target = self.
sendCommandsendCommand(action_request)
270 if cmd_invalid_target:
271 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible target requested!')
272 return not cmd_invalid_target
274 action_request.target.y = 0.0
275 self.
info_msginfo_msg(
'Test C passed')
278 action_request.time_allowance = Duration(seconds=5).to_msg()
283 self.
error_msgerror_msg(
'Test D failed: Passed while costmap was not lethal!')
286 self.
info_msginfo_msg(
'Test D passed')
289 def shutdown(self) -> None:
290 self.
info_msginfo_msg(
'Shutting down')
292 self.action_client.destroy()
293 self.
info_msginfo_msg(
'Destroyed backup action client')
295 transition_service =
'lifecycle_manager_navigation/manage_nodes'
296 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
297 = self.create_client(ManageLifecycleNodes, transition_service)
298 while not mgr_client.wait_for_service(timeout_sec=1.0):
299 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
301 req = ManageLifecycleNodes.Request()
302 req.command = ManageLifecycleNodes.Request().SHUTDOWN
303 future = mgr_client.call_async(req)
305 rclpy.spin_until_future_complete(self, future)
307 except Exception
as e:
308 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
310 self.
info_msginfo_msg(f
'{transition_service} finished')
312 def info_msg(self, msg: str) ->
None:
313 self.get_logger().info(msg)
315 def warn_msg(self, msg: str) ->
None:
316 self.get_logger().warning(msg)
318 def error_msg(self, msg: str) ->
None:
319 self.get_logger().error(msg)
322 def main(argv: list[str] = sys.argv[1:]):
330 test.info_msg(
'Exiting failed')
333 test.info_msg(
'Exiting passed')
337 if __name__ ==
'__main__':
bool sendAndPreemptWithFasterCommand(self, BackUp.Goal command)
bool sendAndCancelCommand(self, BackUp.Goal command)
bool sendCommand(self, BackUp.Goal command)
None error_msg(self, str msg)
None sendFreeCostmap(self)
None info_msg(self, str msg)
None sendOccupiedCostmap(self)