21 from typing
import Optional
23 from action_msgs.msg
import GoalStatus
24 from geometry_msgs.msg
import Pose
25 from geometry_msgs.msg
import PoseStamped
26 from geometry_msgs.msg
import PoseWithCovarianceStamped
27 from lifecycle_msgs.srv
import GetState
28 from nav2_msgs.action
import NavigateThroughPoses
29 from nav2_msgs.srv
import ManageLifecycleNodes
33 from rclpy.action
import ActionClient
34 from rclpy.node
import Node
35 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
36 from rclpy.qos
import QoSProfile
47 super().__init__(node_name=
'nav2_tester', namespace=namespace)
48 self.
initial_pose_pubinitial_pose_pub = self.create_publisher(PoseWithCovarianceStamped,
51 pose_qos = QoSProfile(
52 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
53 reliability=QoSReliabilityPolicy.RELIABLE,
54 history=QoSHistoryPolicy.KEEP_LAST,
57 self.
model_pose_submodel_pose_sub = self.create_subscription(PoseWithCovarianceStamped,
62 self.
action_clientaction_client = ActionClient(self, NavigateThroughPoses,
'navigate_through_poses')
64 def info_msg(self, msg: str):
65 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
67 def warn_msg(self, msg: str):
68 self.get_logger().warn(
'\033[1;37;43m' + msg +
'\033[0m')
70 def error_msg(self, msg: str):
71 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
73 def setInitialPose(self):
74 msg = PoseWithCovarianceStamped()
76 msg.header.frame_id =
'map'
77 self.
info_msginfo_msg(
'Publishing Initial Pose')
81 def getStampedPoseMsg(self, pose: Pose):
83 msg.header.frame_id =
'map'
87 def runNavigateAction(self, goal_pose: Optional[Pose] =
None):
89 self.
info_msginfo_msg(
"Waiting for 'NavigateThroughPoses' action server")
90 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
91 self.
info_msginfo_msg(
"'NavigateThroughPoses' action server not available, waiting...")
93 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
94 goal_msg = NavigateThroughPoses.Goal()
98 self.
info_msginfo_msg(
'Sending goal request...')
99 send_goal_future = self.
action_clientaction_client.send_goal_async(goal_msg)
101 rclpy.spin_until_future_complete(self, send_goal_future)
102 goal_handle = send_goal_future.result()
104 if not goal_handle.accepted:
108 self.
info_msginfo_msg(
'Goal accepted')
109 get_result_future = goal_handle.get_result_async()
111 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
112 rclpy.spin_until_future_complete(self, get_result_future)
113 status = get_result_future.result().status
114 if status != GoalStatus.STATUS_SUCCEEDED:
115 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
118 self.
info_msginfo_msg(
'Goal succeeded!')
121 def runFakeNavigateAction(self):
123 self.
info_msginfo_msg(
"Waiting for 'NavigateThroughPoses' action server")
124 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
125 self.
info_msginfo_msg(
"'NavigateThroughPoses' action server not available, waiting...")
127 goal_msg = NavigateThroughPoses.Goal()
129 self.
info_msginfo_msg(
'Sending goal request...')
130 send_goal_future = self.
action_clientaction_client.send_goal_async(goal_msg)
132 rclpy.spin_until_future_complete(self, send_goal_future)
133 goal_handle = send_goal_future.result()
135 if not goal_handle.accepted:
139 self.
info_msginfo_msg(
'Goal accepted')
140 get_result_future = goal_handle.get_result_async()
142 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
143 rclpy.spin_until_future_complete(self, get_result_future)
144 status = get_result_future.result().status
145 if status != GoalStatus.STATUS_SUCCEEDED:
146 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
149 self.
info_msginfo_msg(
'Goal succeeded!')
152 def runNavigatePreemptionAction(self, block):
154 self.
info_msginfo_msg(
"Waiting for 'NavigateThroughPoses' action server")
155 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
156 self.
info_msginfo_msg(
"'NavigateThroughPoses' action server not available, waiting...")
158 goal_msg = NavigateThroughPoses.Goal()
161 self.
info_msginfo_msg(
'Sending goal request...')
162 send_goal_future = self.
action_clientaction_client.send_goal_async(goal_msg)
164 rclpy.spin_until_future_complete(self, send_goal_future)
165 goal_handle = send_goal_future.result()
167 if not goal_handle.accepted:
174 self.
info_msginfo_msg(
'Goal accepted')
175 get_result_future = goal_handle.get_result_async()
177 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
178 rclpy.spin_until_future_complete(self, get_result_future)
179 status = get_result_future.result().status
180 if status != GoalStatus.STATUS_SUCCEEDED:
181 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
184 self.
info_msginfo_msg(
'Goal succeeded!')
187 def poseCallback(self, msg):
188 self.
info_msginfo_msg(
'Received amcl_pose')
192 def wait_for_node_active(self, node_name: str):
194 self.
info_msginfo_msg(f
'Waiting for {node_name} to become active')
195 node_service = f
'{node_name}/get_state'
196 state_client = self.create_client(GetState, node_service)
197 while not state_client.wait_for_service(timeout_sec=1.0):
198 self.
info_msginfo_msg(f
'{node_service} service not available, waiting...')
199 req = GetState.Request()
201 while (state !=
'active'):
202 self.
info_msginfo_msg(f
'Getting {node_name} state...')
203 future = state_client.call_async(req)
204 rclpy.spin_until_future_complete(self, future)
205 if future.result()
is not None:
206 state = future.result().current_state.label
207 self.
info_msginfo_msg(f
'Result of get_state: {state}')
209 self.
error_msgerror_msg(f
'Exception while calling service: {future.exception()!r}')
213 self.
info_msginfo_msg(
'Shutting down')
216 transition_service =
'lifecycle_manager_navigation/manage_nodes'
217 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
218 while not mgr_client.wait_for_service(timeout_sec=1.0):
219 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
221 req = ManageLifecycleNodes.Request()
222 req.command = ManageLifecycleNodes.Request().SHUTDOWN
223 future = mgr_client.call_async(req)
225 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager...')
226 rclpy.spin_until_future_complete(self, future)
228 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
229 except Exception
as e:
230 self.
error_msgerror_msg(f
'Service call failed {e!r}')
231 transition_service =
'lifecycle_manager_localization/manage_nodes'
232 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
233 while not mgr_client.wait_for_service(timeout_sec=1.0):
234 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
236 req = ManageLifecycleNodes.Request()
237 req.command = ManageLifecycleNodes.Request().SHUTDOWN
238 future = mgr_client.call_async(req)
240 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager...')
241 rclpy.spin_until_future_complete(self, future)
243 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager complete')
244 except Exception
as e:
245 self.
error_msgerror_msg(f
'Service call failed {e!r}')
247 def wait_for_initial_pose(self):
250 self.
info_msginfo_msg(
'Setting initial pose')
252 self.
info_msginfo_msg(
'Waiting for amcl_pose to be received')
253 rclpy.spin_once(self, timeout_sec=1)
256 def run_all_tests(robot_tester):
260 robot_tester.wait_for_node_active(
'amcl')
261 robot_tester.wait_for_initial_pose()
262 robot_tester.wait_for_node_active(
'bt_navigator')
263 result = robot_tester.runNavigateAction()
265 result = result
and not robot_tester.runFakeNavigateAction()
267 result = result
and robot_tester.runNavigatePreemptionAction(
False)
268 result = result
and robot_tester.runNavigatePreemptionAction(
True)
273 robot_tester.info_msg(
'Test PASSED')
275 robot_tester.error_msg(
'Test FAILED')
280 def fwd_pose(x=0.0, y=0.0, z=0.01):
281 initial_pose = Pose()
282 initial_pose.position.x = x
283 initial_pose.position.y = y
284 initial_pose.position.z = z
285 initial_pose.orientation.x = 0.0
286 initial_pose.orientation.y = 0.0
287 initial_pose.orientation.z = 0.0
288 initial_pose.orientation.w = 1.0
292 def get_testers(args):
295 init_x, init_y, final_x, final_y = args.robot[0]
297 initial_pose=fwd_pose(float(init_x), float(init_y)),
298 goal_pose=fwd_pose(float(final_x), float(final_y)))
300 'Starting tester, robot going from ' + init_x +
', ' + init_y +
301 ' to ' + final_x +
', ' + final_y +
'.')
302 testers.append(tester)
306 def main(argv=sys.argv[1:]):
308 parser = argparse.ArgumentParser(description=
'System-level navigation tester node')
309 group = parser.add_mutually_exclusive_group(required=
True)
310 group.add_argument(
'-r',
'--robot', action=
'append', nargs=4,
311 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
312 help=
'The robot starting and final positions.')
314 args, unknown = parser.parse_known_args()
319 testers = get_testers(args)
324 for tester
in testers:
325 passed = run_all_tests(tester)
329 for tester
in testers:
333 testers[0].info_msg(
'Done Shutting Down.')
336 testers[0].info_msg(
'Exiting failed')
339 testers[0].info_msg(
'Exiting passed')
343 if __name__ ==
'__main__':
def poseCallback(self, msg)
def info_msg(self, str msg)
def getStampedPoseMsg(self, Pose pose)
def error_msg(self, str msg)