19 from action_msgs.msg
import GoalStatus
20 from geometry_msgs.msg
import Point32, PolygonStamped
21 from nav2_msgs.action
import Spin
22 from nav2_msgs.msg
import Costmap
23 from nav2_msgs.srv
import ManageLifecycleNodes
27 from rclpy.action
import ActionClient
28 from rclpy.duration
import Duration
29 from rclpy.node
import Node
30 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
31 from rclpy.qos
import QoSProfile
37 super().__init__(node_name=
'spin_tester', namespace=
'')
39 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
40 reliability=QoSReliabilityPolicy.RELIABLE,
41 history=QoSHistoryPolicy.KEEP_LAST,
44 self.
action_clientaction_client = ActionClient(self, Spin,
'spin')
45 self.
costmap_pubcostmap_pub = self.create_publisher(
46 Costmap,
'local_costmap/costmap_raw', self.
costmap_qoscostmap_qos)
48 PolygonStamped,
'local_costmap/published_footprint', 10)
52 def sendCommand(self, command):
53 self.
info_msginfo_msg(
'Sending goal request...')
56 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
58 except Exception
as e:
59 self.
error_msgerror_msg(f
'Service call failed {e!r}')
65 self.
info_msginfo_msg(
'Goal accepted')
68 self.
info_msginfo_msg(
"Waiting for 'spin' action to complete")
70 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
74 except Exception
as e:
75 self.
error_msgerror_msg(f
'Service call failed {e!r}')
77 if status != GoalStatus.STATUS_SUCCEEDED:
78 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
81 self.
info_msginfo_msg(
'Spin was successful!')
83 self.
info_msginfo_msg(
'Spin failed to meet target!')
86 def sendAndPreemptWithInvertedCommand(self, command):
88 self.
info_msginfo_msg(
'Sending goal request...')
91 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
93 except Exception
as e:
94 self.
error_msgerror_msg(f
'Service call failed {e!r}')
100 self.
info_msginfo_msg(
'Goal accepted')
105 self.
info_msginfo_msg(
'Sending preemption request...')
106 command.target_yaw = -command.target_yaw
109 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
111 except Exception
as e:
112 self.
error_msgerror_msg(f
'Service call failed {e!r}')
115 self.
error_msgerror_msg(
'Preemption rejected')
118 self.
info_msginfo_msg(
'Preemption accepted')
122 self.
info_msginfo_msg(
"Waiting for 'spin' action Preemption to complete")
124 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
128 except Exception
as e:
129 self.
error_msgerror_msg(f
'Service call failed {e!r}')
131 if status != GoalStatus.STATUS_SUCCEEDED:
132 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
135 self.
info_msginfo_msg(
'Spin was successful!')
137 self.
info_msginfo_msg(
'Spin failed to meet target!')
140 def sendAndCancelCommand(self, command):
141 self.
info_msginfo_msg(
'Sending goal request...')
144 rclpy.spin_until_future_complete(self, self.
goal_futuregoal_future)
146 except Exception
as e:
147 self.
error_msgerror_msg(f
'Service call failed {e!r}')
153 self.
info_msginfo_msg(
'Goal accepted')
158 cancel_future = self.
goal_handlegoal_handle.cancel_goal_async()
159 rclpy.spin_until_future_complete(self, cancel_future)
160 rclpy.spin_until_future_complete(self, self.
result_futureresult_future)
162 if status != GoalStatus.STATUS_CANCELED:
163 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
166 self.
info_msginfo_msg(
'Goal was canceled successfully')
169 def sendFreeCostmap(self):
170 costmap_msg = Costmap()
171 costmap_msg.header.frame_id =
'odom'
172 costmap_msg.header.stamp = self.get_clock().now().to_msg()
173 costmap_msg.metadata.resolution = 0.05
174 costmap_msg.metadata.size_x = 100
175 costmap_msg.metadata.size_y = 100
176 costmap_msg.metadata.origin.position.x = -2.5
177 costmap_msg.metadata.origin.position.y = -2.5
178 costmap_msg.data = [0] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
181 footprint_msg = PolygonStamped()
182 footprint_msg.header.frame_id =
'odom'
183 footprint_msg.header.stamp = self.get_clock().now().to_msg()
184 footprint_msg.polygon.points = [
185 Point32(x=0.1, y=0.1, z=0.0),
186 Point32(x=0.1, y=-0.1, z=0.0),
187 Point32(x=-0.1, y=-0.1, z=0.0),
188 Point32(x=-0.1, y=0.1, z=0.0)
192 def sendOccupiedCostmap(self):
193 costmap_msg = Costmap()
194 costmap_msg.header.frame_id =
'odom'
195 costmap_msg.header.stamp = self.get_clock().now().to_msg()
196 costmap_msg.metadata.resolution = 0.05
197 costmap_msg.metadata.size_x = 100
198 costmap_msg.metadata.size_y = 100
199 costmap_msg.metadata.origin.position.x = -2.5
200 costmap_msg.metadata.origin.position.y = -2.5
201 costmap_msg.data = [254] * (costmap_msg.metadata.size_x * costmap_msg.metadata.size_y)
204 footprint_msg = PolygonStamped()
205 footprint_msg.header.frame_id =
'odom'
206 footprint_msg.header.stamp = self.get_clock().now().to_msg()
207 footprint_msg.polygon.points = [
208 Point32(x=0.1, y=0.1, z=0.0),
209 Point32(x=0.1, y=-0.1, z=0.0),
210 Point32(x=-0.1, y=-0.1, z=0.0),
211 Point32(x=-0.1, y=0.1, z=0.0)
216 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
217 self.
info_msginfo_msg(
"'spin' action server not available, waiting...")
220 action_request = Spin.Goal()
221 action_request.target_yaw = 0.349066
222 action_request.time_allowance = Duration(seconds=5).to_msg()
226 self.
error_msgerror_msg(
'Test A failed: Passed while costmap was not available!')
229 self.
info_msginfo_msg(
'Test A passed')
235 action_request.target_yaw = -3.49066
238 if not cmd1
or not cmd2:
239 self.
error_msgerror_msg(
'Test B failed: Failed to spin with valid costmap!')
240 return not cmd1
or not cmd2
242 action_request.target_yaw = 0.349066
245 self.
error_msgerror_msg(
'Test B failed: Failed to preempt and invert command!')
246 return not cmd_preempt
250 self.
error_msgerror_msg(
'Test B failed: Failed to cancel command!')
251 return not cmd_cancel
253 self.
info_msginfo_msg(
'Test B passed')
256 action_request.time_allowance = Duration(seconds=0.1).to_msg()
259 self.
error_msgerror_msg(
'Test C failed: Passed while impoossible timing requested!')
262 self.
info_msginfo_msg(
'Test C passed')
265 action_request.time_allowance = Duration(seconds=5).to_msg()
270 self.
error_msgerror_msg(
'Test D failed: Passed while costmap was not lethal!')
273 self.
info_msginfo_msg(
'Test D passed')
277 self.
info_msginfo_msg(
'Shutting down')
280 self.
info_msginfo_msg(
'Destroyed backup action client')
282 transition_service =
'lifecycle_manager_navigation/manage_nodes'
283 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
284 while not mgr_client.wait_for_service(timeout_sec=1.0):
285 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
287 req = ManageLifecycleNodes.Request()
288 req.command = ManageLifecycleNodes.Request().SHUTDOWN
289 future = mgr_client.call_async(req)
291 rclpy.spin_until_future_complete(self, future)
293 except Exception
as e:
294 self.
error_msgerror_msg(f
'{transition_service} service call failed {e!r}')
296 self.
info_msginfo_msg(f
'{transition_service} finished')
298 def info_msg(self, msg: str):
299 self.get_logger().info(msg)
301 def warn_msg(self, msg: str):
302 self.get_logger().warn(msg)
304 def error_msg(self, msg: str):
305 self.get_logger().error(msg)
308 def main(argv=sys.argv[1:]):
316 test.info_msg(
'Exiting failed')
319 test.info_msg(
'Exiting passed')
323 if __name__ ==
'__main__':
def sendFreeCostmap(self)
def sendCommand(self, command)
def info_msg(self, str msg)
def sendAndCancelCommand(self, command)
def error_msg(self, str msg)
def sendOccupiedCostmap(self)
def sendAndPreemptWithInvertedCommand(self, command)