Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
nav_to_pose_tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import json
19 import math
20 import os
21 import struct
22 import sys
23 import time
24 from typing import Any, Optional
25 
26 from action_msgs.msg import GoalStatus
27 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
28 from lifecycle_msgs.srv import GetState
29 from nav2_msgs.action import NavigateToPose
30 from nav2_msgs.srv import ManageLifecycleNodes
31 import rclpy
32 from rclpy.action import ActionClient # type: ignore[attr-defined]
33 from rclpy.node import Node
34 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
35 import zmq
36 
37 
38 class NavTester(Node):
39 
40  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
41  super().__init__(node_name='nav2_tester', namespace=namespace)
42  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
43  PoseWithCovarianceStamped, 'initialpose', 10
44  )
45  self.goal_pubgoal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
46 
47  pose_qos = QoSProfile(
48  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
49  reliability=QoSReliabilityPolicy.RELIABLE,
50  history=QoSHistoryPolicy.KEEP_LAST,
51  depth=1,
52  )
53 
54  self.model_pose_submodel_pose_sub = self.create_subscription(
55  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
56  )
57  self.initial_pose_receivedinitial_pose_received = False
58  self.initial_poseinitial_pose = initial_pose
59  self.goal_posegoal_pose = goal_pose
60  self.set_initial_pose_timeoutset_initial_pose_timeout = 15
61  self.action_clientaction_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
62 
63  def info_msg(self, msg: str) -> None:
64  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
65 
66  def warn_msg(self, msg: str) -> None:
67  self.get_logger().warn('\033[1;37;43m' + msg + '\033[0m')
68 
69  def error_msg(self, msg: str) -> None:
70  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
71 
72  def setInitialPose(self) -> None:
73  msg = PoseWithCovarianceStamped()
74  msg.pose.pose = self.initial_poseinitial_pose
75  msg.header.frame_id = 'map'
76  self.info_msginfo_msg('Publishing Initial Pose')
77  self.initial_pose_pubinitial_pose_pub.publish(msg)
78  self.currentPosecurrentPose = self.initial_poseinitial_pose
79 
80  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
81  msg = PoseStamped()
82  msg.header.frame_id = 'map'
83  msg.pose = pose
84  return msg
85 
86  def publishGoalPose(self, goal_pose: Optional[Pose] = None) -> None:
87  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
88  self.goal_pubgoal_pub.publish(self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose))
89 
90  def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool:
91  # Sends a `NavToPose` action request and waits for completion
92  self.info_msginfo_msg("Waiting for 'NavigateToPose' action server")
93  while not self.action_clientaction_client.wait_for_server(timeout_sec=1.0):
94  self.info_msginfo_msg("'NavigateToPose' action server not available, waiting...")
95 
96  if os.getenv('GROOT_MONITORING') == 'True':
97  if not self.grootMonitoringGetStatusgrootMonitoringGetStatus():
98  self.error_msgerror_msg('Behavior Tree must not be running already!')
99  self.error_msgerror_msg('Are you running multiple goals/bts..?')
100  return False
101 
102  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
103  goal_msg = NavigateToPose.Goal()
104  goal_msg.pose = self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
105 
106  self.info_msginfo_msg('Sending goal request...')
107  send_goal_future = self.action_clientaction_client.send_goal_async(goal_msg)
108 
109  rclpy.spin_until_future_complete(self, send_goal_future)
110  goal_handle = send_goal_future.result()
111 
112  if not goal_handle or not goal_handle.accepted:
113  self.error_msgerror_msg('Goal rejected')
114  return False
115 
116  self.info_msginfo_msg('Goal accepted')
117  get_result_future = goal_handle.get_result_async()
118 
119  future_return = True
120  if os.getenv('GROOT_MONITORING') == 'True':
121  try:
122  if not self.grootMonitoringReloadTreegrootMonitoringReloadTree():
123  self.error_msgerror_msg('Failed GROOT_BT - Reload Tree from ZMQ Server')
124  future_return = False
125  if not self.grootMonitoringSetBreakpointgrootMonitoringSetBreakpoint():
126  self.error_msgerror_msg('Failed GROOT_BT - Set Breakpoint from ZMQ Publisher')
127  future_return = False
128  except Exception as e: # noqa: B902
129  self.error_msgerror_msg(f'Failed GROOT_BT - ZMQ Tests: {e}')
130  future_return = False
131 
132  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
133  rclpy.spin_until_future_complete(self, get_result_future)
134  status = get_result_future.result().status # type: ignore[union-attr]
135  if status != GoalStatus.STATUS_SUCCEEDED:
136  result = get_result_future.result().result # type: ignore[union-attr]
137  self.info_msginfo_msg(f'Goal failed with status code: {status}'
138  f' error code:{result.error_code}'
139  f' error msg:{result.error_msg}')
140  return False
141 
142  if not future_return:
143  return False
144 
145  self.info_msginfo_msg('Goal succeeded!')
146  return True
147 
148  def grootMonitoringReloadTree(self) -> bool:
149  # ZeroMQ Context
150  context = zmq.Context()
151 
152  sock = context.socket(zmq.REQ)
153  port = 1667 # default server port for groot monitoring
154  # # Set a Timeout so we do not spin till infinity
155  sock.setsockopt(zmq.RCVTIMEO, 1000)
156  # sock.setsockopt(zmq.LINGER, 0)
157 
158  sock.connect(f'tcp://localhost:{port}')
159  self.info_msginfo_msg(f'ZMQ Server Port:{port}')
160 
161  # this should fail
162  try:
163  sock.recv()
164  self.error_msgerror_msg('ZMQ Reload Tree Test 1/3 - This should have failed!')
165  # Only works when ZMQ server receives a request first
166  sock.close()
167  return False
168  except zmq.error.ZMQError:
169  self.info_msginfo_msg('ZMQ Reload Tree Test 1/3: Check')
170  try:
171  # request tree from server
172  request_header = struct.pack('!BBI', 2, ord('T'), 12345)
173  sock.send(request_header)
174  # receive tree from server as flat_buffer
175  sock.recv_multipart()
176  self.info_msginfo_msg('ZMQ Reload Tree Test 2/3: Check')
177  except zmq.error.Again:
178  self.info_msginfo_msg('ZMQ Reload Tree Test 2/3 - Failed to load tree')
179  sock.close()
180  return False
181 
182  # this should fail
183  try:
184  sock.recv()
185  self.error_msgerror_msg('ZMQ Reload Tree Test 3/3 - This should have failed!')
186  # Tree should only be loadable ONCE after ZMQ server received a request
187  sock.close()
188  return False
189  except zmq.error.ZMQError:
190  self.info_msginfo_msg('ZMQ Reload Tree Test 3/3: Check')
191 
192  return True
193 
194  def grootMonitoringSetBreakpoint(self) -> bool:
195  # ZeroMQ Context
196  context = zmq.Context()
197  # Define the socket using the 'Context'
198  sock = context.socket(zmq.REQ)
199  # Set a Timeout so we do not spin till infinity
200  sock.setsockopt(zmq.RCVTIMEO, 2000)
201  # sock.setsockopt(zmq.LINGER, 0)
202 
203  port = 1667 # default publishing port for groot monitoring
204  sock.connect(f'tcp://127.0.0.1:{port}')
205  self.info_msginfo_msg(f'ZMQ Publisher Port:{port}')
206 
207  # Create header for the request
208  request_header = struct.pack('!BBI', 2, ord('I'), 12345) # HOOK_INSERT
209  # Create JSON for the hook
210  hook_data = {
211  'enabled': True,
212  'uid': 9, # Node ID
213  'mode': 0, # 0 = BREAKPOINT, 1 = REPLACE
214  'once': False,
215  'desired_status': 'SUCCESS', # Desired status
216  'position': 0, # 0 = PRE, 1 = POST
217  }
218  hook_json = json.dumps(hook_data)
219 
220  # Send the request
221  try:
222  sock.send_multipart([request_header, hook_json.encode('utf-8')])
223  reply = sock.recv_multipart()
224  if len(reply[0]) < 2:
225  self.error_msgerror_msg('ZMQ - Incomplete reply received')
226  sock.close()
227  return False
228  except Exception as e:
229  self.error_msgerror_msg(f'ZMQ - Error during request: {e}')
230  sock.close()
231  return False
232  self.info_msginfo_msg('ZMQ - HOOK_INSERT request sent')
233  return True
234 
235  def grootMonitoringGetStatus(self) -> bool:
236  # ZeroMQ Context
237  context = zmq.Context()
238 
239  sock = context.socket(zmq.REQ)
240  port = 1667 # default server port for groot monitoring
241  # # Set a Timeout so we do not spin till infinity
242  sock.setsockopt(zmq.RCVTIMEO, 1000)
243  # sock.setsockopt(zmq.LINGER, 0)
244 
245  sock.connect(f'tcp://localhost:{port}')
246  self.info_msginfo_msg(f'ZMQ Server Port:{port}')
247 
248  for request in range(3):
249  try:
250  # request tree from server
251  request_header = struct.pack('!BBI', 2, ord('S'), 12345)
252  sock.send(request_header)
253  # receive tree from server as flat_buffer
254  reply = sock.recv_multipart()
255  if len(reply[0]) < 6:
256  self.error_msgerror_msg('ZMQ - Incomplete reply received')
257  sock.close()
258  return False
259  # Decoding payload
260  payload = reply[1]
261  node_states = []
262  offset = 0
263  while offset < len(payload):
264  node_uid, node_status = struct.unpack_from('!HB', payload, offset)
265  offset += 3 # 2 bytes for UID, 1 byte for status
266  node_states.append((node_uid, node_status))
267  # Get the status of the first node
268  node_uid, node_status = node_states[0]
269  if node_status != 0:
270  self.error_msgerror_msg('ZMQ - BT Not running')
271  return False
272  except zmq.error.Again:
273  self.error_msgerror_msg('ZMQ - Did not receive any status')
274  sock.close()
275  return False
276  self.info_msginfo_msg('ZMQ - Did receive status')
277  return True
278 
279  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
280  self.info_msginfo_msg('Received amcl_pose')
281  self.current_posecurrent_pose = msg.pose.pose
282  self.initial_pose_receivedinitial_pose_received = True
283 
284  def reachesGoal(self, timeout: float, distance: float) -> bool:
285  goalReached = False
286  start_time = time.time()
287 
288  while not goalReached:
289  rclpy.spin_once(self, timeout_sec=1)
290  if self.distanceFromGoaldistanceFromGoal() < distance:
291  goalReached = True
292  self.info_msginfo_msg('*** GOAL REACHED ***')
293  return True
294  elif timeout is not None:
295  if (time.time() - start_time) > timeout:
296  self.error_msgerror_msg('Robot timed out reaching its goal!')
297  return False
298 
299  self.info_msginfo_msg('Robot reached its goal!')
300  return True
301 
302  def distanceFromGoal(self) -> float:
303  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
304  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
305  distance = math.sqrt(d_x * d_x + d_y * d_y)
306  self.info_msginfo_msg(f'Distance from goal is: {distance}')
307  return distance
308 
309  def wait_for_node_active(self, node_name: str) -> None:
310  # Waits for the node within the tester namespace to become active
311  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
312  node_service = f'{node_name}/get_state'
313  state_client = self.create_client(GetState, node_service)
314  while not state_client.wait_for_service(timeout_sec=1.0):
315  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
316  req = GetState.Request() # empty request
317  state = 'UNKNOWN'
318  while state != 'active':
319  self.info_msginfo_msg(f'Getting {node_name} state...')
320  future = state_client.call_async(req)
321  rclpy.spin_until_future_complete(self, future)
322  if future.result() is not None:
323  state = future.result().current_state.label # type: ignore[union-attr]
324  self.info_msginfo_msg(f'Result of get_state: {state}')
325  else:
326  self.error_msgerror_msg(
327  f'Exception while calling service: {future.exception()!r}'
328  )
329  time.sleep(5)
330 
331  def shutdown(self) -> None:
332  self.info_msginfo_msg('Shutting down')
333  self.action_clientaction_client.destroy()
334 
335  transition_service = 'lifecycle_manager_navigation/manage_nodes'
336  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
337  while not mgr_client.wait_for_service(timeout_sec=1.0):
338  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
339 
340  req = ManageLifecycleNodes.Request()
341  req.command = ManageLifecycleNodes.Request().SHUTDOWN
342  future = mgr_client.call_async(req)
343  try:
344  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
345  rclpy.spin_until_future_complete(self, future)
346  future.result()
347  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
348  except Exception as e: # noqa: B902
349  self.error_msgerror_msg(f'Service call failed {e!r}')
350  transition_service = 'lifecycle_manager_localization/manage_nodes'
351  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
352  while not mgr_client.wait_for_service(timeout_sec=1.0):
353  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
354 
355  req = ManageLifecycleNodes.Request()
356  req.command = ManageLifecycleNodes.Request().SHUTDOWN
357  future = mgr_client.call_async(req)
358  try:
359  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
360  rclpy.spin_until_future_complete(self, future)
361  future.result()
362  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
363  except Exception as e: # noqa: B902
364  self.error_msgerror_msg(f'Service call failed {e!r}')
365 
366  def wait_for_initial_pose(self) -> bool:
367  self.initial_pose_receivedinitial_pose_received = False
368  # If the initial pose is not received within 100 seconds, return False
369  # this is because when setting a wrong initial pose, amcl_pose is not received
370  # and the test will hang indefinitely
371  start_time = time.time()
372  duration = 0.0
373  while not self.initial_pose_receivedinitial_pose_received:
374  self.info_msginfo_msg('Setting initial pose')
375  self.setInitialPosesetInitialPose()
376  self.info_msginfo_msg('Waiting for amcl_pose to be received')
377  duration = time.time() - start_time
378  if duration > self.set_initial_pose_timeoutset_initial_pose_timeout:
379  self.error_msgerror_msg('Timeout waiting for initial pose to be set')
380  return False
381  rclpy.spin_once(self, timeout_sec=1)
382  return True
383 
384 
385 def test_RobotMovesToGoal(robot_tester: NavTester) -> bool:
386  robot_tester.info_msg('Setting goal pose')
387  robot_tester.publishGoalPose()
388  robot_tester.info_msg('Waiting 60 seconds for robot to reach goal')
389  return robot_tester.reachesGoal(timeout=60, distance=0.5)
390 
391 
392 def run_all_tests(robot_tester: NavTester) -> bool:
393  # set transforms to use_sim_time
394  result = True
395  if result:
396  robot_tester.wait_for_node_active('amcl')
397  result = robot_tester.wait_for_initial_pose()
398  if result:
399  robot_tester.wait_for_node_active('bt_navigator')
400  result = robot_tester.runNavigateAction()
401 
402  if result:
403  result = test_RobotMovesToGoal(robot_tester)
404 
405  # Add more tests here if desired
406 
407  if result:
408  robot_tester.info_msg('Test PASSED')
409  else:
410  robot_tester.error_msg('Test FAILED')
411 
412  return result
413 
414 
415 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
416  initial_pose = Pose()
417  initial_pose.position.x = x
418  initial_pose.position.y = y
419  initial_pose.position.z = z
420  initial_pose.orientation.x = 0.0
421  initial_pose.orientation.y = 0.0
422  initial_pose.orientation.z = 0.0
423  initial_pose.orientation.w = 1.0
424  return initial_pose
425 
426 
427 def get_testers(args: argparse.Namespace) -> list[NavTester]:
428  testers = []
429 
430  if args.robot:
431  # Requested tester for one robot
432  init_x, init_y, final_x, final_y = args.robot[0]
433  tester = NavTester(
434  initial_pose=fwd_pose(float(init_x), float(init_y)),
435  goal_pose=fwd_pose(float(final_x), float(final_y)),
436  )
437  tester.info_msg(
438  'Starting tester, robot going from '
439  + init_x
440  + ', '
441  + init_y
442  + ' to '
443  + final_x
444  + ', '
445  + final_y
446  + '.'
447  )
448  testers.append(tester)
449  return testers
450 
451  # Requested tester for multiple robots
452  for robot in args.robots:
453  namespace, init_x, init_y, final_x, final_y = robot
454  tester = NavTester(
455  namespace=namespace,
456  initial_pose=fwd_pose(float(init_x), float(init_y)),
457  goal_pose=fwd_pose(float(final_x), float(final_y)),
458  )
459  tester.info_msg(
460  'Starting tester for '
461  + namespace
462  + ' going from '
463  + init_x
464  + ', '
465  + init_y
466  + ' to '
467  + final_x
468  + ', '
469  + final_y
470  )
471  testers.append(tester)
472  return testers
473 
474 
475 def check_args(expect_failure: str) -> Any:
476  # Check if --expect_failure is True or False
477  if expect_failure != 'True' and expect_failure != 'False':
478  print(
479  '\033[1;37;41m' + ' -e flag must be set to True or False only. ' + '\033[0m'
480  )
481  exit(1)
482  else:
483  return eval(expect_failure)
484 
485 
486 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
487  # The robot(s) positions from the input arguments
488  parser = argparse.ArgumentParser(description='System-level navigation tester node')
489  parser.add_argument('-e', '--expect_failure')
490  group = parser.add_mutually_exclusive_group(required=True)
491  group.add_argument(
492  '-r',
493  '--robot',
494  action='append',
495  nargs=4,
496  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
497  help='The robot starting and final positions.',
498  )
499  group.add_argument(
500  '-rs',
501  '--robots',
502  action='append',
503  nargs=5,
504  metavar=('name', 'init_x', 'init_y', 'final_x', 'final_y'),
505  help="The robot's namespace and starting and final positions."
506  + 'Repeating the argument for multiple robots is supported.',
507  )
508 
509  args, unknown = parser.parse_known_args()
510 
511  expect_failure = check_args(args.expect_failure)
512 
513  rclpy.init()
514 
515  # Create testers for each robot
516  testers = get_testers(args)
517 
518  # wait a few seconds to make sure entire stacks are up
519  time.sleep(10)
520 
521  for tester in testers:
522  passed = run_all_tests(tester)
523  if passed != expect_failure:
524  break
525 
526  for tester in testers:
527  # stop and shutdown the nav stack to exit cleanly
528  tester.shutdown()
529 
530  testers[0].info_msg('Done Shutting Down.')
531 
532  if passed != expect_failure:
533  testers[0].info_msg('Exiting failed')
534  exit(1)
535  else:
536  testers[0].info_msg('Exiting passed')
537  exit(0)
538 
539 
540 if __name__ == '__main__':
541  main()
PoseStamped getStampedPoseMsg(self, Pose pose)
None poseCallback(self, PoseWithCovarianceStamped msg)