Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
nav_to_pose_tester_node.py
1 #! /usr/bin/env python3
2 # Copyright 2018 Intel Corporation.
3 # Copyright 2020 Florian Gramss
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import argparse
18 import json
19 import math
20 import os
21 import struct
22 import sys
23 import time
24 from typing import Any, Optional
25 
26 from action_msgs.msg import GoalStatus
27 from geometry_msgs.msg import Pose, PoseStamped, PoseWithCovarianceStamped
28 from lifecycle_msgs.srv import GetState
29 from nav2_msgs.action import NavigateToPose
30 from nav2_msgs.srv import ManageLifecycleNodes
31 import rclpy
32 from rclpy.action import ActionClient
33 from rclpy.client import Client
34 from rclpy.node import Node
35 from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
36 import zmq
37 
38 
39 class NavTester(Node):
40 
41  def __init__(self, initial_pose: Pose, goal_pose: Pose, namespace: str = ''):
42  super().__init__(node_name='nav2_tester', namespace=namespace)
43  self.initial_pose_pubinitial_pose_pub = self.create_publisher(
44  PoseWithCovarianceStamped, 'initialpose', 10
45  )
46  self.goal_pubgoal_pub = self.create_publisher(PoseStamped, 'goal_pose', 10)
47 
48  pose_qos = QoSProfile(
49  durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
50  reliability=QoSReliabilityPolicy.RELIABLE,
51  history=QoSHistoryPolicy.KEEP_LAST,
52  depth=1,
53  )
54 
55  self.model_pose_submodel_pose_sub = self.create_subscription(
56  PoseWithCovarianceStamped, 'amcl_pose', self.poseCallbackposeCallback, pose_qos
57  )
58  self.initial_pose_receivedinitial_pose_received = False
59  self.initial_poseinitial_pose = initial_pose
60  self.goal_posegoal_pose = goal_pose
61  self.set_initial_pose_timeoutset_initial_pose_timeout = 15
62  self.action_client: ActionClient[
63  NavigateToPose.Goal,
64  NavigateToPose.Result,
65  NavigateToPose.Feedback
66  ] = ActionClient(self, NavigateToPose, 'navigate_to_pose')
67 
68  def info_msg(self, msg: str) -> None:
69  self.get_logger().info('\033[1;37;44m' + msg + '\033[0m')
70 
71  def warn_msg(self, msg: str) -> None:
72  self.get_logger().warning('\033[1;37;43m' + msg + '\033[0m')
73 
74  def error_msg(self, msg: str) -> None:
75  self.get_logger().error('\033[1;37;41m' + msg + '\033[0m')
76 
77  def setInitialPose(self) -> None:
78  msg = PoseWithCovarianceStamped()
79  msg.pose.pose = self.initial_poseinitial_pose
80  msg.header.frame_id = 'map'
81  self.info_msginfo_msg('Publishing Initial Pose')
82  self.initial_pose_pubinitial_pose_pub.publish(msg)
83  self.currentPosecurrentPose = self.initial_poseinitial_pose
84 
85  def getStampedPoseMsg(self, pose: Pose) -> PoseStamped:
86  msg = PoseStamped()
87  msg.header.frame_id = 'map'
88  msg.pose = pose
89  return msg
90 
91  def publishGoalPose(self, goal_pose: Optional[Pose] = None) -> None:
92  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
93  self.goal_pubgoal_pub.publish(self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose))
94 
95  def runNavigateAction(self, goal_pose: Optional[Pose] = None) -> bool:
96  # Sends a `NavToPose` action request and waits for completion
97  self.info_msginfo_msg("Waiting for 'NavigateToPose' action server")
98  while not self.action_client.wait_for_server(timeout_sec=1.0):
99  self.info_msginfo_msg("'NavigateToPose' action server not available, waiting...")
100 
101  if os.getenv('GROOT_MONITORING') == 'True':
102  if not self.grootMonitoringGetStatusgrootMonitoringGetStatus():
103  self.error_msgerror_msg('Behavior Tree must not be running already!')
104  self.error_msgerror_msg('Are you running multiple goals/bts..?')
105  return False
106 
107  self.goal_posegoal_pose = goal_pose if goal_pose is not None else self.goal_posegoal_pose
108  goal_msg = NavigateToPose.Goal()
109  goal_msg.pose = self.getStampedPoseMsggetStampedPoseMsg(self.goal_posegoal_pose)
110 
111  self.info_msginfo_msg('Sending goal request...')
112  send_goal_future = self.action_client.send_goal_async(goal_msg)
113 
114  rclpy.spin_until_future_complete(self, send_goal_future)
115  goal_handle = send_goal_future.result()
116 
117  if not goal_handle or not goal_handle.accepted:
118  self.error_msgerror_msg('Goal rejected')
119  return False
120 
121  self.info_msginfo_msg('Goal accepted')
122  get_result_future = goal_handle.get_result_async()
123 
124  future_return = True
125  if os.getenv('GROOT_MONITORING') == 'True':
126  try:
127  if not self.grootMonitoringReloadTreegrootMonitoringReloadTree():
128  self.error_msgerror_msg('Failed GROOT_BT - Reload Tree from ZMQ Server')
129  future_return = False
130  if not self.grootMonitoringSetBreakpointgrootMonitoringSetBreakpoint():
131  self.error_msgerror_msg('Failed GROOT_BT - Set Breakpoint from ZMQ Publisher')
132  future_return = False
133  except Exception as e: # noqa: B902
134  self.error_msgerror_msg(f'Failed GROOT_BT - ZMQ Tests: {e}')
135  future_return = False
136 
137  self.info_msginfo_msg("Waiting for 'NavigateToPose' action to complete")
138  rclpy.spin_until_future_complete(self, get_result_future)
139  status = get_result_future.result().status # type: ignore[union-attr]
140  if status != GoalStatus.STATUS_SUCCEEDED:
141  result = get_result_future.result().result # type: ignore[union-attr]
142  self.info_msginfo_msg(f'Goal failed with status code: {status}'
143  f' error code:{result.error_code}'
144  f' error msg:{result.error_msg}')
145  return False
146 
147  if not future_return:
148  return False
149 
150  self.info_msginfo_msg('Goal succeeded!')
151  return True
152 
153  def grootMonitoringReloadTree(self) -> bool:
154  # ZeroMQ Context
155  context = zmq.Context()
156 
157  sock = context.socket(zmq.REQ)
158  port = 1667 # default server port for groot monitoring
159  # # Set a Timeout so we do not spin till infinity
160  sock.setsockopt(zmq.RCVTIMEO, 1000)
161  # sock.setsockopt(zmq.LINGER, 0)
162 
163  sock.connect(f'tcp://localhost:{port}')
164  self.info_msginfo_msg(f'ZMQ Server Port:{port}')
165 
166  # this should fail
167  try:
168  sock.recv()
169  self.error_msgerror_msg('ZMQ Reload Tree Test 1/3 - This should have failed!')
170  # Only works when ZMQ server receives a request first
171  sock.close()
172  return False
173  except zmq.error.ZMQError:
174  self.info_msginfo_msg('ZMQ Reload Tree Test 1/3: Check')
175  try:
176  # request tree from server
177  request_header = struct.pack('!BBI', 2, ord('T'), 12345)
178  sock.send(request_header)
179  # receive tree from server as flat_buffer
180  sock.recv_multipart()
181  self.info_msginfo_msg('ZMQ Reload Tree Test 2/3: Check')
182  except zmq.error.Again:
183  self.info_msginfo_msg('ZMQ Reload Tree Test 2/3 - Failed to load tree')
184  sock.close()
185  return False
186 
187  # this should fail
188  try:
189  sock.recv()
190  self.error_msgerror_msg('ZMQ Reload Tree Test 3/3 - This should have failed!')
191  # Tree should only be loadable ONCE after ZMQ server received a request
192  sock.close()
193  return False
194  except zmq.error.ZMQError:
195  self.info_msginfo_msg('ZMQ Reload Tree Test 3/3: Check')
196 
197  return True
198 
199  def grootMonitoringSetBreakpoint(self) -> bool:
200  # ZeroMQ Context
201  context = zmq.Context()
202  # Define the socket using the 'Context'
203  sock = context.socket(zmq.REQ)
204  # Set a Timeout so we do not spin till infinity
205  sock.setsockopt(zmq.RCVTIMEO, 2000)
206  # sock.setsockopt(zmq.LINGER, 0)
207 
208  port = 1667 # default publishing port for groot monitoring
209  sock.connect(f'tcp://127.0.0.1:{port}')
210  self.info_msginfo_msg(f'ZMQ Publisher Port:{port}')
211 
212  # Create header for the request
213  request_header = struct.pack('!BBI', 2, ord('I'), 12345) # HOOK_INSERT
214  # Create JSON for the hook
215  hook_data = {
216  'enabled': True,
217  'uid': 9, # Node ID
218  'mode': 0, # 0 = BREAKPOINT, 1 = REPLACE
219  'once': False,
220  'desired_status': 'SUCCESS', # Desired status
221  'position': 0, # 0 = PRE, 1 = POST
222  }
223  hook_json = json.dumps(hook_data)
224 
225  # Send the request
226  try:
227  sock.send_multipart([request_header, hook_json.encode('utf-8')])
228  reply = sock.recv_multipart()
229  if len(reply[0]) < 2:
230  self.error_msgerror_msg('ZMQ - Incomplete reply received')
231  sock.close()
232  return False
233  except Exception as e:
234  self.error_msgerror_msg(f'ZMQ - Error during request: {e}')
235  sock.close()
236  return False
237  self.info_msginfo_msg('ZMQ - HOOK_INSERT request sent')
238  return True
239 
240  def grootMonitoringGetStatus(self) -> bool:
241  # ZeroMQ Context
242  context = zmq.Context()
243 
244  sock = context.socket(zmq.REQ)
245  port = 1667 # default server port for groot monitoring
246  # # Set a Timeout so we do not spin till infinity
247  sock.setsockopt(zmq.RCVTIMEO, 1000)
248  # sock.setsockopt(zmq.LINGER, 0)
249 
250  sock.connect(f'tcp://localhost:{port}')
251  self.info_msginfo_msg(f'ZMQ Server Port:{port}')
252 
253  for request in range(3):
254  try:
255  # request tree from server
256  request_header = struct.pack('!BBI', 2, ord('S'), 12345)
257  sock.send(request_header)
258  # receive tree from server as flat_buffer
259  reply = sock.recv_multipart()
260  if len(reply[0]) < 6:
261  self.error_msgerror_msg('ZMQ - Incomplete reply received')
262  sock.close()
263  return False
264  # Decoding payload
265  payload = reply[1]
266  node_states = []
267  offset = 0
268  while offset < len(payload):
269  node_uid, node_status = struct.unpack_from('!HB', payload, offset)
270  offset += 3 # 2 bytes for UID, 1 byte for status
271  node_states.append((node_uid, node_status))
272  # Get the status of the first node
273  node_uid, node_status = node_states[0]
274  if node_status != 0:
275  self.error_msgerror_msg('ZMQ - BT Not running')
276  return False
277  except zmq.error.Again:
278  self.error_msgerror_msg('ZMQ - Did not receive any status')
279  sock.close()
280  return False
281  self.info_msginfo_msg('ZMQ - Did receive status')
282  return True
283 
284  def poseCallback(self, msg: PoseWithCovarianceStamped) -> None:
285  self.info_msginfo_msg('Received amcl_pose')
286  self.current_posecurrent_pose = msg.pose.pose
287  self.initial_pose_receivedinitial_pose_received = True
288 
289  def reachesGoal(self, timeout: float, distance: float) -> bool:
290  goalReached = False
291  start_time = time.time()
292 
293  while not goalReached:
294  rclpy.spin_once(self, timeout_sec=1)
295  if self.distanceFromGoaldistanceFromGoal() < distance:
296  goalReached = True
297  self.info_msginfo_msg('*** GOAL REACHED ***')
298  return True
299  elif timeout is not None:
300  if (time.time() - start_time) > timeout:
301  self.error_msgerror_msg('Robot timed out reaching its goal!')
302  return False
303 
304  self.info_msginfo_msg('Robot reached its goal!')
305  return True
306 
307  def distanceFromGoal(self) -> float:
308  d_x = self.current_posecurrent_pose.position.x - self.goal_posegoal_pose.position.x
309  d_y = self.current_posecurrent_pose.position.y - self.goal_posegoal_pose.position.y
310  distance = math.sqrt(d_x * d_x + d_y * d_y)
311  self.info_msginfo_msg(f'Distance from goal is: {distance}')
312  return distance
313 
314  def wait_for_node_active(self, node_name: str) -> None:
315  # Waits for the node within the tester namespace to become active
316  self.info_msginfo_msg(f'Waiting for {node_name} to become active')
317  node_service = f'{node_name}/get_state'
318  state_client: Client[GetState.Request, GetState.Response] = \
319  self.create_client(GetState, node_service)
320  while not state_client.wait_for_service(timeout_sec=1.0):
321  self.info_msginfo_msg(f'{node_service} service not available, waiting...')
322  req = GetState.Request() # empty request
323  state = 'UNKNOWN'
324  while state != 'active':
325  self.info_msginfo_msg(f'Getting {node_name} state...')
326  future = state_client.call_async(req)
327  rclpy.spin_until_future_complete(self, future)
328  if future.result() is not None:
329  state = future.result().current_state.label # type: ignore[union-attr]
330  self.info_msginfo_msg(f'Result of get_state: {state}')
331  else:
332  self.error_msgerror_msg(
333  f'Exception while calling service: {future.exception()!r}'
334  )
335  time.sleep(5)
336 
337  def shutdown(self) -> None:
338  self.info_msginfo_msg('Shutting down')
339  self.action_client.destroy()
340 
341  transition_service = 'lifecycle_manager_navigation/manage_nodes'
342  mgr_client: Client[ManageLifecycleNodes.Request, ManageLifecycleNodes.Response] = \
343  self.create_client(ManageLifecycleNodes, transition_service)
344  while not mgr_client.wait_for_service(timeout_sec=1.0):
345  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
346 
347  req = ManageLifecycleNodes.Request()
348  req.command = ManageLifecycleNodes.Request().SHUTDOWN
349  future = mgr_client.call_async(req)
350  try:
351  self.info_msginfo_msg('Shutting down navigation lifecycle manager...')
352  rclpy.spin_until_future_complete(self, future)
353  future.result()
354  self.info_msginfo_msg('Shutting down navigation lifecycle manager complete.')
355  except Exception as e: # noqa: B902
356  self.error_msgerror_msg(f'Service call failed {e!r}')
357  transition_service = 'lifecycle_manager_localization/manage_nodes'
358  mgr_client = self.create_client(ManageLifecycleNodes, transition_service)
359  while not mgr_client.wait_for_service(timeout_sec=1.0):
360  self.info_msginfo_msg(f'{transition_service} service not available, waiting...')
361 
362  req = ManageLifecycleNodes.Request()
363  req.command = ManageLifecycleNodes.Request().SHUTDOWN
364  future = mgr_client.call_async(req)
365  try:
366  self.info_msginfo_msg('Shutting down localization lifecycle manager...')
367  rclpy.spin_until_future_complete(self, future)
368  future.result()
369  self.info_msginfo_msg('Shutting down localization lifecycle manager complete')
370  except Exception as e: # noqa: B902
371  self.error_msgerror_msg(f'Service call failed {e!r}')
372 
373  def wait_for_initial_pose(self) -> bool:
374  self.initial_pose_receivedinitial_pose_received = False
375  # If the initial pose is not received within 100 seconds, return False
376  # this is because when setting a wrong initial pose, amcl_pose is not received
377  # and the test will hang indefinitely
378  start_time = time.time()
379  duration = 0.0
380  while not self.initial_pose_receivedinitial_pose_received:
381  self.info_msginfo_msg('Setting initial pose')
382  self.setInitialPosesetInitialPose()
383  self.info_msginfo_msg('Waiting for amcl_pose to be received')
384  duration = time.time() - start_time
385  if duration > self.set_initial_pose_timeoutset_initial_pose_timeout:
386  self.error_msgerror_msg('Timeout waiting for initial pose to be set')
387  return False
388  rclpy.spin_once(self, timeout_sec=1)
389  return True
390 
391 
392 def test_RobotMovesToGoal(robot_tester: NavTester) -> bool:
393  robot_tester.info_msg('Setting goal pose')
394  robot_tester.publishGoalPose()
395  robot_tester.info_msg('Waiting 60 seconds for robot to reach goal')
396  return robot_tester.reachesGoal(timeout=60, distance=0.5)
397 
398 
399 def run_all_tests(robot_tester: NavTester) -> bool:
400  # set transforms to use_sim_time
401  result = True
402  if result:
403  robot_tester.wait_for_node_active('amcl')
404  result = robot_tester.wait_for_initial_pose()
405  if result:
406  robot_tester.wait_for_node_active('bt_navigator')
407  result = robot_tester.runNavigateAction()
408 
409  if result:
410  result = test_RobotMovesToGoal(robot_tester)
411 
412  # Add more tests here if desired
413 
414  if result:
415  robot_tester.info_msg('Test PASSED')
416  else:
417  robot_tester.error_msg('Test FAILED')
418 
419  return result
420 
421 
422 def fwd_pose(x: float = 0.0, y: float = 0.0, z: float = 0.01) -> Pose:
423  initial_pose = Pose()
424  initial_pose.position.x = x
425  initial_pose.position.y = y
426  initial_pose.position.z = z
427  initial_pose.orientation.x = 0.0
428  initial_pose.orientation.y = 0.0
429  initial_pose.orientation.z = 0.0
430  initial_pose.orientation.w = 1.0
431  return initial_pose
432 
433 
434 def get_testers(args: argparse.Namespace) -> list[NavTester]:
435  testers = []
436 
437  if args.robot:
438  # Requested tester for one robot
439  init_x, init_y, final_x, final_y = args.robot[0]
440  tester = NavTester(
441  initial_pose=fwd_pose(float(init_x), float(init_y)),
442  goal_pose=fwd_pose(float(final_x), float(final_y)),
443  )
444  tester.info_msg(
445  'Starting tester, robot going from '
446  + init_x
447  + ', '
448  + init_y
449  + ' to '
450  + final_x
451  + ', '
452  + final_y
453  + '.'
454  )
455  testers.append(tester)
456  return testers
457 
458  # Requested tester for multiple robots
459  for robot in args.robots:
460  namespace, init_x, init_y, final_x, final_y = robot
461  tester = NavTester(
462  namespace=namespace,
463  initial_pose=fwd_pose(float(init_x), float(init_y)),
464  goal_pose=fwd_pose(float(final_x), float(final_y)),
465  )
466  tester.info_msg(
467  'Starting tester for '
468  + namespace
469  + ' going from '
470  + init_x
471  + ', '
472  + init_y
473  + ' to '
474  + final_x
475  + ', '
476  + final_y
477  )
478  testers.append(tester)
479  return testers
480 
481 
482 def check_args(expect_failure: str) -> Any:
483  # Check if --expect_failure is True or False
484  if expect_failure != 'True' and expect_failure != 'False':
485  print(
486  '\033[1;37;41m' + ' -e flag must be set to True or False only. ' + '\033[0m'
487  )
488  exit(1)
489  else:
490  return eval(expect_failure)
491 
492 
493 def main(argv: list[str] = sys.argv[1:]): # type: ignore[no-untyped-def]
494  # The robot(s) positions from the input arguments
495  parser = argparse.ArgumentParser(description='System-level navigation tester node')
496  parser.add_argument('-e', '--expect_failure')
497  group = parser.add_mutually_exclusive_group(required=True)
498  group.add_argument(
499  '-r',
500  '--robot',
501  action='append',
502  nargs=4,
503  metavar=('init_x', 'init_y', 'final_x', 'final_y'),
504  help='The robot starting and final positions.',
505  )
506  group.add_argument(
507  '-rs',
508  '--robots',
509  action='append',
510  nargs=5,
511  metavar=('name', 'init_x', 'init_y', 'final_x', 'final_y'),
512  help="The robot's namespace and starting and final positions."
513  + 'Repeating the argument for multiple robots is supported.',
514  )
515 
516  args, unknown = parser.parse_known_args()
517 
518  expect_failure = check_args(args.expect_failure)
519 
520  rclpy.init()
521 
522  # Create testers for each robot
523  testers = get_testers(args)
524 
525  # wait a few seconds to make sure entire stacks are up
526  time.sleep(10)
527 
528  for tester in testers:
529  passed = run_all_tests(tester)
530  if passed != expect_failure:
531  break
532 
533  for tester in testers:
534  # stop and shutdown the nav stack to exit cleanly
535  tester.shutdown()
536 
537  testers[0].info_msg('Done Shutting Down.')
538 
539  if passed != expect_failure:
540  testers[0].info_msg('Exiting failed')
541  exit(1)
542  else:
543  testers[0].info_msg('Exiting passed')
544  exit(0)
545 
546 
547 if __name__ == '__main__':
548  main()
PoseStamped getStampedPoseMsg(self, Pose pose)
None poseCallback(self, PoseWithCovarianceStamped msg)