18 from typing
import Optional
20 from action_msgs.msg
import GoalStatus
22 from nav2_msgs.action
import Spin
24 from nav2_msgs.srv
import ManageLifecycleNodes
26 from rclpy.action
import ActionClient
27 from rclpy.action.client
import ClientGoalHandle
28 from rclpy.client
import Client
29 from rclpy.duration
import Duration
30 from rclpy.node
import Node
31 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
36 def __init__(self) -> None:
37 super().__init__(node_name=
'spin_tester', namespace=
'')
39 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
40 reliability=QoSReliabilityPolicy.RELIABLE,
41 history=QoSHistoryPolicy.KEEP_LAST,
44 self.action_client: ActionClient[Spin.Goal, Spin.Result, Spin.Feedback] \
45 = ActionClient(self, Spin,
'spin')
47 self.
costmap_pubcostmap_pub = self.create_publisher(
48 Costmap,
'local_costmap/costmap_raw', self.
costmap_qoscostmap_qos)
50 PolygonStamped,
'local_costmap/published_footprint', 10)
51 self.
goal_handlegoal_handle: Optional[ClientGoalHandle[
52 Spin.Goal, Spin.Result, Spin.Feedback]] =
None
55 def sendCommand(self, command: Spin.Goal) -> bool:
56 self.
info_msginfo_msg(
'Sending goal request...')
57 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
59 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
61 except Exception
as e:
62 self.
error_msgerror_msg(f
'Service call failed {e!r}')
68 self.
info_msginfo_msg(
'Goal accepted')
71 self.
info_msginfo_msg(
"Waiting for 'spin' action to complete")
73 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
77 except Exception
as e:
78 self.
error_msgerror_msg(f
'Service call failed {e!r}')
80 if status != GoalStatus.STATUS_SUCCEEDED:
81 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
84 self.
info_msginfo_msg(
'Spin was successful!')
86 self.
info_msginfo_msg(
'Spin failed to meet target!')
89 def sendAndPreemptWithInvertedCommand(self, command: Spin.Goal) -> bool:
91 self.
info_msginfo_msg(
'Sending goal request...')
92 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
94 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
96 except Exception
as e:
97 self.
error_msgerror_msg(f
'Service call failed {e!r}')
103 self.
info_msginfo_msg(
'Goal accepted')
108 self.
info_msginfo_msg(
'Sending preemption request...')
109 command.target_yaw = -command.target_yaw
110 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
112 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
114 except Exception
as e:
115 self.
error_msgerror_msg(f
'Service call failed {e!r}')
118 self.
error_msgerror_msg(
'Preemption rejected')
121 self.
info_msginfo_msg(
'Preemption accepted')
125 self.
info_msginfo_msg(
"Waiting for 'spin' action Preemption to complete")
127 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
131 except Exception
as e:
132 self.
error_msgerror_msg(f
'Service call failed {e!r}')
134 if status != GoalStatus.STATUS_SUCCEEDED:
135 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
138 self.
info_msginfo_msg(
'Spin was successful!')
140 self.
info_msginfo_msg(
'Spin failed to meet target!')
143 def sendAndCancelCommand(self, command: Spin.Goal) -> bool:
144 self.
info_msginfo_msg(
'Sending goal request...')
145 self.
goal_futuregoal_future = self.action_client.send_goal_async(command)
147 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
149 except Exception
as e:
150 self.
error_msgerror_msg(f
'Service call failed {e!r}')
156 self.
info_msginfo_msg(
'Goal accepted')
161 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
162 rclpy.spin_until_future_complete(self, cancel_future)
163 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
165 if status != GoalStatus.STATUS_CANCELED:
166 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
169 self.
info_msginfo_msg(
'Goal was canceled successfully')
172 def sendFreeCostmap(self) -> None:
173 costmap_msg = Costmap()
174 costmap_msg.header.frame_id =
'odom'
175 costmap_msg.header.stamp = self.get_clock().now().to_msg()
176 costmap_msg.metadata.resolution = 0.05
177 costmap_msg.metadata.size_x = 100
178 costmap_msg.metadata.size_y = 100
179 costmap_msg.metadata.origin.position.x = -2.5
180 costmap_msg.metadata.origin.position.y = -2.5
181 costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
184 footprint_msg = PolygonStamped()
185 footprint_msg.header.frame_id =
'odom'
186 footprint_msg.header.stamp = self.get_clock().now().to_msg()
187 footprint_msg.polygon.points = [
188 Point32(x=0.1, y=0.1, z=0.0),
189 Point32(x=0.1, y=-0.1, z=0.0),
190 Point32(x=-0.1, y=-0.1, z=0.0),
191 Point32(x=-0.1, y=0.1, z=0.0)
195 def sendOccupiedCostmap(self) -> None:
196 costmap_msg = Costmap()
197 costmap_msg.header.frame_id =
'odom'
198 costmap_msg.header.stamp = self.get_clock().now().to_msg()
199 costmap_msg.metadata.resolution = 0.05
200 costmap_msg.metadata.size_x = 100
201 costmap_msg.metadata.size_y = 100
202 costmap_msg.metadata.origin.position.x = -2.5
203 costmap_msg.metadata.origin.position.y = -2.5
204 costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
207 footprint_msg = PolygonStamped()
208 footprint_msg.header.frame_id =
'odom'
209 footprint_msg.header.stamp = self.get_clock().now().to_msg()
210 footprint_msg.polygon.points = [
211 Point32(x=0.1, y=0.1, z=0.0),
212 Point32(x=0.1, y=-0.1, z=0.0),
213 Point32(x=-0.1, y=-0.1, z=0.0),
214 Point32(x=-0.1, y=0.1, z=0.0)
218 def run(self) -> bool:
219 while not self.action_client.wait_for_server(timeout_sec=1.0):
220 self.
info_msginfo_msg(
"'spin' action server not available, waiting...")
223 action_request = Spin.Goal()
224 action_request.target_yaw = 0.349066
225 action_request.time_allowance = Duration(seconds=5).to_msg()
229 self.
error_msgerror_msg(
'Test A failed: Passed while costmap was not available!')
232 self.
info_msginfo_msg(
'Test A passed')
238 action_request.target_yaw = -3.49066
241 if not cmd1
or not cmd2:
242 self.
error_msgerror_msg(
'Test B failed: Failed to spin with valid costmap!')
243 return not cmd1
or not cmd2
245 action_request.target_yaw = 0.349066
248 self.
error_msgerror_msg(
'Test B failed: Failed to preempt and invert command!')
249 return not cmd_preempt
253 self.
error_msgerror_msg(
'Test B failed: Failed to cancel command!')
254 return not cmd_cancel
256 self.
info_msginfo_msg(
'Test B passed')
259 action_request.time_allowance = Duration(seconds=0.1).to_msg()
262 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible timing requested!')
265 self.
info_msginfo_msg(
'Test C passed')
268 action_request.time_allowance = Duration(seconds=5).to_msg()
273 self.
error_msgerror_msg(
'Test D failed: Passed while costmap was not lethal!')
276 self.
info_msginfo_msg(
'Test D passed')
279 def shutdown(self) -> None:
280 self.
info_msginfo_msg(
'Shutting down')
282 self.action_client.destroy()
283 self.
info_msginfo_msg(
'Destroyed backup action client')
285 transition_service =
'lifecycle_manager_navigation/manage_nodes'
286 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] \
287 = self.create_client(ManageLifecycleNodes, transition_service)
288 while not mgr_client.wait_for_service(timeout_sec=1.0):
289 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
291 req = ManageLifecycleNodes.Request()
292 req.command = ManageLifecycleNodes.Request().SHUTDOWN
293 future = mgr_client.call_async(req)
295 rclpy.spin_until_future_complete(self, future)
297 except Exception
as e:
298 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
300 self.
info_msginfo_msg(f
'{transition_service} finished')
302 def info_msg(self, msg: str) ->
None:
303 self.get_logger().info(msg)
305 def warn_msg(self, msg: str) ->
None:
306 self.get_logger().warning(msg)
308 def error_msg(self, msg: str) ->
None:
309 self.get_logger().error(msg)
312 def main(argv: list[str] = sys.argv[1:]):
320 test.info_msg(
'Exiting failed')
323 test.info_msg(
'Exiting passed')
327 if __name__ ==
'__main__':
None sendOccupiedCostmap(self)
bool sendAndCancelCommand(self, Spin.Goal command)
None error_msg(self, str msg)
None sendFreeCostmap(self)
bool sendCommand(self, Spin.Goal command)
None info_msg(self, str msg)
bool sendAndPreemptWithInvertedCommand(self, Spin.Goal command)