21 from typing
import Optional
23 from action_msgs.msg
import GoalStatus
25 from lifecycle_msgs.srv
import GetState
26 from nav2_msgs.action
import NavigateToPose
27 from nav2_msgs.srv
import ManageLifecycleNodes
29 from rclpy.action
import ActionClient
30 from rclpy.client
import Client
31 from rclpy.node
import Node
32 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
35 class NavTester(Node):
37 def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str =
''):
38 super().__init__(node_name=
'nav2_tester', namespace=namespace)
39 self.initial_pose_pub = self.create_publisher(
40 PoseWithCovarianceStamped,
'initialpose', 10
42 self.goal_pub = self.create_publisher(PoseStamped,
'goal_pose', 10)
44 pose_qos = QoSProfile(
45 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
46 reliability=QoSReliabilityPolicy.RELIABLE,
47 history=QoSHistoryPolicy.KEEP_LAST,
51 self.model_pose_sub = self.create_subscription(
52 PoseWithCovarianceStamped,
'amcl_pose', self.poseCallback, pose_qos
54 self.initial_pose_received =
False
55 self.initial_pose = initial_pose
56 self.goal_pose = goal_pose
57 self.action_client: ActionClient[
59 NavigateToPose.Result,
60 NavigateToPose.Feedback
61 ] = ActionClient(self, NavigateToPose,
'navigate_to_pose')
63 def info_msg(self, msg: str) ->
None:
64 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
66 def warn_msg(self, msg: str) ->
None:
67 self.get_logger().warning(
'\033[1;37;43m' + msg +
'\033[0m')
69 def error_msg(self, msg: str) ->
None:
70 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
72 def setInitialPose(self) -> None:
73 msg = PoseWithCovarianceStamped()
74 msg.pose.pose = self.initial_pose
75 msg.header.frame_id =
'map'
76 self.info_msg(
'Publishing Initial Pose')
77 self.initial_pose_pub.publish(msg)
78 self.currentPose = self.initial_pose
80 def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
82 msg.header.frame_id =
'map'
86 def publishGoalPose(self, goal_pose: Optional[Pose] =
None) ->
None:
87 self.goal_pose = goal_pose
if goal_pose
is not None else self.goal_pose
88 self.goal_pub.publish(self.getStampedPoseMsg(self.goal_pose))
90 def runNavigateAction(self, goal_pose: Optional[Pose] =
None) -> bool:
92 self.info_msg(
"Waiting for 'NavigateToPose' action server")
93 while not self.action_client.wait_for_server(timeout_sec=1.0):
94 self.info_msg(
"'NavigateToPose' action server not available, waiting...")
96 self.goal_pose = goal_pose
if goal_pose
is not None else self.goal_pose
97 goal_msg = NavigateToPose.Goal()
98 goal_msg.pose = self.getStampedPoseMsg(self.goal_pose)
100 self.info_msg(
'Sending goal request...')
101 send_goal_future = self.action_client.send_goal_async(goal_msg)
103 rclpy.spin_until_future_complete(self, send_goal_future)
104 goal_handle = send_goal_future.result()
106 if not goal_handle
or not goal_handle.accepted:
107 self.error_msg(
'Goal rejected')
110 self.info_msg(
'Goal accepted')
111 get_result_future = goal_handle.get_result_async()
113 self.info_msg(
"Waiting for 'NavigateToPose' action to complete")
114 rclpy.spin_until_future_complete(self, get_result_future)
115 status = get_result_future.result().status
116 if status != GoalStatus.STATUS_ABORTED:
117 self.info_msg(f
'Goal failed with status code: {status}')
120 self.info_msg(
'Goal failed, as expected!')
123 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
124 self.info_msg(
'Received amcl_pose')
125 self.current_pose = msg.pose.pose
126 self.initial_pose_received =
True
128 def reachesGoal(self, timeout: float, distance: float) -> bool:
130 start_time = time.time()
132 while not goalReached:
133 rclpy.spin_once(self, timeout_sec=1)
134 if self.distanceFromGoal() < distance:
136 self.info_msg(
'*** GOAL REACHED ***')
138 elif timeout
is not None:
139 if (time.time() - start_time) > timeout:
140 self.error_msg(
'Robot timed out reaching its goal!')
143 self.info_msg(
'Robot reached its goal!')
146 def distanceFromGoal(self) -> float:
147 d_x = self.current_pose.position.x - self.goal_pose.position.x
148 d_y = self.current_pose.position.y - self.goal_pose.position.y
149 distance = math.sqrt(d_x * d_x + d_y * d_y)
150 self.info_msg(f
'Distance from goal is: {distance}')
153 def wait_for_node_active(self, node_name: str) ->
None:
155 self.info_msg(f
'Waiting for {node_name} to become active')
156 node_service = f
'{node_name}/get_state'
157 state_client: Client[GetState.Request, GetState.Response] = \
158 self.create_client(GetState, node_service)
159 while not state_client.wait_for_service(timeout_sec=1.0):
160 self.info_msg(f
'{node_service} service not available, waiting...')
161 req = GetState.Request()
163 while state !=
'active':
164 self.info_msg(f
'Getting {node_name} state...')
165 future = state_client.call_async(req)
166 rclpy.spin_until_future_complete(self, future)
167 if future.result()
is not None:
168 state = future.result().current_state.label
169 self.info_msg(f
'Result of get_state: {state}')
172 f
'Exception while calling service: {future.exception()!r}'
176 def shutdown(self) -> None:
177 self.info_msg(
'Shutting down')
178 self.action_client.destroy()
180 transition_service =
'lifecycle_manager_navigation/manage_nodes'
181 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
182 self.create_client(ManageLifecycleNodes, transition_service)
183 while not mgr_client.wait_for_service(timeout_sec=1.0):
184 self.info_msg(f
'{transition_service} service not available, waiting...')
186 req = ManageLifecycleNodes.Request()
187 req.command = ManageLifecycleNodes.Request().SHUTDOWN
188 future = mgr_client.call_async(req)
190 self.info_msg(
'Shutting down navigation lifecycle manager...')
191 rclpy.spin_until_future_complete(self, future)
193 self.info_msg(
'Shutting down navigation lifecycle manager complete.')
194 except Exception
as e:
195 self.error_msg(f
'Service call failed {e!r}')
196 transition_service =
'lifecycle_manager_localization/manage_nodes'
197 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
198 while not mgr_client.wait_for_service(timeout_sec=1.0):
199 self.info_msg(f
'{transition_service} service not available, waiting...')
201 req = ManageLifecycleNodes.Request()
202 req.command = ManageLifecycleNodes.Request().SHUTDOWN
203 future = mgr_client.call_async(req)
205 self.info_msg(
'Shutting down localization lifecycle manager...')
206 rclpy.spin_until_future_complete(self, future)
208 self.info_msg(
'Shutting down localization lifecycle manager complete')
209 except Exception
as e:
210 self.error_msg(f
'Service call failed {e!r}')
212 def wait_for_initial_pose(self) -> None:
213 self.initial_pose_received =
False
214 while not self.initial_pose_received:
215 self.info_msg(
'Setting initial pose')
216 self.setInitialPose()
217 self.info_msg(
'Waiting for amcl_pose to be received')
218 rclpy.spin_once(self, timeout_sec=1)
221 def run_all_tests(robot_tester: NavTester) -> bool:
225 robot_tester.wait_for_node_active(
'amcl')
226 robot_tester.wait_for_initial_pose()
227 robot_tester.wait_for_node_active(
'bt_navigator')
228 result = robot_tester.runNavigateAction()
233 robot_tester.info_msg(
'Test PASSED')
235 robot_tester.error_msg(
'Test FAILED')
240 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
241 initial_pose = Pose()
242 initial_pose.position.x = x
243 initial_pose.position.y = y
244 initial_pose.position.z = z
245 initial_pose.orientation.x = 0.0
246 initial_pose.orientation.y = 0.0
247 initial_pose.orientation.z = 0.0
248 initial_pose.orientation.w = 1.0
252 def get_testers(args: argparse.Namespace) -> list[NavTester]:
257 init_x, init_y, final_x, final_y = args.robot[0]
259 initial_pose=fwd_pose(float(init_x), float(init_y)),
260 goal_pose=fwd_pose(float(final_x), float(final_y)),
263 'Starting tester, robot going from '
273 testers.append(tester)
279 def main(argv: list[str] = sys.argv[1:]):
281 parser = argparse.ArgumentParser(description=
'System-level navigation tester node')
282 group = parser.add_mutually_exclusive_group(required=
True)
288 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
289 help=
'The robot starting and final positions.',
296 metavar=(
'name',
'init_x',
'init_y',
'final_x',
'final_y'),
297 help=
"The robot's namespace and starting and final positions. "
298 +
'Repeating the argument for multiple robots is supported.',
301 args, unknown = parser.parse_known_args()
306 testers = get_testers(args)
311 for tester
in testers:
312 passed = run_all_tests(tester)
316 for tester
in testers:
320 testers[0].info_msg(
'Done Shutting Down.')
323 testers[0].info_msg(
'Exiting failed')
326 testers[0].info_msg(
'Exiting passed')
330 if __name__ ==
'__main__':