24 from typing
import Any, Optional
26 from action_msgs.msg
import GoalStatus
28 from lifecycle_msgs.srv
import GetState
29 from nav2_msgs.action
import NavigateToPose
30 from nav2_msgs.srv
import ManageLifecycleNodes
32 from rclpy.action
import ActionClient
33 from rclpy.node
import Node
34 from rclpy.qos
import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
40 def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str =
''):
41 super().__init__(node_name=
'nav2_tester', namespace=namespace)
43 PoseWithCovarianceStamped,
'initialpose', 10
45 self.
goal_pubgoal_pub = self.create_publisher(PoseStamped,
'goal_pose', 10)
47 pose_qos = QoSProfile(
48 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
49 reliability=QoSReliabilityPolicy.RELIABLE,
50 history=QoSHistoryPolicy.KEEP_LAST,
55 PoseWithCovarianceStamped,
'amcl_pose', self.
poseCallbackposeCallback, pose_qos
61 self.
action_clientaction_client = ActionClient(self, NavigateToPose,
'navigate_to_pose')
63 def info_msg(self, msg: str) ->
None:
64 self.get_logger().info(
'\033[1;37;44m' + msg +
'\033[0m')
66 def warn_msg(self, msg: str) ->
None:
67 self.get_logger().warn(
'\033[1;37;43m' + msg +
'\033[0m')
69 def error_msg(self, msg: str) ->
None:
70 self.get_logger().error(
'\033[1;37;41m' + msg +
'\033[0m')
72 def setInitialPose(self) -> None:
73 msg = PoseWithCovarianceStamped()
75 msg.header.frame_id =
'map'
76 self.
info_msginfo_msg(
'Publishing Initial Pose')
80 def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
82 msg.header.frame_id =
'map'
86 def publishGoalPose(self, goal_pose: Optional[Pose] =
None) ->
None:
87 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
90 def runNavigateAction(self, goal_pose: Optional[Pose] =
None) -> bool:
92 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action server")
93 while not self.
action_clientaction_client.wait_for_server(timeout_sec=1.0):
94 self.
info_msginfo_msg(
"'NavigateToPose' action server not available, waiting...")
96 if os.getenv(
'GROOT_MONITORING') ==
'True':
98 self.
error_msgerror_msg(
'Behavior Tree must not be running already!')
99 self.
error_msgerror_msg(
'Are you running multiple goals/bts..?')
102 self.
goal_posegoal_pose = goal_pose
if goal_pose
is not None else self.
goal_posegoal_pose
103 goal_msg = NavigateToPose.Goal()
106 self.
info_msginfo_msg(
'Sending goal request...')
107 send_goal_future = self.
action_clientaction_client.send_goal_async(goal_msg)
109 rclpy.spin_until_future_complete(self, send_goal_future)
110 goal_handle = send_goal_future.result()
112 if not goal_handle
or not goal_handle.accepted:
116 self.
info_msginfo_msg(
'Goal accepted')
117 get_result_future = goal_handle.get_result_async()
120 if os.getenv(
'GROOT_MONITORING') ==
'True':
123 self.
error_msgerror_msg(
'Failed GROOT_BT - Reload Tree from ZMQ Server')
124 future_return =
False
126 self.
error_msgerror_msg(
'Failed GROOT_BT - Set Breakpoint from ZMQ Publisher')
127 future_return =
False
128 except Exception
as e:
129 self.
error_msgerror_msg(f
'Failed GROOT_BT - ZMQ Tests: {e}')
130 future_return =
False
132 self.
info_msginfo_msg(
"Waiting for 'NavigateToPose' action to complete")
133 rclpy.spin_until_future_complete(self, get_result_future)
134 status = get_result_future.result().status
135 if status != GoalStatus.STATUS_SUCCEEDED:
136 result = get_result_future.result().result
137 self.
info_msginfo_msg(f
'Goal failed with status code: {status}'
138 f
' error code:{result.error_code}'
139 f
' error msg:{result.error_msg}')
142 if not future_return:
145 self.
info_msginfo_msg(
'Goal succeeded!')
148 def grootMonitoringReloadTree(self) -> bool:
150 context = zmq.Context()
152 sock = context.socket(zmq.REQ)
155 sock.setsockopt(zmq.RCVTIMEO, 1000)
158 sock.connect(f
'tcp://localhost:{port}')
159 self.
info_msginfo_msg(f
'ZMQ Server Port:{port}')
164 self.
error_msgerror_msg(
'ZMQ Reload Tree Test 1/3 - This should have failed!')
168 except zmq.error.ZMQError:
169 self.
info_msginfo_msg(
'ZMQ Reload Tree Test 1/3: Check')
172 request_header = struct.pack(
'!BBI', 2, ord(
'T'), 12345)
173 sock.send(request_header)
175 sock.recv_multipart()
176 self.
info_msginfo_msg(
'ZMQ Reload Tree Test 2/3: Check')
177 except zmq.error.Again:
178 self.
info_msginfo_msg(
'ZMQ Reload Tree Test 2/3 - Failed to load tree')
185 self.
error_msgerror_msg(
'ZMQ Reload Tree Test 3/3 - This should have failed!')
189 except zmq.error.ZMQError:
190 self.
info_msginfo_msg(
'ZMQ Reload Tree Test 3/3: Check')
194 def grootMonitoringSetBreakpoint(self) -> bool:
196 context = zmq.Context()
198 sock = context.socket(zmq.REQ)
200 sock.setsockopt(zmq.RCVTIMEO, 2000)
204 sock.connect(f
'tcp://127.0.0.1:{port}')
205 self.
info_msginfo_msg(f
'ZMQ Publisher Port:{port}')
208 request_header = struct.pack(
'!BBI', 2, ord(
'I'), 12345)
215 'desired_status':
'SUCCESS',
218 hook_json = json.dumps(hook_data)
222 sock.send_multipart([request_header, hook_json.encode(
'utf-8')])
223 reply = sock.recv_multipart()
224 if len(reply[0]) < 2:
225 self.
error_msgerror_msg(
'ZMQ - Incomplete reply received')
228 except Exception
as e:
229 self.
error_msgerror_msg(f
'ZMQ - Error during request: {e}')
232 self.
info_msginfo_msg(
'ZMQ - HOOK_INSERT request sent')
235 def grootMonitoringGetStatus(self) -> bool:
237 context = zmq.Context()
239 sock = context.socket(zmq.REQ)
242 sock.setsockopt(zmq.RCVTIMEO, 1000)
245 sock.connect(f
'tcp://localhost:{port}')
246 self.
info_msginfo_msg(f
'ZMQ Server Port:{port}')
248 for request
in range(3):
251 request_header = struct.pack(
'!BBI', 2, ord(
'S'), 12345)
252 sock.send(request_header)
254 reply = sock.recv_multipart()
255 if len(reply[0]) < 6:
256 self.
error_msgerror_msg(
'ZMQ - Incomplete reply received')
263 while offset < len(payload):
264 node_uid, node_status = struct.unpack_from(
'!HB', payload, offset)
266 node_states.append((node_uid, node_status))
268 node_uid, node_status = node_states[0]
270 self.
error_msgerror_msg(
'ZMQ - BT Not running')
272 except zmq.error.Again:
273 self.
error_msgerror_msg(
'ZMQ - Did not receive any status')
276 self.
info_msginfo_msg(
'ZMQ - Did receive status')
279 def poseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
280 self.
info_msginfo_msg(
'Received amcl_pose')
284 def reachesGoal(self, timeout: float, distance: float) -> bool:
286 start_time = time.time()
288 while not goalReached:
289 rclpy.spin_once(self, timeout_sec=1)
292 self.
info_msginfo_msg(
'*** GOAL REACHED ***')
294 elif timeout
is not None:
295 if (time.time() - start_time) > timeout:
296 self.
error_msgerror_msg(
'Robot timed out reaching its goal!')
299 self.
info_msginfo_msg(
'Robot reached its goal!')
302 def distanceFromGoal(self) -> float:
305 distance = math.sqrt(d_x * d_x + d_y * d_y)
306 self.
info_msginfo_msg(f
'Distance from goal is: {distance}')
309 def wait_for_node_active(self, node_name: str) ->
None:
311 self.
info_msginfo_msg(f
'Waiting for {node_name} to become active')
312 node_service = f
'{node_name}/get_state'
313 state_client = self.create_client(GetState, node_service)
314 while not state_client.wait_for_service(timeout_sec=1.0):
315 self.
info_msginfo_msg(f
'{node_service} service not available, waiting...')
316 req = GetState.Request()
318 while state !=
'active':
319 self.
info_msginfo_msg(f
'Getting {node_name} state...')
320 future = state_client.call_async(req)
321 rclpy.spin_until_future_complete(self, future)
322 if future.result()
is not None:
323 state = future.result().current_state.label
324 self.
info_msginfo_msg(f
'Result of get_state: {state}')
327 f
'Exception while calling service: {future.exception()!r}'
331 def shutdown(self) -> None:
332 self.
info_msginfo_msg(
'Shutting down')
335 transition_service =
'lifecycle_manager_navigation/manage_nodes'
336 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
337 while not mgr_client.wait_for_service(timeout_sec=1.0):
338 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
340 req = ManageLifecycleNodes.Request()
341 req.command = ManageLifecycleNodes.Request().SHUTDOWN
342 future = mgr_client.call_async(req)
344 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager...')
345 rclpy.spin_until_future_complete(self, future)
347 self.
info_msginfo_msg(
'Shutting down navigation lifecycle manager complete.')
348 except Exception
as e:
349 self.
error_msgerror_msg(f
'Service call failed {e!r}')
350 transition_service =
'lifecycle_manager_localization/manage_nodes'
351 mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
352 while not mgr_client.wait_for_service(timeout_sec=1.0):
353 self.
info_msginfo_msg(f
'{transition_service} service not available, waiting...')
355 req = ManageLifecycleNodes.Request()
356 req.command = ManageLifecycleNodes.Request().SHUTDOWN
357 future = mgr_client.call_async(req)
359 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager...')
360 rclpy.spin_until_future_complete(self, future)
362 self.
info_msginfo_msg(
'Shutting down localization lifecycle manager complete')
363 except Exception
as e:
364 self.
error_msgerror_msg(f
'Service call failed {e!r}')
366 def wait_for_initial_pose(self) -> bool:
371 start_time = time.time()
374 self.
info_msginfo_msg(
'Setting initial pose')
376 self.
info_msginfo_msg(
'Waiting for amcl_pose to be received')
377 duration = time.time() - start_time
379 self.
error_msgerror_msg(
'Timeout waiting for initial pose to be set')
381 rclpy.spin_once(self, timeout_sec=1)
385 def test_RobotMovesToGoal(robot_tester: NavTester) -> bool:
386 robot_tester.info_msg(
'Setting goal pose')
387 robot_tester.publishGoalPose()
388 robot_tester.info_msg(
'Waiting 60 seconds for robot to reach goal')
389 return robot_tester.reachesGoal(timeout=60, distance=0.5)
392 def run_all_tests(robot_tester: NavTester) -> bool:
396 robot_tester.wait_for_node_active(
'amcl')
397 result = robot_tester.wait_for_initial_pose()
399 robot_tester.wait_for_node_active(
'bt_navigator')
400 result = robot_tester.runNavigateAction()
403 result = test_RobotMovesToGoal(robot_tester)
408 robot_tester.info_msg(
'Test PASSED')
410 robot_tester.error_msg(
'Test FAILED')
415 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
416 initial_pose = Pose()
417 initial_pose.position.x = x
418 initial_pose.position.y = y
419 initial_pose.position.z = z
420 initial_pose.orientation.x = 0.0
421 initial_pose.orientation.y = 0.0
422 initial_pose.orientation.z = 0.0
423 initial_pose.orientation.w = 1.0
427 def get_testers(args: argparse.Namespace) -> list[NavTester]:
432 init_x, init_y, final_x, final_y = args.robot[0]
434 initial_pose=fwd_pose(float(init_x), float(init_y)),
435 goal_pose=fwd_pose(float(final_x), float(final_y)),
438 'Starting tester, robot going from '
448 testers.append(tester)
452 for robot
in args.robots:
453 namespace, init_x, init_y, final_x, final_y = robot
456 initial_pose=fwd_pose(float(init_x), float(init_y)),
457 goal_pose=fwd_pose(float(final_x), float(final_y)),
460 'Starting tester for '
471 testers.append(tester)
475 def check_args(expect_failure: str) -> Any:
477 if expect_failure !=
'True' and expect_failure !=
'False':
479 '\033[1;37;41m' +
' -e flag must be set to True or False only. ' +
'\033[0m'
483 return eval(expect_failure)
486 def main(argv: list[str] = sys.argv[1:]):
488 parser = argparse.ArgumentParser(description=
'System-level navigation tester node')
489 parser.add_argument(
'-e',
'--expect_failure')
490 group = parser.add_mutually_exclusive_group(required=
True)
496 metavar=(
'init_x',
'init_y',
'final_x',
'final_y'),
497 help=
'The robot starting and final positions.',
504 metavar=(
'name',
'init_x',
'init_y',
'final_x',
'final_y'),
505 help=
"The robot's namespace and starting and final positions."
506 +
'Repeating the argument for multiple robots is supported.',
509 args, unknown = parser.parse_known_args()
511 expect_failure = check_args(args.expect_failure)
516 testers = get_testers(args)
521 for tester
in testers:
522 passed = run_all_tests(tester)
523 if passed != expect_failure:
526 for tester
in testers:
530 testers[0].info_msg(
'Done Shutting Down.')
532 if passed != expect_failure:
533 testers[0].info_msg(
'Exiting failed')
536 testers[0].info_msg(
'Exiting passed')
540 if __name__ ==
'__main__':
None setInitialPose(self)
bool grootMonitoringSetBreakpoint(self)
float distanceFromGoal(self)
None info_msg(self, str msg)
bool grootMonitoringReloadTree(self)
PoseStamped getStampedPoseMsg(self, Pose pose)
None poseCallback(self, PoseWithCovarianceStamped msg)
None error_msg(self, str msg)
bool grootMonitoringGetStatus(self)