21 from typing
import Optional
23 from action_msgs.msg
import GoalStatus
24 from geometry_msgs.msg
import Pose
25 from geometry_msgs.msg
import PoseStamped
26 from geometry_msgs.msg
import PoseWithCovarianceStamped
27 from lifecycle_msgs.srv
import GetState
28 from nav2_msgs.action
import NavigateToPose
29 from nav2_msgs.srv
import ManageLifecycleNodes
33 from rclpy.action
import ActionClient
34 from rclpy.node
import Node
35 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
36 from rclpy.qos
import QoSProfile
39 class NavTester(Node):
47 super().__init__(node_name=
'nav2_tester', namespace=namespace)
48 self.initial_pose_pub = self.create_publisher(PoseWithCovarianceStamped,
50 self.goal_pub = self.create_publisher(PoseStamped,
'goal_pose', 10)
52 pose_qos = QoSProfile(
53 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
54 reliability=QoSReliabilityPolicy.RELIABLE,
55 history=QoSHistoryPolicy.KEEP_LAST,
58 self.model_pose_sub = self.create_subscription(PoseWithCovarianceStamped,
59 'amcl_pose', self.poseCallback, pose_qos)
60 self.initial_pose_received =
False
61 self.initial_pose = initial_pose
62 self.goal_pose = goal_pose
63 self.action_client = ActionClient(self, NavigateToPose,
'navigate_to_pose')
65 def info_msg(self, msg: str):
66 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
68 def warn_msg(self, msg: str):
69 self.get_logger().warn(
'\033[1;37;43m' + msg +
'\033[0m')
71 def error_msg(self, msg: str):
72 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
74 def setInitialPose(self):
75 msg = PoseWithCovarianceStamped()
76 msg.pose.pose = self.initial_pose
77 msg.header.frame_id =
'map'
78 self.info_msg(
'Publishing Initial Pose')
79 self.initial_pose_pub.publish(msg)
80 self.currentPose = self.initial_pose
82 def getStampedPoseMsg(self, pose: Pose):
84 msg.header.frame_id =
'map'
88 def publishGoalPose(self, goal_pose: Optional[Pose] =
None):
89 self.goal_pose = goal_pose
if goal_pose
is not None else self.goal_pose
90 self.goal_pub.publish(self.getStampedPoseMsg(self.goal_pose))
92 def runNavigateAction(self, goal_pose: Optional[Pose] =
None):
94 self.info_msg(
"Waiting for 'NavigateToPose' action server")
95 while not self.action_client.wait_for_server(timeout_sec=1.0):
96 self.info_msg(
"'NavigateToPose' action server not available, waiting...")
98 self.goal_pose = goal_pose
if goal_pose
is not None else self.goal_pose
99 goal_msg = NavigateToPose.Goal()
100 goal_msg.pose = self.getStampedPoseMsg(self.goal_pose)
102 self.info_msg(
'Sending goal request...')
103 send_goal_future = self.action_client.send_goal_async(goal_msg)
105 rclpy.spin_until_future_complete(self, send_goal_future)
106 goal_handle = send_goal_future.result()
108 if not goal_handle.accepted:
109 self.error_msg(
'Goal rejected')
112 self.info_msg(
'Goal accepted')
113 get_result_future = goal_handle.get_result_async()
115 self.info_msg(
"Waiting for 'NavigateToPose' action to complete")
116 rclpy.spin_until_future_complete(self, get_result_future)
117 status = get_result_future.result().status
118 if status != GoalStatus.STATUS_ABORTED:
119 self.info_msg(f
'Goal failed with status code: {status}')
122 self.info_msg(
'Goal failed, as expected!')
125 def poseCallback(self, msg):
126 self.info_msg(
'Received amcl_pose')
127 self.current_pose = msg.pose.pose
128 self.initial_pose_received =
True
130 def reachesGoal(self, timeout, distance):
132 start_time = time.time()
134 while not goalReached:
135 rclpy.spin_once(self, timeout_sec=1)
136 if self.distanceFromGoal() < distance:
138 self.info_msg(
'*** GOAL REACHED ***')
140 elif timeout
is not None:
141 if (time.time() - start_time) > timeout:
142 self.error_msg(
'Robot timed out reaching its goal!')
145 def distanceFromGoal(self):
146 d_x = self.current_pose.position.x - self.goal_pose.position.x
147 d_y = self.current_pose.position.y - self.goal_pose.position.y
148 distance = math.sqrt(d_x * d_x + d_y * d_y)
149 self.info_msg(f
'Distance from goal is: {distance}')
152 def wait_for_node_active(self, node_name: str):
154 self.info_msg(f
'Waiting for {node_name} to become active')
155 node_service = f
'{node_name}/get_state'
156 state_client = self.create_client(GetState, node_service)
157 while not state_client.wait_for_service(timeout_sec=1.0):
158 self.info_msg(f
'{node_service} service not available, waiting...')
159 req = GetState.Request()
161 while (state !=
'active'):
162 self.info_msg(f
'Getting {node_name} state...')
163 future = state_client.call_async(req)
164 rclpy.spin_until_future_complete(self, future)
165 if future.result()
is not None:
166 state = future.result().current_state.label
167 self.info_msg(f
'Result of get_state: {state}')
169 self.error_msg(f
'Exception while calling service: {future.exception()!r}')
173 self.info_msg(
'Shutting down')
174 self.action_client.destroy()
176 transition_service =
'lifecycle_manager_navigation/manage_nodes'
177 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
178 while not mgr_client.wait_for_service(timeout_sec=1.0):
179 self.info_msg(f
'{transition_service} service not available, waiting...')
181 req = ManageLifecycleNodes.Request()
182 req.command = ManageLifecycleNodes.Request().SHUTDOWN
183 future = mgr_client.call_async(req)
185 self.info_msg(
'Shutting down navigation lifecycle manager...')
186 rclpy.spin_until_future_complete(self, future)
188 self.info_msg(
'Shutting down navigation lifecycle manager complete.')
189 except Exception
as e:
190 self.error_msg(f
'Service call failed {e!r}')
191 transition_service =
'lifecycle_manager_localization/manage_nodes'
192 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
193 while not mgr_client.wait_for_service(timeout_sec=1.0):
194 self.info_msg(f
'{transition_service} service not available, waiting...')
196 req = ManageLifecycleNodes.Request()
197 req.command = ManageLifecycleNodes.Request().SHUTDOWN
198 future = mgr_client.call_async(req)
200 self.info_msg(
'Shutting down localization lifecycle manager...')
201 rclpy.spin_until_future_complete(self, future)
203 self.info_msg(
'Shutting down localization lifecycle manager complete')
204 except Exception
as e:
205 self.error_msg(f
'Service call failed {e!r}')
207 def wait_for_initial_pose(self):
208 self.initial_pose_received =
False
209 while not self.initial_pose_received:
210 self.info_msg(
'Setting initial pose')
211 self.setInitialPose()
212 self.info_msg(
'Waiting for amcl_pose to be received')
213 rclpy.spin_once(self, timeout_sec=1)
216 def run_all_tests(robot_tester):
220 robot_tester.wait_for_node_active(
'amcl')
221 robot_tester.wait_for_initial_pose()
222 robot_tester.wait_for_node_active(
'bt_navigator')
223 result = robot_tester.runNavigateAction()
228 robot_tester.info_msg(
'Test PASSED')
230 robot_tester.error_msg(
'Test FAILED')
235 def fwd_pose(x=0.0, y=0.0, z=0.01):
236 initial_pose = Pose()
237 initial_pose.position.x = x
238 initial_pose.position.y = y
239 initial_pose.position.z = z
240 initial_pose.orientation.x = 0.0
241 initial_pose.orientation.y = 0.0
242 initial_pose.orientation.z = 0.0
243 initial_pose.orientation.w = 1.0
247 def get_testers(args):
252 init_x, init_y, final_x, final_y = args.robot[0]
254 initial_pose=fwd_pose(float(init_x), float(init_y)),
255 goal_pose=fwd_pose(float(final_x), float(final_y)))
257 'Starting tester, robot going from ' + init_x +
', ' + init_y +
258 ' to ' + final_x +
', ' + final_y +
'.')
259 testers.append(tester)
265 def main(argv=sys.argv[1:]):
267 parser = argparse.ArgumentParser(description=
'System-level navigation tester node')
268 group = parser.add_mutually_exclusive_group(required=
True)
269 group.add_argument(
'-r',
'--robot', action=
'append', nargs=4,
270 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
271 help=
'The robot starting and final positions.')
272 group.add_argument(
'-rs',
'--robots', action=
'append', nargs=5,
273 metavar=(
'name',
'init_x',
'init_y',
'final_x',
'final_y'),
274 help=
"The robot's namespace and starting and final positions. " +
275 'Repeating the argument for multiple robots is supported.')
277 args, unknown = parser.parse_known_args()
282 testers = get_testers(args)
287 for tester
in testers:
288 passed = run_all_tests(tester)
292 for tester
in testers:
296 testers[0].info_msg(
'Done Shutting Down.')
299 testers[0].info_msg(
'Exiting failed')
302 testers[0].info_msg(
'Exiting passed')
306 if __name__ ==
'__main__':