22 from typing
import Optional
24 from action_msgs.msg
import GoalStatus
25 from geometry_msgs.msg
import Pose
26 from geometry_msgs.msg
import PoseStamped
27 from geometry_msgs.msg
import PoseWithCovarianceStamped
28 from lifecycle_msgs.srv
import GetState
29 from nav2_msgs.action
import NavigateToPose
30 from nav2_msgs.srv
import ManageLifecycleNodes
34 from rclpy.action
import ActionClient
35 from rclpy.node
import Node
36 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSReliabilityPolicy
37 from rclpy.qos
import QoSProfile
42 def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str =
''):
43 super().__init__(node_name=
'nav2_tester', namespace=namespace)
45 PoseWithCovarianceStamped,
'initialpose', 10
47 self.
goal_pubgoal_pub = self.create_publisher(PoseStamped,
'goal_pose', 10)
49 pose_qos = QoSProfile(
50 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
51 reliability=QoSReliabilityPolicy.RELIABLE,
52 history=QoSHistoryPolicy.KEEP_LAST,
57 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
63 self.
action_clientaction_client = ActionClient(self, NavigateToPose,
'navigate_to_pose')
65 def info_msg(self, msg: str):
66 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
68 def warn_msg(self, msg: str):
69 self.get_logger().warn(
'\033[1;37;43m' + msg +
'\033[0m')
71 def error_msg(self, msg: str):
72 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
74 def setInitialPose(self):
75 msg = PoseWithCovarianceStamped()
77 msg.header.frame_id =
'map'
78 self.
info_msginfo_msg(
'Publishing Initial Pose')
82 def getStampedPoseMsg(self, pose: Pose):
84 msg.header.frame_id =
'map'
88 def publishGoalPose(self, goal_pose: Optional[Pose] =
None):
89 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
92 def runNavigateAction(self, goal_pose: Optional[Pose] =
None):
94 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action server")
95 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
96 self.
info_msginfo_msg(
"'NavigateToPose' action server not available, waiting...")
98 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
99 goal_msg = NavigateToPose.Goal()
102 self.
info_msginfo_msg(
'Sending goal request...')
103 send_goal_future = self.
action_clientaction_client.send_goal_async(goal_msg)
105 rclpy.spin_until_future_complete(self, send_goal_future)
106 goal_handle = send_goal_future.result()
108 if not goal_handle.accepted:
112 self.
info_msginfo_msg(
'Goal accepted')
113 get_result_future = goal_handle.get_result_async()
117 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
118 rclpy.spin_until_future_complete(self, get_result_future)
119 status = get_result_future.result().status
120 if status != GoalStatus.STATUS_SUCCEEDED:
121 self.
info_msginfo_msg(f
'Goal failed with status code: {status}')
124 if not future_return:
127 self.
info_msginfo_msg(
'Goal succeeded!')
130 def poseCallback(self, msg):
131 self.
info_msginfo_msg(
'Received amcl_pose')
135 def reachesGoal(self, timeout, distance):
137 start_time = time.time()
139 while not goalReached:
140 rclpy.spin_once(self, timeout_sec=1)
143 self.
info_msginfo_msg(
'*** GOAL REACHED ***')
145 elif timeout
is not None:
146 if (time.time() - start_time) > timeout:
147 self.
error_msgerror_msg(
'Robot timed out reaching its goal!')
150 def distanceFromGoal(self):
153 distance = math.sqrt(d_x * d_x + d_y * d_y)
154 self.
info_msginfo_msg(f
'Distance from goal is: {distance}')
157 def wait_for_node_active(self, node_name: str):
159 self.
info_msginfo_msg(f
'Waiting for {node_name} to become active')
160 node_service = f
'{node_name}/get_state'
161 state_client = self.create_client(GetState, node_service)
162 while not state_client.wait_for_service(timeout_sec=1.0):
163 self.
info_msginfo_msg(f
'{node_service} service not available, waiting...')
164 req = GetState.Request()
166 while state !=
'active':
167 self.
info_msginfo_msg(f
'Getting {node_name} state...')
168 future = state_client.call_async(req)
169 rclpy.spin_until_future_complete(self, future)
170 if future.result()
is not None:
171 state = future.result().current_state.label
172 self.
info_msginfo_msg(f
'Result of get_state: {state}')
175 f
'Exception while calling service: {future.exception()!r}'
180 self.
info_msginfo_msg(
'Shutting down')
183 transition_service =
'lifecycle_manager_navigation/manage_nodes'
184 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
185 while not mgr_client.wait_for_service(timeout_sec=1.0):
186 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
188 req = ManageLifecycleNodes.Request()
189 req.command = ManageLifecycleNodes.Request().SHUTDOWN
190 future = mgr_client.call_async(req)
192 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager...')
193 rclpy.spin_until_future_complete(self, future)
195 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
196 except Exception
as e:
197 self.
error_msgerror_msg(f
'Service call failed {e!r}')
198 transition_service =
'lifecycle_manager_localization/manage_nodes'
199 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
200 while not mgr_client.wait_for_service(timeout_sec=1.0):
201 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
203 req = ManageLifecycleNodes.Request()
204 req.command = ManageLifecycleNodes.Request().SHUTDOWN
205 future = mgr_client.call_async(req)
207 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager...')
208 rclpy.spin_until_future_complete(self, future)
210 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager complete')
211 except Exception
as e:
212 self.
error_msgerror_msg(f
'Service call failed {e!r}')
214 def wait_for_initial_pose(self):
219 start_time = time.time()
222 self.
info_msginfo_msg(
'Setting initial pose')
224 self.
info_msginfo_msg(
'Waiting for amcl_pose to be received')
225 duration = time.time() - start_time
227 self.
error_msgerror_msg(
'Timeout waiting for initial pose to be set')
229 rclpy.spin_once(self, timeout_sec=1)
233 def test_RobotMovesToGoal(robot_tester):
234 robot_tester.info_msg(
'Setting goal pose')
235 robot_tester.publishGoalPose()
236 robot_tester.info_msg(
'Waiting 60 seconds for robot to reach goal')
237 return robot_tester.reachesGoal(timeout=60, distance=0.5)
240 def run_all_tests(robot_tester):
244 robot_tester.wait_for_node_active(
'amcl')
245 result = robot_tester.wait_for_initial_pose()
247 robot_tester.wait_for_node_active(
'bt_navigator')
248 result = robot_tester.runNavigateAction()
251 result = test_RobotMovesToGoal(robot_tester)
256 robot_tester.info_msg(
'Test PASSED')
258 robot_tester.error_msg(
'Test FAILED')
263 def fwd_pose(x=0.0, y=0.0, z=0.01):
264 initial_pose = Pose()
265 initial_pose.position.x = x
266 initial_pose.position.y = y
267 initial_pose.position.z = z
268 initial_pose.orientation.x = 0.0
269 initial_pose.orientation.y = 0.0
270 initial_pose.orientation.z = 0.0
271 initial_pose.orientation.w = 1.0
275 def get_testers(args):
280 init_x, init_y, final_x, final_y = args.robot[0]
282 initial_pose=fwd_pose(float(init_x), float(init_y)),
283 goal_pose=fwd_pose(float(final_x), float(final_y)),
286 'Starting tester, robot going from '
296 testers.append(tester)
300 for robot
in args.robots:
301 namespace, init_x, init_y, final_x, final_y = robot
304 initial_pose=fwd_pose(float(init_x), float(init_y)),
305 goal_pose=fwd_pose(float(final_x), float(final_y)),
308 'Starting tester for '
319 testers.append(tester)
323 def check_args(expect_failure: str):
325 if expect_failure !=
'True' and expect_failure !=
'False':
327 '\033[1;37;41m' +
' -e flag must be set to True or False only. ' +
'\033[0m'
331 return eval(expect_failure)
334 def main(argv=sys.argv[1:]):
336 parser = argparse.ArgumentParser(description=
'System-level navigation tester node')
337 parser.add_argument(
'-e',
'--expect_failure')
338 group = parser.add_mutually_exclusive_group(required=
True)
344 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
345 help=
'The robot starting and final positions.',
352 metavar=(
'name',
'init_x',
'init_y',
'final_x',
'final_y'),
353 help=
"The robot's namespace and starting and final positions. "
354 +
'Repeating the argument for multiple robots is supported.',
357 args, unknown = parser.parse_known_args()
359 expect_failure = check_args(args.expect_failure)
364 testers = get_testers(args)
369 for tester
in testers:
370 passed = run_all_tests(tester)
371 if passed != expect_failure:
374 for tester
in testers:
378 testers[0].info_msg(
'Done Shutting Down.')
380 if passed != expect_failure:
381 testers[0].info_msg(
'Exiting failed')
384 testers[0].info_msg(
'Exiting passed')
388 if __name__ ==
'__main__':
def info_msg(self, str msg)
def distanceFromGoal(self)
def error_msg(self, str msg)
def poseCallback(self, msg)
def getStampedPoseMsg(self, Pose pose)