Nav2 Navigation Stack - jazzy  jazzy
ROS 2 Navigation Stack
loopback_simulator.py
1 # Copyright (c) 2024 Open Navigation LLC
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 
16 import math
17 
18 from geometry_msgs.msg import PoseWithCovarianceStamped, Twist, TwistStamped
19 from geometry_msgs.msg import Quaternion, TransformStamped, Vector3
20 from nav2_simple_commander.line_iterator import LineIterator
21 from nav_msgs.msg import Odometry
22 from nav_msgs.srv import GetMap
23 import rclpy
24 from rclpy.duration import Duration
25 from rclpy.node import Node
26 from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
27 from rosgraph_msgs.msg import Clock
28 from sensor_msgs.msg import LaserScan
29 from tf2_ros import Buffer, TransformBroadcaster, TransformListener
30 import tf_transformations
31 
32 from .utils import (
33  addYawToQuat,
34  getMapOccupancy,
35  matrixToTransform,
36  transformStampedToMatrix,
37  worldToMap,
38 )
39 
40 """
41 This is a loopback simulator that replaces a physics simulator to create a
42 frictionless, inertialess, and collisionless simulation environment. It
43 accepts cmd_vel messages and publishes odometry & TF messages based on the
44 cumulative velocities received to mimick global localization and simulation.
45 It also accepts initialpose messages to set the initial pose of the robot
46 to place anywhere.
47 """
48 
49 
50 class LoopbackSimulator(Node):
51 
52  def __init__(self):
53  super().__init__(node_name='loopback_simulator')
54  self.curr_cmd_velcurr_cmd_vel = None
55  self.curr_cmd_vel_timecurr_cmd_vel_time = self.get_clock().now()
56  self.initial_poseinitial_pose = None
57  self.timertimer = None
58  self.setupTimersetupTimer = None
59  self.mapmap = None
60  self.mat_base_to_lasermat_base_to_laser = None
61 
62  self.declare_parameter('update_duration', 0.01)
63  self.update_durupdate_dur = self.get_parameter('update_duration').get_parameter_value().double_value
64 
65  self.declare_parameter('base_frame_id', 'base_footprint')
66  self.base_frame_idbase_frame_id = self.get_parameter('base_frame_id').get_parameter_value().string_value
67 
68  self.declare_parameter('map_frame_id', 'map')
69  self.map_frame_idmap_frame_id = self.get_parameter('map_frame_id').get_parameter_value().string_value
70 
71  self.declare_parameter('odom_frame_id', 'odom')
72  self.odom_frame_idodom_frame_id = self.get_parameter('odom_frame_id').get_parameter_value().string_value
73 
74  self.declare_parameter('scan_frame_id', 'base_scan')
75  self.scan_frame_idscan_frame_id = self.get_parameter('scan_frame_id').get_parameter_value().string_value
76 
77  self.declare_parameter('enable_stamped_cmd_vel', False)
78  use_stamped = self.get_parameter('enable_stamped_cmd_vel').get_parameter_value().bool_value
79 
80  self.declare_parameter('scan_publish_dur', 0.1)
81  self.scan_publish_durscan_publish_dur = self.get_parameter(
82  'scan_publish_dur').get_parameter_value().double_value
83 
84  self.declare_parameter('publish_map_odom_tf', True)
85  self.publish_map_odom_tfpublish_map_odom_tf = self.get_parameter(
86  'publish_map_odom_tf').get_parameter_value().bool_value
87 
88  self.declare_parameter('publish_clock', True)
89  self.publish_clockpublish_clock = self.get_parameter('publish_clock').get_parameter_value().bool_value
90 
91  self.declare_parameter('scan_range_min', 0.05)
92  self.scan_range_minscan_range_min = \
93  self.get_parameter('scan_range_min').get_parameter_value().double_value
94 
95  self.declare_parameter('scan_range_max', 30.0)
96  self.scan_range_maxscan_range_max = \
97  self.get_parameter('scan_range_max').get_parameter_value().double_value
98 
99  self.declare_parameter('scan_angle_min', -math.pi)
100  self.scan_angle_minscan_angle_min = \
101  self.get_parameter('scan_angle_min').get_parameter_value().double_value
102 
103  self.declare_parameter('scan_angle_max', math.pi)
104  self.scan_angle_maxscan_angle_max = \
105  self.get_parameter('scan_angle_max').get_parameter_value().double_value
106 
107  self.declare_parameter('scan_angle_increment', 0.0261) # 0.0261 rad = 1.5 degrees
108  self.scan_angle_incrementscan_angle_increment = \
109  self.get_parameter('scan_angle_increment').get_parameter_value().double_value
110 
111  self.declare_parameter('scan_use_inf', True)
112  self.use_infuse_inf = \
113  self.get_parameter('scan_use_inf').get_parameter_value().bool_value
114 
115  self.t_map_to_odomt_map_to_odom = TransformStamped()
116  self.t_map_to_odomt_map_to_odom.header.frame_id = self.map_frame_idmap_frame_id
117  self.t_map_to_odomt_map_to_odom.child_frame_id = self.odom_frame_idodom_frame_id
118  self.t_odom_to_base_linkt_odom_to_base_link = TransformStamped()
119  self.t_odom_to_base_linkt_odom_to_base_link.header.frame_id = self.odom_frame_idodom_frame_id
120  self.t_odom_to_base_linkt_odom_to_base_link.child_frame_id = self.base_frame_idbase_frame_id
121 
122  self.tf_broadcastertf_broadcaster = TransformBroadcaster(self)
123 
124  self.initial_pose_subinitial_pose_sub = self.create_subscription(
125  PoseWithCovarianceStamped,
126  'initialpose', self.initialPoseCallbackinitialPoseCallback, 10)
127  if not use_stamped:
128  self.cmd_vel_subcmd_vel_sub = self.create_subscription(
129  Twist,
130  'cmd_vel', self.cmdVelCallbackcmdVelCallback, 10)
131  else:
132  self.cmd_vel_subcmd_vel_sub = self.create_subscription(
133  TwistStamped,
134  'cmd_vel', self.cmdVelStampedCallbackcmdVelStampedCallback, 10)
135  self.odom_pubodom_pub = self.create_publisher(Odometry, 'odom', 10)
136 
137  sensor_qos = QoSProfile(
138  reliability=ReliabilityPolicy.BEST_EFFORT,
139  durability=DurabilityPolicy.VOLATILE,
140  depth=10)
141  self.scan_pubscan_pub = self.create_publisher(LaserScan, 'scan', sensor_qos)
142 
143  if self.publish_clockpublish_clock:
144  self.clock_timerclock_timer = self.create_timer(0.1, self.clockTimerCallbackclockTimerCallback)
145  self.clock_pubclock_pub = self.create_publisher(Clock, '/clock', 10)
146 
147  self.setupTimersetupTimer = self.create_timer(0.1, self.setupTimerCallbacksetupTimerCallback)
148 
149  self.map_clientmap_client = self.create_client(GetMap, '/map_server/map')
150 
151  self.tf_buffertf_buffer = Buffer()
152  self.tf_listenertf_listener = TransformListener(self.tf_buffertf_buffer, self)
153 
154  self.getMapgetMap()
155 
156  self.infoinfo('Loopback simulator initialized')
157 
158  def getBaseToLaserTf(self):
159  try:
160  # Wait for transform and lookup
161  transform = self.tf_buffertf_buffer.lookup_transform(
162  self.base_frame_idbase_frame_id, self.scan_frame_idscan_frame_id, rclpy.time.Time())
163  self.mat_base_to_lasermat_base_to_laser = transformStampedToMatrix(transform)
164 
165  except Exception as ex:
166  self.get_logger().error('Transform lookup failed: %s' % str(ex))
167 
168  def setupTimerCallback(self):
169  # Publish initial identity odom transform & laser scan to warm up system
170  self.tf_broadcastertf_broadcaster.sendTransform(self.t_odom_to_base_linkt_odom_to_base_link)
171  if self.mat_base_to_lasermat_base_to_laser is None:
172  self.getBaseToLaserTfgetBaseToLaserTf()
173 
174  def clockTimerCallback(self):
175  msg = Clock()
176  msg.clock = self.get_clock().now().to_msg()
177  self.clock_pubclock_pub.publish(msg)
178 
179  def cmdVelCallback(self, msg):
180  self.debugdebug('Received cmd_vel')
181  if self.initial_poseinitial_pose is None:
182  # Don't consider velocities before the initial pose is set
183  return
184  self.curr_cmd_velcurr_cmd_vel = msg
185  self.curr_cmd_vel_timecurr_cmd_vel_time = self.get_clock().now()
186 
187  def cmdVelStampedCallback(self, msg):
188  self.debugdebug('Received cmd_vel')
189  if self.initial_poseinitial_pose is None:
190  # Don't consider velocities before the initial pose is set
191  return
192  self.curr_cmd_velcurr_cmd_vel = msg.twist
193  self.curr_cmd_vel_timecurr_cmd_vel_time = rclpy.time.Time.from_msg(msg.header.stamp)
194 
195  def initialPoseCallback(self, msg):
196  self.infoinfo('Received initial pose!')
197  if self.initial_poseinitial_pose is None:
198  # Initialize transforms (map->odom as input pose, odom->base_link start from identity)
199  self.initial_poseinitial_pose = msg.pose.pose
200  self.t_map_to_odomt_map_to_odom.transform.translation.x = self.initial_poseinitial_pose.position.x
201  self.t_map_to_odomt_map_to_odom.transform.translation.y = self.initial_poseinitial_pose.position.y
202  self.t_map_to_odomt_map_to_odom.transform.translation.z = 0.0
203  self.t_map_to_odomt_map_to_odom.transform.rotation = self.initial_poseinitial_pose.orientation
204  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation = Vector3()
205  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation = Quaternion()
206  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
207 
208  # Start republication timer and velocity processing
209  if self.setupTimersetupTimer is not None:
210  self.setupTimersetupTimer.cancel()
211  self.setupTimersetupTimer.destroy()
212  self.setupTimersetupTimer = None
213  self.timertimer = self.create_timer(self.update_durupdate_dur, self.timerCallbacktimerCallback)
214  self.timer_lasertimer_laser = self.create_timer(self.scan_publish_durscan_publish_dur, self.publishLaserScanpublishLaserScan)
215  return
216 
217  self.initial_poseinitial_pose = msg.pose.pose
218 
219  # Adjust map->odom transform based on new initial pose, keeping odom->base_link the same
220  t_map_to_base_link = TransformStamped()
221  t_map_to_base_link.header = msg.header
222  t_map_to_base_link.child_frame_id = self.base_frame_idbase_frame_id
223  t_map_to_base_link.transform.translation.x = self.initial_poseinitial_pose.position.x
224  t_map_to_base_link.transform.translation.y = self.initial_poseinitial_pose.position.y
225  t_map_to_base_link.transform.translation.z = 0.0
226  t_map_to_base_link.transform.rotation = self.initial_poseinitial_pose.orientation
227  mat_map_to_base_link = transformStampedToMatrix(t_map_to_base_link)
228  mat_odom_to_base_link = transformStampedToMatrix(self.t_odom_to_base_linkt_odom_to_base_link)
229  mat_base_link_to_odom = tf_transformations.inverse_matrix(mat_odom_to_base_link)
230  mat_map_to_odom = \
231  tf_transformations.concatenate_matrices(mat_map_to_base_link, mat_base_link_to_odom)
232  self.t_map_to_odomt_map_to_odom.transform = matrixToTransform(mat_map_to_odom)
233 
234  def timerCallback(self):
235  # If no data, just republish existing transforms without change
236  one_sec = Duration(seconds=1)
237  if self.curr_cmd_velcurr_cmd_vel is None or self.get_clock().now() - self.curr_cmd_vel_timecurr_cmd_vel_time > one_sec:
238  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
239  self.curr_cmd_velcurr_cmd_vel = None
240  return
241 
242  # Update odom->base_link from cmd_vel
243  dx = self.curr_cmd_velcurr_cmd_vel.linear.x * self.update_durupdate_dur
244  dy = self.curr_cmd_velcurr_cmd_vel.linear.y * self.update_durupdate_dur
245  dth = self.curr_cmd_velcurr_cmd_vel.angular.z * self.update_durupdate_dur
246  q = [self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.x,
247  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.y,
248  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.z,
249  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.w]
250  _, _, yaw = tf_transformations.euler_from_quaternion(q)
251  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation.x += dx * math.cos(yaw) - dy * math.sin(yaw)
252  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation.y += dx * math.sin(yaw) + dy * math.cos(yaw)
253  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation = \
254  addYawToQuat(self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation, dth)
255 
256  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
257  self.publishOdometrypublishOdometry(self.t_odom_to_base_linkt_odom_to_base_link)
258 
259  def publishLaserScan(self):
260  # Publish a bogus laser scan for collision monitor
261  self.scan_msgscan_msg = LaserScan()
262  self.scan_msgscan_msg.header.stamp = (self.get_clock().now()).to_msg()
263  self.scan_msgscan_msg.header.frame_id = self.scan_frame_idscan_frame_id
264  self.scan_msgscan_msg.angle_min = self.scan_angle_minscan_angle_min
265  self.scan_msgscan_msg.angle_max = self.scan_angle_maxscan_angle_max
266  # 1.5 degrees
267  self.scan_msgscan_msg.angle_increment = self.scan_angle_incrementscan_angle_increment
268  self.scan_msgscan_msg.time_increment = 0.0
269  self.scan_msgscan_msg.scan_time = 0.1
270  self.scan_msgscan_msg.range_min = self.scan_range_minscan_range_min
271  self.scan_msgscan_msg.range_max = self.scan_range_maxscan_range_max
272  num_samples = int(
273  (self.scan_msgscan_msg.angle_max - self.scan_msgscan_msg.angle_min) /
274  self.scan_msgscan_msg.angle_increment)
275  self.scan_msgscan_msg.ranges = [0.0] * num_samples
276  self.getLaserScangetLaserScan(num_samples)
277  self.scan_pubscan_pub.publish(self.scan_msgscan_msg)
278 
279  def publishTransforms(self, map_to_odom, odom_to_base_link):
280  map_to_odom.header.stamp = \
281  (self.get_clock().now() + Duration(seconds=self.update_durupdate_dur)).to_msg()
282  odom_to_base_link.header.stamp = self.get_clock().now().to_msg()
283  if self.publish_map_odom_tfpublish_map_odom_tf:
284  self.tf_broadcastertf_broadcaster.sendTransform(map_to_odom)
285  self.tf_broadcastertf_broadcaster.sendTransform(odom_to_base_link)
286 
287  def publishOdometry(self, odom_to_base_link):
288  odom = Odometry()
289  odom.header.stamp = self.get_clock().now().to_msg()
290  odom.header.frame_id = 'odom'
291  odom.child_frame_id = 'base_link'
292  odom.pose.pose.position.x = odom_to_base_link.transform.translation.x
293  odom.pose.pose.position.y = odom_to_base_link.transform.translation.y
294  odom.pose.pose.orientation = odom_to_base_link.transform.rotation
295  odom.twist.twist = self.curr_cmd_velcurr_cmd_vel
296  self.odom_pubodom_pub.publish(odom)
297 
298  def info(self, msg):
299  self.get_logger().info(msg)
300  return
301 
302  def debug(self, msg):
303  self.get_logger().debug(msg)
304  return
305 
306  def getMap(self):
307  request = GetMap.Request()
308  if self.map_clientmap_client.wait_for_service(timeout_sec=5.0):
309  # Request to get map
310  future = self.map_clientmap_client.call_async(request)
311  rclpy.spin_until_future_complete(self, future)
312  if future.result() is not None:
313  self.mapmap = future.result().map
314  self.get_logger().info('Laser scan will be populated using map data')
315  else:
316  self.get_logger().warn(
317  'Failed to get map, '
318  'Laser scan will be populated using max range'
319  )
320  else:
321  self.get_logger().warn(
322  'Failed to get map, '
323  'Laser scan will be populated using max range'
324  )
325 
326  def getLaserPose(self):
327  mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odomt_map_to_odom)
328  mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_linkt_odom_to_base_link)
329 
330  mat_map_to_laser = tf_transformations.concatenate_matrices(
331  mat_map_to_odom,
332  mat_odom_to_base,
333  self.mat_base_to_lasermat_base_to_laser
334  )
335  transform = matrixToTransform(mat_map_to_laser)
336 
337  x = transform.translation.x
338  y = transform.translation.y
339  theta = tf_transformations.euler_from_quaternion([
340  transform.rotation.x,
341  transform.rotation.y,
342  transform.rotation.z,
343  transform.rotation.w
344  ])[2]
345 
346  return x, y, theta
347 
348  def getLaserScan(self, num_samples):
349  if self.mapmap is None or self.initial_poseinitial_pose is None or self.mat_base_to_lasermat_base_to_laser is None:
350  if self.use_infuse_inf:
351  self.scan_msgscan_msg.ranges = [float('inf')] * num_samples
352  else:
353  self.scan_msgscan_msg.ranges = [self.scan_msgscan_msg.range_max - 0.1] * num_samples
354  return
355 
356  x0, y0, theta = self.getLaserPosegetLaserPose()
357 
358  mx0, my0 = worldToMap(x0, y0, self.mapmap)
359 
360  if not 0 < mx0 < self.mapmap.info.width or not 0 < my0 < self.mapmap.info.height:
361  # outside map
362  if self.use_infuse_inf:
363  self.scan_msgscan_msg.ranges = [float('inf')] * num_samples
364  else:
365  self.scan_msgscan_msg.ranges = [self.scan_msgscan_msg.range_max - 0.1] * num_samples
366  return
367 
368  for i in range(num_samples):
369  curr_angle = theta + self.scan_msgscan_msg.angle_min + i * self.scan_msgscan_msg.angle_increment
370  x1 = x0 + self.scan_msgscan_msg.range_max * math.cos(curr_angle)
371  y1 = y0 + self.scan_msgscan_msg.range_max * math.sin(curr_angle)
372 
373  mx1, my1 = worldToMap(x1, y1, self.mapmap)
374 
375  line_iterator = LineIterator(mx0, my0, mx1, my1, 0.5)
376 
377  while line_iterator.isValid():
378  mx, my = int(line_iterator.getX()), int(line_iterator.getY())
379 
380  if not 0 < mx < self.mapmap.info.width or not 0 < my < self.mapmap.info.height:
381  # if outside map then check next ray
382  break
383 
384  point_cost = getMapOccupancy(mx, my, self.mapmap)
385 
386  if point_cost >= 60:
387  self.scan_msgscan_msg.ranges[i] = math.sqrt(
388  (int(line_iterator.getX()) - mx0) ** 2 +
389  (int(line_iterator.getY()) - my0) ** 2
390  ) * self.mapmap.info.resolution
391  break
392 
393  line_iterator.advance()
394  if self.scan_msgscan_msg.ranges[i] == 0.0 and self.use_infuse_inf:
395  self.scan_msgscan_msg.ranges[i] = float('inf')
396 
397 
398 def main():
399  rclpy.init()
400  loopback_simulator = LoopbackSimulator()
401  rclpy.spin(loopback_simulator)
402  loopback_simulator.destroy_node()
403  rclpy.shutdown()
404  exit(0)
405 
406 
407 if __name__ == '__main__':
408  main()
def publishOdometry(self, odom_to_base_link)
def publishTransforms(self, map_to_odom, odom_to_base_link)