Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
metrics.py
1 #! /usr/bin/env python3
2 # Copyright (c) 2022 Samsung R&D Institute Russia
3 # Copyright (c) 2022 Joshua Wallace
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import math
18 import os
19 import pickle
20 from random import randint, seed, uniform
21 from typing import Optional
22 
23 from builtin_interfaces.msg import Time
24 from geometry_msgs.msg import PoseStamped
25 from nav2_msgs.action import ComputePathToPose, SmoothPath
26 from nav2_simple_commander.robot_navigator import BasicNavigator
27 import numpy as np
28 from numpy.typing import NDArray
29 import rclpy
30 from transforms3d.euler import euler2quat
31 
32 # Note: Map origin is assumed to be (0,0)
33 
34 
35 def getPlannerResults(
36  navigator: BasicNavigator, initial_pose: PoseStamped,
37  goal_pose: PoseStamped, planner: str) -> Optional[ComputePathToPose.Result]:
38  result = navigator._getPathImpl(initial_pose, goal_pose, planner, use_start=True)
39  if result is None or result.error_code != 0:
40  print(planner, 'planner failed to produce the path')
41  return None
42  return result
43 
44 
45 def getSmootherResults(
46  navigator: BasicNavigator, path: PoseStamped,
47  smoothers: list[str]) -> Optional[list[SmoothPath.Result]]:
48  smoothed_results = []
49  for smoother in smoothers:
50  smoothed_result = navigator._smoothPathImpl(path, smoother)
51  if smoothed_result is not None:
52  smoothed_results.append(smoothed_result)
53  else:
54  print(smoother, 'failed to smooth the path')
55  return None
56  return smoothed_results
57 
58 
59 def getRandomStart(costmap: NDArray[np.float32], max_cost: int,
60  side_buffer: int, time_stamp: Time, res: float) -> PoseStamped:
61  start = PoseStamped()
62  start.header.frame_id = 'map'
63  start.header.stamp = time_stamp
64  while True:
65  row = randint(side_buffer, costmap.shape[0] - side_buffer)
66  col = randint(side_buffer, costmap.shape[1] - side_buffer)
67 
68  if costmap[row, col] < max_cost:
69  start.pose.position.x = col * res
70  start.pose.position.y = row * res
71 
72  yaw = uniform(0, 1) * 2 * math.pi
73  quad = euler2quat(0.0, 0.0, yaw)
74  start.pose.orientation.w = quad[0]
75  start.pose.orientation.x = quad[1]
76  start.pose.orientation.y = quad[2]
77  start.pose.orientation.z = quad[3]
78  break
79  return start
80 
81 
82 def getRandomGoal(
83  costmap: NDArray[np.float32], start: PoseStamped,
84  max_cost: int, side_buffer: int,
85  time_stamp: Time, res: float) -> PoseStamped:
86  goal = PoseStamped()
87  goal.header.frame_id = 'map'
88  goal.header.stamp = time_stamp
89  while True:
90  row = randint(side_buffer, costmap.shape[0] - side_buffer)
91  col = randint(side_buffer, costmap.shape[1] - side_buffer)
92 
93  start_x = start.pose.position.x
94  start_y = start.pose.position.y
95  goal_x = col * res
96  goal_y = row * res
97  x_diff = goal_x - start_x
98  y_diff = goal_y - start_y
99  dist = math.sqrt(x_diff ** 2 + y_diff ** 2)
100 
101  if costmap[row, col] < max_cost and dist > 3.0:
102  goal.pose.position.x = goal_x
103  goal.pose.position.y = goal_y
104 
105  yaw = uniform(0, 1) * 2 * math.pi
106  quad = euler2quat(0.0, 0.0, yaw)
107  goal.pose.orientation.w = quad[0]
108  goal.pose.orientation.x = quad[1]
109  goal.pose.orientation.y = quad[2]
110  goal.pose.orientation.z = quad[3]
111  break
112  return goal
113 
114 
115 def main() -> None:
116  rclpy.init()
117 
118  navigator = BasicNavigator()
119 
120  # Wait for planner and smoother to fully activate
121  print('Waiting for planner and smoother servers to activate')
122  navigator.waitUntilNav2Active('smoother_server', 'planner_server')
123 
124  # Get the costmap for start/goal validation
125  costmap_msg = navigator.getGlobalCostmap()
126  costmap = np.asarray(costmap_msg.data)
127  costmap.resize(costmap_msg.metadata.size_y, costmap_msg.metadata.size_x)
128 
129  planner = 'SmacHybrid'
130  smoothers = ['simple_smoother', 'constrained_smoother', 'sg_smoother']
131  max_cost = 210
132  side_buffer = 10
133  time_stamp = navigator.get_clock().now().to_msg()
134  results = []
135  seed(33)
136 
137  random_pairs = 100
138  i = 0
139  res = costmap_msg.metadata.resolution
140  while i < random_pairs:
141  print('Cycle: ', i, 'out of: ', random_pairs)
142  start = getRandomStart(costmap, max_cost, side_buffer, time_stamp, res)
143  goal = getRandomGoal(costmap, start, max_cost, side_buffer, time_stamp, res)
144  print('Start', start)
145  print('Goal', goal)
146  result = getPlannerResults(navigator, start, goal, planner)
147  if result is not None:
148  smoothed_results = getSmootherResults(navigator, result.path, smoothers)
149  if smoothed_results is not None:
150  results.append(result)
151  results.append(smoothed_results)
152  i += 1
153 
154  print('Write Results...')
155  benchmark_dir = os.getcwd()
156  with open(os.path.join(benchmark_dir, 'results.pickle'), 'wb') as f:
157  pickle.dump(results, f, pickle.HIGHEST_PROTOCOL)
158 
159  with open(os.path.join(benchmark_dir, 'costmap.pickle'), 'wb') as f:
160  pickle.dump(costmap_msg, f, pickle.HIGHEST_PROTOCOL)
161 
162  smoothers.insert(0, planner)
163  with open(os.path.join(benchmark_dir, 'methods.pickle'), 'wb') as f:
164  pickle.dump(smoothers, f, pickle.HIGHEST_PROTOCOL)
165  print('Write Complete')
166 
167  exit(0)
168 
169 
170 if __name__ == '__main__':
171  main()