Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
metrics.py
1 #! /usr/bin/env python3
2 # Copyright (c) 2022 Samsung R&D Institute Russia
3 # Copyright (c) 2022 Joshua Wallace
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 
17 import math
18 import os
19 import pickle
20 from random import randint, seed, uniform
21 
22 from geometry_msgs.msg import PoseStamped
23 from nav2_simple_commander.robot_navigator import BasicNavigator
24 import numpy as np
25 import rclpy
26 from transforms3d.euler import euler2quat
27 
28 
29 # Note: Map origin is assumed to be (0,0)
30 
31 
32 def getPlannerResults(navigator, initial_pose, goal_pose, planner):
33  result = navigator._getPathImpl(initial_pose, goal_pose, planner, use_start=True)
34  if result is None or result.error_code != 0:
35  print(planner, 'planner failed to produce the path')
36  return None
37  return result
38 
39 
40 def getSmootherResults(navigator, path, smoothers):
41  smoothed_results = []
42  for smoother in smoothers:
43  smoothed_result = navigator._smoothPathImpl(path, smoother)
44  if smoothed_result is not None:
45  smoothed_results.append(smoothed_result)
46  else:
47  print(smoother, 'failed to smooth the path')
48  return None
49  return smoothed_results
50 
51 
52 def getRandomStart(costmap, max_cost, side_buffer, time_stamp, res):
53  start = PoseStamped()
54  start.header.frame_id = 'map'
55  start.header.stamp = time_stamp
56  while True:
57  row = randint(side_buffer, costmap.shape[0] - side_buffer)
58  col = randint(side_buffer, costmap.shape[1] - side_buffer)
59 
60  if costmap[row, col] < max_cost:
61  start.pose.position.x = col * res
62  start.pose.position.y = row * res
63 
64  yaw = uniform(0, 1) * 2 * math.pi
65  quad = euler2quat(0.0, 0.0, yaw)
66  start.pose.orientation.w = quad[0]
67  start.pose.orientation.x = quad[1]
68  start.pose.orientation.y = quad[2]
69  start.pose.orientation.z = quad[3]
70  break
71  return start
72 
73 
74 def getRandomGoal(costmap, start, max_cost, side_buffer, time_stamp, res):
75  goal = PoseStamped()
76  goal.header.frame_id = 'map'
77  goal.header.stamp = time_stamp
78  while True:
79  row = randint(side_buffer, costmap.shape[0] - side_buffer)
80  col = randint(side_buffer, costmap.shape[1] - side_buffer)
81 
82  start_x = start.pose.position.x
83  start_y = start.pose.position.y
84  goal_x = col * res
85  goal_y = row * res
86  x_diff = goal_x - start_x
87  y_diff = goal_y - start_y
88  dist = math.sqrt(x_diff ** 2 + y_diff ** 2)
89 
90  if costmap[row, col] < max_cost and dist > 3.0:
91  goal.pose.position.x = goal_x
92  goal.pose.position.y = goal_y
93 
94  yaw = uniform(0, 1) * 2 * math.pi
95  quad = euler2quat(0.0, 0.0, yaw)
96  goal.pose.orientation.w = quad[0]
97  goal.pose.orientation.x = quad[1]
98  goal.pose.orientation.y = quad[2]
99  goal.pose.orientation.z = quad[3]
100  break
101  return goal
102 
103 
104 def main():
105  rclpy.init()
106 
107  navigator = BasicNavigator()
108 
109  # Wait for planner and smoother to fully activate
110  print('Waiting for planner and smoother servers to activate')
111  navigator.waitUntilNav2Active('smoother_server', 'planner_server')
112 
113  # Get the costmap for start/goal validation
114  costmap_msg = navigator.getGlobalCostmap()
115  costmap = np.asarray(costmap_msg.data)
116  costmap.resize(costmap_msg.metadata.size_y, costmap_msg.metadata.size_x)
117 
118  planner = 'SmacHybrid'
119  smoothers = ['simple_smoother', 'constrained_smoother', 'sg_smoother']
120  max_cost = 210
121  side_buffer = 10
122  time_stamp = navigator.get_clock().now().to_msg()
123  results = []
124  seed(33)
125 
126  random_pairs = 100
127  i = 0
128  res = costmap_msg.metadata.resolution
129  while i < random_pairs:
130  print('Cycle: ', i, 'out of: ', random_pairs)
131  start = getRandomStart(costmap, max_cost, side_buffer, time_stamp, res)
132  goal = getRandomGoal(costmap, start, max_cost, side_buffer, time_stamp, res)
133  print('Start', start)
134  print('Goal', goal)
135  result = getPlannerResults(navigator, start, goal, planner)
136  if result is not None:
137  smoothed_results = getSmootherResults(navigator, result.path, smoothers)
138  if smoothed_results is not None:
139  results.append(result)
140  results.append(smoothed_results)
141  i += 1
142 
143  print('Write Results...')
144  benchmark_dir = os.getcwd()
145  with open(os.path.join(benchmark_dir, 'results.pickle'), 'wb') as f:
146  pickle.dump(results, f, pickle.HIGHEST_PROTOCOL)
147 
148  with open(os.path.join(benchmark_dir, 'costmap.pickle'), 'wb') as f:
149  pickle.dump(costmap_msg, f, pickle.HIGHEST_PROTOCOL)
150 
151  smoothers.insert(0, planner)
152  with open(os.path.join(benchmark_dir, 'methods.pickle'), 'wb') as f:
153  pickle.dump(smoothers, f, pickle.HIGHEST_PROTOCOL)
154  print('Write Complete')
155 
156  exit(0)
157 
158 
159 if __name__ == '__main__':
160  main()