24 from typing
import Any, Optional
26 from action_msgs.msg
import GoalStatus
28 from lifecycle_msgs.srv
import GetState
29 from nav2_msgs.action
import NavigateToPose
30 from nav2_msgs.srv
import ManageLifecycleNodes
32 from rclpy.action
import ActionClient
33 from rclpy.client
import Client
34 from rclpy.node
import Node
35 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
41 def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str =
''):
42 super().__init__(node_name=
'nav2_tester', namespace=namespace)
44 PoseWithCovarianceStamped,
'initialpose', 10
46 self.
goal_pubgoal_pub = self.create_publisher(PoseStamped,
'goal_pose', 10)
48 pose_qos = QoSProfile(
49 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
50 reliability=QoSReliabilityPolicy.RELIABLE,
51 history=QoSHistoryPolicy.KEEP_LAST,
56 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
62 self.action_client: ActionClient[
64 NavigateToPose.Result,
65 NavigateToPose.Feedback
66 ] = ActionClient(self, NavigateToPose,
'navigate_to_pose')
68 def info_msg(self, msg: str) ->
None:
69 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
71 def warn_msg(self, msg: str) ->
None:
72 self.get_logger().warning(
'\033[1;37;43m' + msg +
'\033[0m')
74 def error_msg(self, msg: str) ->
None:
75 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
77 def setInitialPose(self) -> None:
78 msg = PoseWithCovarianceStamped()
80 msg.header.frame_id =
'map'
81 self.
info_msginfo_msg(
'Publishing Initial Pose')
85 def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
87 msg.header.frame_id =
'map'
91 def publishGoalPose(self, goal_pose: Optional[Pose] =
None) ->
None:
92 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
95 def runNavigateAction(self, goal_pose: Optional[Pose] =
None) -> bool:
97 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action server")
98 while not self.action_client.wait_for_server(timeout_sec=1.0):
99 self.
info_msginfo_msg(
"'NavigateToPose' action server not available, waiting...")
101 if os.getenv(
'GROOT_MONITORING') ==
'True':
103 self.
error_msgerror_msg(
'Behavior Tree must not be running already!')
104 self.
error_msgerror_msg(
'Are you running multiple goals/bts..?')
107 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
108 goal_msg = NavigateToPose.Goal()
111 self.
info_msginfo_msg(
'Sending goal request...')
112 send_goal_future = self.action_client.send_goal_async(goal_msg)
114 rclpy.spin_until_future_complete(self, send_goal_future)
115 goal_handle = send_goal_future.result()
117 if not goal_handle
or not goal_handle.accepted:
121 self.
info_msginfo_msg(
'Goal accepted')
122 get_result_future = goal_handle.get_result_async()
125 if os.getenv(
'GROOT_MONITORING') ==
'True':
128 self.
error_msgerror_msg(
'Failed GROOT_BT - Reload Tree from ZMQ Server')
129 future_return =
False
131 self.
error_msgerror_msg(
'Failed GROOT_BT - Set Breakpoint from ZMQ Publisher')
132 future_return =
False
133 except Exception
as e:
134 self.
error_msgerror_msg(f
'Failed GROOT_BT - ZMQ Tests: {e}')
135 future_return =
False
137 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
138 rclpy.spin_until_future_complete(self, get_result_future)
139 status = get_result_future.result().status
140 if status != GoalStatus.STATUS_SUCCEEDED:
141 result = get_result_future.result().result
142 self.
info_msginfo_msg(f
'Goal failed with status code: {status}'
143 f
' error code:{result.error_code}'
144 f
' error msg:{result.error_msg}')
147 if not future_return:
150 self.
info_msginfo_msg(
'Goal succeeded!')
153 def grootMonitoringReloadTree(self) -> bool:
155 context = zmq.Context()
157 sock = context.socket(zmq.REQ)
160 sock.setsockopt(zmq.RCVTIMEO, 1000)
163 sock.connect(f
'tcp://localhost:{port}')
164 self.
info_msginfo_msg(f
'ZMQ Server Port:{port}')
169 self.
error_msgerror_msg(
'ZMQ Reload Tree Test 1/3 - This should have failed!')
173 except zmq.error.ZMQError:
174 self.
info_msginfo_msg(
'ZMQ Reload Tree Test 1/3: Check')
177 request_header = struct.pack(
'!BBI', 2, ord(
'T'), 12345)
178 sock.send(request_header)
180 sock.recv_multipart()
181 self.
info_msginfo_msg(
'ZMQ Reload Tree Test 2/3: Check')
182 except zmq.error.Again:
183 self.
info_msginfo_msg(
'ZMQ Reload Tree Test 2/3 - Failed to load tree')
190 self.
error_msgerror_msg(
'ZMQ Reload Tree Test 3/3 - This should have failed!')
194 except zmq.error.ZMQError:
195 self.
info_msginfo_msg(
'ZMQ Reload Tree Test 3/3: Check')
199 def grootMonitoringSetBreakpoint(self) -> bool:
201 context = zmq.Context()
203 sock = context.socket(zmq.REQ)
205 sock.setsockopt(zmq.RCVTIMEO, 2000)
209 sock.connect(f
'tcp://127.0.0.1:{port}')
210 self.
info_msginfo_msg(f
'ZMQ Publisher Port:{port}')
213 request_header = struct.pack(
'!BBI', 2, ord(
'I'), 12345)
220 'desired_status':
'SUCCESS',
223 hook_json = json.dumps(hook_data)
227 sock.send_multipart([request_header, hook_json.encode(
'utf-8')])
228 reply = sock.recv_multipart()
229 if len(reply[0]) < 2:
230 self.
error_msgerror_msg(
'ZMQ - Incomplete reply received')
233 except Exception
as e:
234 self.
error_msgerror_msg(f
'ZMQ - Error during request: {e}')
237 self.
info_msginfo_msg(
'ZMQ - HOOK_INSERT request sent')
240 def grootMonitoringGetStatus(self) -> bool:
242 context = zmq.Context()
244 sock = context.socket(zmq.REQ)
247 sock.setsockopt(zmq.RCVTIMEO, 1000)
250 sock.connect(f
'tcp://localhost:{port}')
251 self.
info_msginfo_msg(f
'ZMQ Server Port:{port}')
253 for request
in range(3):
256 request_header = struct.pack(
'!BBI', 2, ord(
'S'), 12345)
257 sock.send(request_header)
259 reply = sock.recv_multipart()
260 if len(reply[0]) < 6:
261 self.
error_msgerror_msg(
'ZMQ - Incomplete reply received')
268 while offset < len(payload):
269 node_uid, node_status = struct.unpack_from(
'!HB', payload, offset)
271 node_states.append((node_uid, node_status))
273 node_uid, node_status = node_states[0]
275 self.
error_msgerror_msg(
'ZMQ - BT Not running')
277 except zmq.error.Again:
278 self.
error_msgerror_msg(
'ZMQ - Did not receive any status')
281 self.
info_msginfo_msg(
'ZMQ - Did receive status')
284 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
285 self.
info_msginfo_msg(
'Received amcl_pose')
289 def reachesGoal(self, timeout: float, distance: float) -> bool:
291 start_time = time.time()
293 while not goalReached:
294 rclpy.spin_once(self, timeout_sec=1)
297 self.
info_msginfo_msg(
'*** GOAL REACHED ***')
299 elif timeout
is not None:
300 if (time.time() - start_time) > timeout:
301 self.
error_msgerror_msg(
'Robot timed out reaching its goal!')
304 self.
info_msginfo_msg(
'Robot reached its goal!')
307 def distanceFromGoal(self) -> float:
310 distance = math.sqrt(d_x * d_x + d_y * d_y)
311 self.
info_msginfo_msg(f
'Distance from goal is: {distance}')
314 def wait_for_node_active(self, node_name: str) ->
None:
316 self.
info_msginfo_msg(f
'Waiting for {node_name} to become active')
317 node_service = f
'{node_name}/get_state'
318 state_client: Client[GetState.Request, GetState.Response] = \
319 self.create_client(GetState, node_service)
320 while not state_client.wait_for_service(timeout_sec=1.0):
321 self.
info_msginfo_msg(f
'{node_service} service not available, waiting...')
322 req = GetState.Request()
324 while state !=
'active':
325 self.
info_msginfo_msg(f
'Getting {node_name} state...')
326 future = state_client.call_async(req)
327 rclpy.spin_until_future_complete(self, future)
328 if future.result()
is not None:
329 state = future.result().current_state.label
330 self.
info_msginfo_msg(f
'Result of get_state: {state}')
333 f
'Exception while calling service: {future.exception()!r}'
337 def shutdown(self) -> None:
338 self.
info_msginfo_msg(
'Shutting down')
339 self.action_client.destroy()
341 transition_service =
'lifecycle_manager_navigation/manage_nodes'
342 mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
343 self.create_client(ManageLifecycleNodes, transition_service)
344 while not mgr_client.wait_for_service(timeout_sec=1.0):
345 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
347 req = ManageLifecycleNodes.Request()
348 req.command = ManageLifecycleNodes.Request().SHUTDOWN
349 future = mgr_client.call_async(req)
351 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager...')
352 rclpy.spin_until_future_complete(self, future)
354 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
355 except Exception
as e:
356 self.
error_msgerror_msg(f
'Service call failed {e!r}')
357 transition_service =
'lifecycle_manager_localization/manage_nodes'
358 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
359 while not mgr_client.wait_for_service(timeout_sec=1.0):
360 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
362 req = ManageLifecycleNodes.Request()
363 req.command = ManageLifecycleNodes.Request().SHUTDOWN
364 future = mgr_client.call_async(req)
366 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager...')
367 rclpy.spin_until_future_complete(self, future)
369 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager complete')
370 except Exception
as e:
371 self.
error_msgerror_msg(f
'Service call failed {e!r}')
373 def wait_for_initial_pose(self) -> bool:
378 start_time = time.time()
381 self.
info_msginfo_msg(
'Setting initial pose')
383 self.
info_msginfo_msg(
'Waiting for amcl_pose to be received')
384 duration = time.time() - start_time
386 self.
error_msgerror_msg(
'Timeout waiting for initial pose to be set')
388 rclpy.spin_once(self, timeout_sec=1)
392 def test_RobotMovesToGoal(robot_tester: NavTester) -> bool:
393 robot_tester.info_msg(
'Setting goal pose')
394 robot_tester.publishGoalPose()
395 robot_tester.info_msg(
'Waiting 60 seconds for robot to reach goal')
396 return robot_tester.reachesGoal(timeout=60, distance=0.5)
399 def run_all_tests(robot_tester: NavTester) -> bool:
403 robot_tester.wait_for_node_active(
'amcl')
404 result = robot_tester.wait_for_initial_pose()
406 robot_tester.wait_for_node_active(
'bt_navigator')
407 result = robot_tester.runNavigateAction()
410 result = test_RobotMovesToGoal(robot_tester)
415 robot_tester.info_msg(
'Test PASSED')
417 robot_tester.error_msg(
'Test FAILED')
422 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
423 initial_pose = Pose()
424 initial_pose.position.x = x
425 initial_pose.position.y = y
426 initial_pose.position.z = z
427 initial_pose.orientation.x = 0.0
428 initial_pose.orientation.y = 0.0
429 initial_pose.orientation.z = 0.0
430 initial_pose.orientation.w = 1.0
434 def get_testers(args: argparse.Namespace) -> list[NavTester]:
439 init_x, init_y, final_x, final_y = args.robot[0]
441 initial_pose=fwd_pose(float(init_x), float(init_y)),
442 goal_pose=fwd_pose(float(final_x), float(final_y)),
445 'Starting tester, robot going from '
455 testers.append(tester)
459 for robot
in args.robots:
460 namespace, init_x, init_y, final_x, final_y = robot
463 initial_pose=fwd_pose(float(init_x), float(init_y)),
464 goal_pose=fwd_pose(float(final_x), float(final_y)),
467 'Starting tester for '
478 testers.append(tester)
482 def check_args(expect_failure: str) -> Any:
484 if expect_failure !=
'True' and expect_failure !=
'False':
486 '\033[1;37;41m' +
' -e flag must be set to True or False only. ' +
'\033[0m'
490 return eval(expect_failure)
493 def main(argv: list[str] = sys.argv[1:]):
495 parser = argparse.ArgumentParser(description=
'System-level navigation tester node')
496 parser.add_argument(
'-e',
'--expect_failure')
497 group = parser.add_mutually_exclusive_group(required=
True)
503 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
504 help=
'The robot starting and final positions.',
511 metavar=(
'name',
'init_x',
'init_y',
'final_x',
'final_y'),
512 help=
"The robot's namespace and starting and final positions."
513 +
'Repeating the argument for multiple robots is supported.',
516 args, unknown = parser.parse_known_args()
518 expect_failure = check_args(args.expect_failure)
523 testers = get_testers(args)
528 for tester
in testers:
529 passed = run_all_tests(tester)
530 if passed != expect_failure:
533 for tester
in testers:
537 testers[0].info_msg(
'Done Shutting Down.')
539 if passed != expect_failure:
540 testers[0].info_msg(
'Exiting failed')
543 testers[0].info_msg(
'Exiting passed')
547 if __name__ ==
'__main__':
None setInitialPose(self)
bool grootMonitoringSetBreakpoint(self)
float distanceFromGoal(self)
None info_msg(self, str msg)
bool grootMonitoringReloadTree(self)
PoseStamped getStampedPoseMsg(self, Pose pose)
None poseCallback(self, PoseWithCovarianceStamped msg)
None error_msg(self, str msg)
bool grootMonitoringGetStatus(self)