Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
loopback_simulator.py
1 # Copyright (c) 2024 Open Navigation LLC
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 
16 import math
17 from typing import Optional
18 
19 from geometry_msgs.msg import (PoseWithCovarianceStamped, Quaternion, TransformStamped, Twist,
20  TwistStamped, Vector3)
21 from nav2_loopback_sim.utils import (addYawToQuat, getMapOccupancy, matrixToTransform,
22  transformStampedToMatrix, worldToMap)
23 from nav2_simple_commander.line_iterator import LineIterator
24 from nav_msgs.msg import Odometry
25 from nav_msgs.srv import GetMap
26 import numpy as np
27 import rclpy
28 from rclpy.client import Client
29 from rclpy.duration import Duration
30 from rclpy.node import Node
31 from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
32 from rclpy.timer import Timer
33 from rosgraph_msgs.msg import Clock
34 from sensor_msgs.msg import LaserScan
35 from tf2_ros import Buffer, TransformBroadcaster, TransformListener
36 import tf_transformations
37 
38 """
39 This is a loopback simulator that replaces a physics simulator to create a
40 frictionless, inertialess, and collisionless simulation environment. It
41 accepts cmd_vel messages and publishes odometry & TF messages based on the
42 cumulative velocities received to mimic global localization and simulation.
43 It also accepts initialpose messages to set the initial pose of the robot
44 to place anywhere.
45 """
46 
47 
48 class LoopbackSimulator(Node):
49 
50  def __init__(self) -> None:
51  super().__init__(node_name='loopback_simulator')
52  self.curr_cmd_velcurr_cmd_vel = None
53  self.curr_cmd_vel_timecurr_cmd_vel_time = self.get_clock().now()
54  self.initial_poseinitial_pose: PoseWithCovarianceStamped = None
55  self.timertimer: Optional[Timer] = None
56  self.setupTimersetupTimer = None
57  self.mapmap = None
58  self.mat_base_to_lasermat_base_to_laser: Optional[np.ndarray[np.float64, np.dtype[np.float64]]] = None
59 
60  self.declare_parameter('update_duration', 0.01)
61  self.update_durupdate_dur = self.get_parameter('update_duration').get_parameter_value().double_value
62 
63  self.declare_parameter('base_frame_id', 'base_footprint')
64  self.base_frame_idbase_frame_id = self.get_parameter('base_frame_id').get_parameter_value().string_value
65 
66  self.declare_parameter('map_frame_id', 'map')
67  self.map_frame_idmap_frame_id = self.get_parameter('map_frame_id').get_parameter_value().string_value
68 
69  self.declare_parameter('odom_frame_id', 'odom')
70  self.odom_frame_idodom_frame_id = self.get_parameter('odom_frame_id').get_parameter_value().string_value
71 
72  self.declare_parameter('scan_frame_id', 'base_scan')
73  self.scan_frame_idscan_frame_id = self.get_parameter('scan_frame_id').get_parameter_value().string_value
74 
75  self.declare_parameter('enable_stamped_cmd_vel', True)
76  use_stamped = self.get_parameter('enable_stamped_cmd_vel').get_parameter_value().bool_value
77 
78  self.declare_parameter('scan_publish_dur', 0.1)
79  self.scan_publish_durscan_publish_dur = self.get_parameter(
80  'scan_publish_dur').get_parameter_value().double_value
81 
82  self.declare_parameter('publish_map_odom_tf', True)
83  self.publish_map_odom_tfpublish_map_odom_tf = self.get_parameter(
84  'publish_map_odom_tf').get_parameter_value().bool_value
85 
86  self.declare_parameter('publish_clock', True)
87  self.publish_clockpublish_clock = self.get_parameter('publish_clock').get_parameter_value().bool_value
88 
89  self.declare_parameter('scan_range_min', 0.05)
90  self.scan_range_minscan_range_min = \
91  self.get_parameter('scan_range_min').get_parameter_value().double_value
92 
93  self.declare_parameter('scan_range_max', 30.0)
94  self.scan_range_maxscan_range_max = \
95  self.get_parameter('scan_range_max').get_parameter_value().double_value
96 
97  self.declare_parameter('scan_angle_min', -math.pi)
98  self.scan_angle_minscan_angle_min = \
99  self.get_parameter('scan_angle_min').get_parameter_value().double_value
100 
101  self.declare_parameter('scan_angle_max', math.pi)
102  self.scan_angle_maxscan_angle_max = \
103  self.get_parameter('scan_angle_max').get_parameter_value().double_value
104 
105  self.declare_parameter('scan_angle_increment', 0.0261) # 0.0261 rad = 1.5 degrees
106  self.scan_angle_incrementscan_angle_increment = \
107  self.get_parameter('scan_angle_increment').get_parameter_value().double_value
108 
109  self.declare_parameter('scan_use_inf', True)
110  self.use_infuse_inf = \
111  self.get_parameter('scan_use_inf').get_parameter_value().bool_value
112 
113  self.t_map_to_odomt_map_to_odom = TransformStamped()
114  self.t_map_to_odomt_map_to_odom.header.frame_id = self.map_frame_idmap_frame_id
115  self.t_map_to_odomt_map_to_odom.child_frame_id = self.odom_frame_idodom_frame_id
116  self.t_odom_to_base_linkt_odom_to_base_link = TransformStamped()
117  self.t_odom_to_base_linkt_odom_to_base_link.header.frame_id = self.odom_frame_idodom_frame_id
118  self.t_odom_to_base_linkt_odom_to_base_link.child_frame_id = self.base_frame_idbase_frame_id
119 
120  self.tf_broadcastertf_broadcaster = TransformBroadcaster(self)
121 
122  self.initial_pose_subinitial_pose_sub = self.create_subscription(
123  PoseWithCovarianceStamped,
124  'initialpose', self.initialPoseCallbackinitialPoseCallback, 10)
125  if not use_stamped:
126  self.cmd_vel_subcmd_vel_sub = self.create_subscription(
127  Twist,
128  'cmd_vel', self.cmdVelCallbackcmdVelCallback, 10)
129  else:
130  self.cmd_vel_subcmd_vel_sub = self.create_subscription(
131  TwistStamped,
132  'cmd_vel', self.cmdVelStampedCallbackcmdVelStampedCallback, 10)
133  self.odom_pubodom_pub = self.create_publisher(Odometry, 'odom', 10)
134 
135  sensor_qos = QoSProfile(
136  reliability=ReliabilityPolicy.BEST_EFFORT,
137  durability=DurabilityPolicy.VOLATILE,
138  depth=10)
139  self.scan_pubscan_pub = self.create_publisher(LaserScan, 'scan', sensor_qos)
140 
141  if self.publish_clockpublish_clock:
142  self.clock_timerclock_timer = self.create_timer(0.1, self.clockTimerCallbackclockTimerCallback)
143  self.clock_pubclock_pub = self.create_publisher(Clock, '/clock', 10)
144 
145  self.setupTimersetupTimer = self.create_timer(0.1, self.setupTimerCallbacksetupTimerCallback)
146 
147  self.map_client: Client[GetMap.Request, GetMap.Response] = \
148  self.create_client(GetMap, '/map_server/map')
149 
150  self.tf_buffertf_buffer = Buffer()
151  self.tf_listenertf_listener = TransformListener(self.tf_buffertf_buffer, self)
152 
153  self.getMapgetMap()
154 
155  self.infoinfo('Loopback simulator initialized')
156 
157  def getBaseToLaserTf(self) -> None:
158  try:
159  # Wait for transform and lookup
160  transform = self.tf_buffertf_buffer.lookup_transform(
161  self.base_frame_idbase_frame_id, self.scan_frame_idscan_frame_id, rclpy.time.Time())
162  self.mat_base_to_lasermat_base_to_laser = transformStampedToMatrix(transform)
163 
164  except Exception as ex:
165  self.get_logger().error(f'Transform lookup failed: {str(ex)}')
166 
167  def setupTimerCallback(self) -> None:
168  # Publish initial identity odom transform & laser scan to warm up system
169  self.tf_broadcastertf_broadcaster.sendTransform(self.t_odom_to_base_linkt_odom_to_base_link)
170  if self.mat_base_to_lasermat_base_to_laser is None:
171  self.getBaseToLaserTfgetBaseToLaserTf()
172 
173  def clockTimerCallback(self) -> None:
174  msg = Clock()
175  msg.clock = self.get_clock().now().to_msg()
176  self.clock_pubclock_pub.publish(msg)
177 
178  def cmdVelCallback(self, msg: Twist) -> None:
179  self.debugdebug('Received cmd_vel')
180  if self.initial_poseinitial_pose is None:
181  # Don't consider velocities before the initial pose is set
182  return
183  self.curr_cmd_velcurr_cmd_vel = msg
184  self.curr_cmd_vel_timecurr_cmd_vel_time = self.get_clock().now()
185 
186  def cmdVelStampedCallback(self, msg: TwistStamped) -> None:
187  self.debugdebug('Received cmd_vel')
188  if self.initial_poseinitial_pose is None:
189  # Don't consider velocities before the initial pose is set
190  return
191  self.curr_cmd_velcurr_cmd_vel = msg.twist
192  self.curr_cmd_vel_timecurr_cmd_vel_time = rclpy.time.Time.from_msg(msg.header.stamp)
193 
194  def initialPoseCallback(self, msg: PoseWithCovarianceStamped) -> None:
195  self.infoinfo('Received initial pose!')
196  if self.initial_poseinitial_pose is None:
197  # Initialize transforms (map->odom as input pose, odom->base_link start from identity)
198  self.initial_poseinitial_pose = msg.pose.pose
199  self.t_map_to_odomt_map_to_odom.transform.translation.x = self.initial_poseinitial_pose.position.x
200  self.t_map_to_odomt_map_to_odom.transform.translation.y = self.initial_poseinitial_pose.position.y
201  self.t_map_to_odomt_map_to_odom.transform.translation.z = 0.0
202  self.t_map_to_odomt_map_to_odom.transform.rotation = self.initial_poseinitial_pose.orientation
203  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation = Vector3()
204  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation = Quaternion()
205  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
206 
207  # Start republication timer and velocity processing
208  if self.setupTimersetupTimer is not None:
209  self.setupTimersetupTimer.cancel()
210  self.setupTimersetupTimer.destroy()
211  self.setupTimersetupTimer = None
212  self.timertimer = self.create_timer(self.update_durupdate_dur, self.timerCallbacktimerCallback)
213  self.timer_lasertimer_laser = self.create_timer(self.scan_publish_durscan_publish_dur, self.publishLaserScanpublishLaserScan)
214  return
215 
216  self.initial_poseinitial_pose = msg.pose.pose
217 
218  # Adjust map->odom transform based on new initial pose, keeping odom->base_link the same
219  t_map_to_base_link = TransformStamped()
220  t_map_to_base_link.header = msg.header
221  t_map_to_base_link.child_frame_id = self.base_frame_idbase_frame_id
222  t_map_to_base_link.transform.translation.x = self.initial_poseinitial_pose.position.x
223  t_map_to_base_link.transform.translation.y = self.initial_poseinitial_pose.position.y
224  t_map_to_base_link.transform.translation.z = 0.0
225  t_map_to_base_link.transform.rotation = self.initial_poseinitial_pose.orientation
226  mat_map_to_base_link = transformStampedToMatrix(t_map_to_base_link)
227  mat_odom_to_base_link = transformStampedToMatrix(self.t_odom_to_base_linkt_odom_to_base_link)
228  mat_base_link_to_odom = tf_transformations.inverse_matrix(mat_odom_to_base_link)
229  mat_map_to_odom = \
230  tf_transformations.concatenate_matrices(mat_map_to_base_link, mat_base_link_to_odom)
231  self.t_map_to_odomt_map_to_odom.transform = matrixToTransform(mat_map_to_odom)
232 
233  def timerCallback(self) -> None:
234  # If no data, just republish existing transforms without change
235  one_sec = Duration(seconds=1)
236  if self.curr_cmd_velcurr_cmd_vel is None or self.get_clock().now() - self.curr_cmd_vel_timecurr_cmd_vel_time > one_sec:
237  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
238  self.curr_cmd_velcurr_cmd_vel = None
239  return
240 
241  # Update odom->base_link from cmd_vel
242  dx = self.curr_cmd_velcurr_cmd_vel.linear.x * self.update_durupdate_dur
243  dy = self.curr_cmd_velcurr_cmd_vel.linear.y * self.update_durupdate_dur
244  dth = self.curr_cmd_velcurr_cmd_vel.angular.z * self.update_durupdate_dur
245  q = [self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.x,
246  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.y,
247  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.z,
248  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.w]
249  _, _, yaw = tf_transformations.euler_from_quaternion(q)
250  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation.x += dx * math.cos(yaw) - dy * math.sin(yaw)
251  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation.y += dx * math.sin(yaw) + dy * math.cos(yaw)
252  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation = \
253  addYawToQuat(self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation, dth)
254 
255  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
256  self.publishOdometrypublishOdometry(self.t_odom_to_base_linkt_odom_to_base_link)
257 
258  def publishLaserScan(self) -> None:
259  # Publish a bogus laser scan for collision monitor
260  self.scan_msgscan_msg = LaserScan()
261  self.scan_msgscan_msg.header.stamp = (self.get_clock().now()).to_msg()
262  self.scan_msgscan_msg.header.frame_id = self.scan_frame_idscan_frame_id
263  self.scan_msgscan_msg.angle_min = self.scan_angle_minscan_angle_min
264  self.scan_msgscan_msg.angle_max = self.scan_angle_maxscan_angle_max
265  # 1.5 degrees
266  self.scan_msgscan_msg.angle_increment = self.scan_angle_incrementscan_angle_increment
267  self.scan_msgscan_msg.time_increment = 0.0
268  self.scan_msgscan_msg.scan_time = 0.1
269  self.scan_msgscan_msg.range_min = self.scan_range_minscan_range_min
270  self.scan_msgscan_msg.range_max = self.scan_range_maxscan_range_max
271  num_samples = int(
272  (self.scan_msgscan_msg.angle_max - self.scan_msgscan_msg.angle_min) /
273  self.scan_msgscan_msg.angle_increment)
274  self.scan_msgscan_msg.ranges = [0.0] * num_samples
275  self.getLaserScangetLaserScan(num_samples)
276  self.scan_pubscan_pub.publish(self.scan_msgscan_msg)
277 
278  def publishTransforms(self, map_to_odom: TransformStamped,
279  odom_to_base_link: TransformStamped) -> None:
280  map_to_odom.header.stamp = \
281  (self.get_clock().now() + Duration(seconds=self.update_durupdate_dur)).to_msg()
282  odom_to_base_link.header.stamp = self.get_clock().now().to_msg()
283  if self.publish_map_odom_tfpublish_map_odom_tf:
284  self.tf_broadcastertf_broadcaster.sendTransform(map_to_odom)
285  self.tf_broadcastertf_broadcaster.sendTransform(odom_to_base_link)
286 
287  def publishOdometry(self, odom_to_base_link: TransformStamped) -> None:
288  odom = Odometry()
289  odom.header.stamp = self.get_clock().now().to_msg()
290  odom.header.frame_id = 'odom'
291  odom.child_frame_id = 'base_link'
292  odom.pose.pose.position.x = odom_to_base_link.transform.translation.x
293  odom.pose.pose.position.y = odom_to_base_link.transform.translation.y
294  odom.pose.pose.orientation = odom_to_base_link.transform.rotation
295  odom.twist.twist = self.curr_cmd_velcurr_cmd_vel
296  self.odom_pubodom_pub.publish(odom)
297 
298  def info(self, msg: str) -> None:
299  self.get_logger().info(msg)
300  return
301 
302  def debug(self, msg: str) -> None:
303  self.get_logger().debug(msg)
304  return
305 
306  def getMap(self) -> None:
307  request = GetMap.Request()
308  if self.map_client.wait_for_service(timeout_sec=5.0):
309  # Request to get map
310  future = self.map_client.call_async(request)
311  rclpy.spin_until_future_complete(self, future)
312  result = future.result()
313  if result is not None:
314  self.mapmap = result.map
315  self.get_logger().info('Laser scan will be populated using map data')
316  else:
317  self.get_logger().warning(
318  'Failed to get map, '
319  'Laser scan will be populated using max range'
320  )
321  else:
322  self.get_logger().warning(
323  'Failed to get map, '
324  'Laser scan will be populated using max range'
325  )
326 
327  def getLaserPose(self) -> tuple[float, float, float]:
328  mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odomt_map_to_odom)
329  mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_linkt_odom_to_base_link)
330 
331  mat_map_to_laser = tf_transformations.concatenate_matrices(
332  mat_map_to_odom,
333  mat_odom_to_base,
334  self.mat_base_to_lasermat_base_to_laser
335  )
336  transform = matrixToTransform(mat_map_to_laser)
337 
338  x = transform.translation.x
339  y = transform.translation.y
340  theta = tf_transformations.euler_from_quaternion([
341  transform.rotation.x,
342  transform.rotation.y,
343  transform.rotation.z,
344  transform.rotation.w
345  ])[2]
346 
347  return x, y, theta
348 
349  def getLaserScan(self, num_samples: int) -> None:
350  if self.mapmap is None or self.initial_poseinitial_pose is None or self.mat_base_to_lasermat_base_to_laser is None:
351  if self.use_infuse_inf:
352  self.scan_msgscan_msg.ranges = [float('inf')] * num_samples
353  else:
354  self.scan_msgscan_msg.ranges = [self.scan_msgscan_msg.range_max - 0.1] * num_samples
355  return
356 
357  x0, y0, theta = self.getLaserPosegetLaserPose()
358 
359  mx0, my0 = worldToMap(x0, y0, self.mapmap)
360 
361  if not 0 < mx0 < self.mapmap.info.width or not 0 < my0 < self.mapmap.info.height:
362  # outside map
363  if self.use_infuse_inf:
364  self.scan_msgscan_msg.ranges = [float('inf')] * num_samples
365  else:
366  self.scan_msgscan_msg.ranges = [self.scan_msgscan_msg.range_max - 0.1] * num_samples
367  return
368 
369  for i in range(num_samples):
370  curr_angle = theta + self.scan_msgscan_msg.angle_min + i * self.scan_msgscan_msg.angle_increment
371  x1 = x0 + self.scan_msgscan_msg.range_max * math.cos(curr_angle)
372  y1 = y0 + self.scan_msgscan_msg.range_max * math.sin(curr_angle)
373 
374  mx1, my1 = worldToMap(x1, y1, self.mapmap)
375 
376  line_iterator = LineIterator(mx0, my0, mx1, my1, 0.5)
377 
378  while line_iterator.isValid():
379  mx, my = int(line_iterator.getX()), int(line_iterator.getY())
380 
381  if not 0 < mx < self.mapmap.info.width or not 0 < my < self.mapmap.info.height:
382  # if outside map then check next ray
383  break
384 
385  point_cost = getMapOccupancy(mx, my, self.mapmap)
386 
387  if point_cost >= 60:
388  self.scan_msgscan_msg.ranges[i] = math.sqrt(
389  (int(line_iterator.getX()) - mx0) ** 2 +
390  (int(line_iterator.getY()) - my0) ** 2
391  ) * self.mapmap.info.resolution
392  break
393 
394  line_iterator.advance()
395  if self.scan_msgscan_msg.ranges[i] == 0.0 and self.use_infuse_inf:
396  self.scan_msgscan_msg.ranges[i] = float('inf')
397 
398 
399 def main() -> None:
400  rclpy.init()
401  loopback_simulator = LoopbackSimulator()
402  rclpy.spin(loopback_simulator)
403  loopback_simulator.destroy_node()
404  rclpy.shutdown()
405  exit(0)
406 
407 
408 if __name__ == '__main__':
409  main()
None publishOdometry(self, TransformStamped odom_to_base_link)
None publishTransforms(self, TransformStamped map_to_odom, TransformStamped odom_to_base_link)
None initialPoseCallback(self, PoseWithCovarianceStamped msg)
tuple[float, float, float] getLaserPose(self)
None cmdVelStampedCallback(self, TwistStamped msg)
None getLaserScan(self, int num_samples)