Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
metrics.py
1 #! /usr/bin/env python3
2 # Copyright (c) 2022 Samsung R&D Institute Russia
3 # Copyright (c) 2022 Joshua Wallace
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import math
18 import os
19 import pickle
20 from random import randint, seed, uniform
21 from typing import Optional
22 
23 from geometry_msgs.msg import PoseStamped
24 from nav2_simple_commander.robot_navigator import BasicNavigator
25 import numpy as np
26 from numpy.typing import NDArray
27 import rclpy
28 from rclpy.time import Time
29 from transforms3d.euler import euler2quat
30 
31 # Note: Map origin is assumed to be (0,0)
32 
33 
34 def getPlannerResults(
35  navigator: BasicNavigator, initial_pose: PoseStamped,
36  goal_pose: PoseStamped, planner: str) -> PoseStamped:
37  result = navigator._getPathImpl(initial_pose, goal_pose, planner, use_start=True)
38  if result is None or result.error_code != 0:
39  print(planner, 'planner failed to produce the path')
40  return None
41  return result
42 
43 
44 def getSmootherResults(
45  navigator: BasicNavigator, path: PoseStamped,
46  smoothers: list[str]) -> Optional[list[PoseStamped]]:
47  smoothed_results = []
48  for smoother in smoothers:
49  smoothed_result = navigator._smoothPathImpl(path, smoother)
50  if smoothed_result is not None:
51  smoothed_results.append(smoothed_result)
52  else:
53  print(smoother, 'failed to smooth the path')
54  return None
55  return smoothed_results
56 
57 
58 def getRandomStart(costmap: NDArray[np.float32], max_cost: int,
59  side_buffer: int, time_stamp: Time, res: float) -> PoseStamped:
60  start = PoseStamped()
61  start.header.frame_id = 'map'
62  start.header.stamp = time_stamp
63  while True:
64  row = randint(side_buffer, costmap.shape[0] - side_buffer)
65  col = randint(side_buffer, costmap.shape[1] - side_buffer)
66 
67  if costmap[row, col] < max_cost:
68  start.pose.position.x = col * res
69  start.pose.position.y = row * res
70 
71  yaw = uniform(0, 1) * 2 * math.pi
72  quad = euler2quat(0.0, 0.0, yaw)
73  start.pose.orientation.w = quad[0]
74  start.pose.orientation.x = quad[1]
75  start.pose.orientation.y = quad[2]
76  start.pose.orientation.z = quad[3]
77  break
78  return start
79 
80 
81 def getRandomGoal(
82  costmap: NDArray[np.float32], start: PoseStamped,
83  max_cost: int, side_buffer: int,
84  time_stamp: Time, res: float) -> PoseStamped:
85  goal = PoseStamped()
86  goal.header.frame_id = 'map'
87  goal.header.stamp = time_stamp
88  while True:
89  row = randint(side_buffer, costmap.shape[0] - side_buffer)
90  col = randint(side_buffer, costmap.shape[1] - side_buffer)
91 
92  start_x = start.pose.position.x
93  start_y = start.pose.position.y
94  goal_x = col * res
95  goal_y = row * res
96  x_diff = goal_x - start_x
97  y_diff = goal_y - start_y
98  dist = math.sqrt(x_diff ** 2 + y_diff ** 2)
99 
100  if costmap[row, col] < max_cost and dist > 3.0:
101  goal.pose.position.x = goal_x
102  goal.pose.position.y = goal_y
103 
104  yaw = uniform(0, 1) * 2 * math.pi
105  quad = euler2quat(0.0, 0.0, yaw)
106  goal.pose.orientation.w = quad[0]
107  goal.pose.orientation.x = quad[1]
108  goal.pose.orientation.y = quad[2]
109  goal.pose.orientation.z = quad[3]
110  break
111  return goal
112 
113 
114 def main() -> None:
115  rclpy.init()
116 
117  navigator = BasicNavigator()
118 
119  # Wait for planner and smoother to fully activate
120  print('Waiting for planner and smoother servers to activate')
121  navigator.waitUntilNav2Active('smoother_server', 'planner_server')
122 
123  # Get the costmap for start/goal validation
124  costmap_msg = navigator.getGlobalCostmap()
125  costmap = np.asarray(costmap_msg.data)
126  costmap.resize(costmap_msg.metadata.size_y, costmap_msg.metadata.size_x)
127 
128  planner = 'SmacHybrid'
129  smoothers = ['simple_smoother', 'constrained_smoother', 'sg_smoother']
130  max_cost = 210
131  side_buffer = 10
132  time_stamp = navigator.get_clock().now().to_msg()
133  results = []
134  seed(33)
135 
136  random_pairs = 100
137  i = 0
138  res = costmap_msg.metadata.resolution
139  while i < random_pairs:
140  print('Cycle: ', i, 'out of: ', random_pairs)
141  start = getRandomStart(costmap, max_cost, side_buffer, time_stamp, res)
142  goal = getRandomGoal(costmap, start, max_cost, side_buffer, time_stamp, res)
143  print('Start', start)
144  print('Goal', goal)
145  result = getPlannerResults(navigator, start, goal, planner)
146  if result is not None:
147  smoothed_results = getSmootherResults(navigator, result.path, smoothers)
148  if smoothed_results is not None:
149  results.append(result)
150  results.append(smoothed_results)
151  i += 1
152 
153  print('Write Results...')
154  benchmark_dir = os.getcwd()
155  with open(os.path.join(benchmark_dir, 'results.pickle'), 'wb') as f:
156  pickle.dump(results, f, pickle.HIGHEST_PROTOCOL)
157 
158  with open(os.path.join(benchmark_dir, 'costmap.pickle'), 'wb') as f:
159  pickle.dump(costmap_msg, f, pickle.HIGHEST_PROTOCOL)
160 
161  smoothers.insert(0, planner)
162  with open(os.path.join(benchmark_dir, 'methods.pickle'), 'wb') as f:
163  pickle.dump(smoothers, f, pickle.HIGHEST_PROTOCOL)
164  print('Write Complete')
165 
166  exit(0)
167 
168 
169 if __name__ == '__main__':
170  main()