Nav2 Navigation Stack - rolling  main
ROS 2 Navigation Stack
loopback_simulator.py
1 # Copyright (c) 2024 Open Navigation LLC
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 
16 import math
17 from typing import Optional, Union
18 
19 from geometry_msgs.msg import (Pose, PoseWithCovarianceStamped, Quaternion, TransformStamped,
20  Twist, TwistStamped, Vector3)
21 from nav2_loopback_sim.utils import (addYawToQuat, getMapOccupancy, matrixToTransform,
22  transformStampedToMatrix, worldToMap)
23 from nav2_simple_commander.line_iterator import LineIterator
24 from nav_msgs.msg import OccupancyGrid, Odometry
25 from nav_msgs.srv import GetMap
26 import numpy as np
27 import rclpy
28 from rclpy.client import Client
29 from rclpy.duration import Duration
30 from rclpy.node import Node
31 from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
32 from rclpy.subscription import Subscription
33 from rclpy.timer import Timer
34 from rosgraph_msgs.msg import Clock
35 from sensor_msgs.msg import LaserScan
36 from tf2_ros import Buffer, TransformBroadcaster, TransformListener
37 import tf_transformations
38 
39 """
40 This is a loopback simulator that replaces a physics simulator to create a
41 frictionless, inertialess, and collisionless simulation environment. It
42 accepts cmd_vel messages and publishes odometry & TF messages based on the
43 cumulative velocities received to mimic global localization and simulation.
44 It also accepts initialpose messages to set the initial pose of the robot
45 to place anywhere.
46 """
47 
48 
49 class LoopbackSimulator(Node):
50 
51  def __init__(self) -> None:
52  super().__init__(node_name='loopback_simulator')
53  self.curr_cmd_velcurr_cmd_vel: Optional[Twist] = None
54  self.curr_cmd_vel_timecurr_cmd_vel_time = self.get_clock().now()
55  self.initial_poseinitial_pose: Optional[Pose] = None
56  self.timertimer: Optional[Timer] = None
57  self.setupTimersetupTimer = None
58  self.mapmap: Optional[OccupancyGrid] = None
59  self.mat_base_to_lasermat_base_to_laser: Optional[np.ndarray[np.float64, np.dtype[np.float64]]] = None
60 
61  self.declare_parameter('update_duration', 0.01)
62  self.update_durupdate_dur = self.get_parameter('update_duration').get_parameter_value().double_value
63 
64  self.declare_parameter('base_frame_id', 'base_footprint')
65  self.base_frame_idbase_frame_id = self.get_parameter('base_frame_id').get_parameter_value().string_value
66 
67  self.declare_parameter('map_frame_id', 'map')
68  self.map_frame_idmap_frame_id = self.get_parameter('map_frame_id').get_parameter_value().string_value
69 
70  self.declare_parameter('odom_frame_id', 'odom')
71  self.odom_frame_idodom_frame_id = self.get_parameter('odom_frame_id').get_parameter_value().string_value
72 
73  self.declare_parameter('scan_frame_id', 'base_scan')
74  self.scan_frame_idscan_frame_id = self.get_parameter('scan_frame_id').get_parameter_value().string_value
75 
76  self.declare_parameter('enable_stamped_cmd_vel', True)
77  use_stamped = self.get_parameter('enable_stamped_cmd_vel').get_parameter_value().bool_value
78 
79  self.declare_parameter('scan_publish_dur', 0.1)
80  self.scan_publish_durscan_publish_dur = self.get_parameter(
81  'scan_publish_dur').get_parameter_value().double_value
82 
83  self.declare_parameter('publish_map_odom_tf', True)
84  self.publish_map_odom_tfpublish_map_odom_tf = self.get_parameter(
85  'publish_map_odom_tf').get_parameter_value().bool_value
86 
87  self.declare_parameter('publish_clock', True)
88  self.publish_clockpublish_clock = self.get_parameter('publish_clock').get_parameter_value().bool_value
89 
90  self.declare_parameter('scan_range_min', 0.05)
91  self.scan_range_minscan_range_min = \
92  self.get_parameter('scan_range_min').get_parameter_value().double_value
93 
94  self.declare_parameter('scan_range_max', 30.0)
95  self.scan_range_maxscan_range_max = \
96  self.get_parameter('scan_range_max').get_parameter_value().double_value
97 
98  self.declare_parameter('scan_angle_min', -math.pi)
99  self.scan_angle_minscan_angle_min = \
100  self.get_parameter('scan_angle_min').get_parameter_value().double_value
101 
102  self.declare_parameter('scan_angle_max', math.pi)
103  self.scan_angle_maxscan_angle_max = \
104  self.get_parameter('scan_angle_max').get_parameter_value().double_value
105 
106  self.declare_parameter('scan_angle_increment', 0.0261) # 0.0261 rad = 1.5 degrees
107  self.scan_angle_incrementscan_angle_increment = \
108  self.get_parameter('scan_angle_increment').get_parameter_value().double_value
109 
110  self.declare_parameter('scan_use_inf', True)
111  self.use_infuse_inf = \
112  self.get_parameter('scan_use_inf').get_parameter_value().bool_value
113 
114  self.t_map_to_odomt_map_to_odom = TransformStamped()
115  self.t_map_to_odomt_map_to_odom.header.frame_id = self.map_frame_idmap_frame_id
116  self.t_map_to_odomt_map_to_odom.child_frame_id = self.odom_frame_idodom_frame_id
117  self.t_odom_to_base_linkt_odom_to_base_link = TransformStamped()
118  self.t_odom_to_base_linkt_odom_to_base_link.header.frame_id = self.odom_frame_idodom_frame_id
119  self.t_odom_to_base_linkt_odom_to_base_link.child_frame_id = self.base_frame_idbase_frame_id
120 
121  self.tf_broadcastertf_broadcaster = TransformBroadcaster(self)
122 
123  self.initial_pose_subinitial_pose_sub = self.create_subscription(
124  PoseWithCovarianceStamped,
125  'initialpose', self.initialPoseCallbackinitialPoseCallback, 10)
126 
127  self.cmd_vel_subcmd_vel_sub: Union[Subscription[Twist], Subscription[TwistStamped]]
128 
129  if not use_stamped:
130  self.cmd_vel_subcmd_vel_sub = self.create_subscription(
131  Twist,
132  'cmd_vel', self.cmdVelCallbackcmdVelCallback, 10)
133  else:
134  self.cmd_vel_subcmd_vel_sub = self.create_subscription(
135  TwistStamped,
136  'cmd_vel', self.cmdVelStampedCallbackcmdVelStampedCallback, 10)
137  self.odom_pubodom_pub = self.create_publisher(Odometry, 'odom', 10)
138 
139  sensor_qos = QoSProfile(
140  reliability=ReliabilityPolicy.BEST_EFFORT,
141  durability=DurabilityPolicy.VOLATILE,
142  depth=10)
143  self.scan_pubscan_pub = self.create_publisher(LaserScan, 'scan', sensor_qos)
144 
145  if self.publish_clockpublish_clock:
146  self.clock_timerclock_timer = self.create_timer(0.1, self.clockTimerCallbackclockTimerCallback)
147  self.clock_pubclock_pub = self.create_publisher(Clock, '/clock', 10)
148 
149  self.setupTimersetupTimer = self.create_timer(0.1, self.setupTimerCallbacksetupTimerCallback)
150 
151  self.map_client: Client[GetMap.Request, GetMap.Response] = \
152  self.create_client(GetMap, '/map_server/map')
153 
154  self.tf_buffertf_buffer = Buffer()
155  self.tf_listenertf_listener = TransformListener(self.tf_buffertf_buffer, self)
156 
157  self.getMapgetMap()
158 
159  self.infoinfo('Loopback simulator initialized')
160 
161  def getBaseToLaserTf(self) -> None:
162  try:
163  # Wait for transform and lookup
164  transform = self.tf_buffertf_buffer.lookup_transform(
165  self.base_frame_idbase_frame_id, self.scan_frame_idscan_frame_id, rclpy.time.Time())
166  self.mat_base_to_lasermat_base_to_laser = transformStampedToMatrix(transform)
167 
168  except Exception as ex:
169  self.get_logger().error(f'Transform lookup failed: {str(ex)}')
170 
171  def setupTimerCallback(self) -> None:
172  # Publish initial identity odom transform & laser scan to warm up system
173  self.tf_broadcastertf_broadcaster.sendTransform(self.t_odom_to_base_linkt_odom_to_base_link)
174  if self.mat_base_to_lasermat_base_to_laser is None:
175  self.getBaseToLaserTfgetBaseToLaserTf()
176 
177  def clockTimerCallback(self) -> None:
178  msg = Clock()
179  msg.clock = self.get_clock().now().to_msg()
180  self.clock_pubclock_pub.publish(msg)
181 
182  def cmdVelCallback(self, msg: Twist) -> None:
183  self.debugdebug('Received cmd_vel')
184  if self.initial_poseinitial_pose is None:
185  # Don't consider velocities before the initial pose is set
186  return
187  self.curr_cmd_velcurr_cmd_vel = msg
188  self.curr_cmd_vel_timecurr_cmd_vel_time = self.get_clock().now()
189 
190  def cmdVelStampedCallback(self, msg: TwistStamped) -> None:
191  self.debugdebug('Received cmd_vel')
192  if self.initial_poseinitial_pose is None:
193  # Don't consider velocities before the initial pose is set
194  return
195  self.curr_cmd_velcurr_cmd_vel = msg.twist
196  self.curr_cmd_vel_timecurr_cmd_vel_time = rclpy.time.Time.from_msg(msg.header.stamp)
197 
198  def initialPoseCallback(self, msg: PoseWithCovarianceStamped) -> None:
199  self.infoinfo('Received initial pose!')
200  if self.initial_poseinitial_pose is None:
201  # Initialize transforms (map->odom as input pose, odom->base_link start from identity)
202  self.initial_poseinitial_pose = msg.pose.pose
203  self.t_map_to_odomt_map_to_odom.transform.translation.x = self.initial_poseinitial_pose.position.x
204  self.t_map_to_odomt_map_to_odom.transform.translation.y = self.initial_poseinitial_pose.position.y
205  self.t_map_to_odomt_map_to_odom.transform.translation.z = 0.0
206  self.t_map_to_odomt_map_to_odom.transform.rotation = self.initial_poseinitial_pose.orientation
207  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation = Vector3()
208  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation = Quaternion()
209  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
210 
211  # Start republication timer and velocity processing
212  if self.setupTimersetupTimer is not None:
213  self.setupTimersetupTimer.cancel()
214  self.setupTimersetupTimer.destroy()
215  self.setupTimersetupTimer = None
216  self.timertimer = self.create_timer(self.update_durupdate_dur, self.timerCallbacktimerCallback)
217  self.timer_lasertimer_laser = self.create_timer(self.scan_publish_durscan_publish_dur, self.publishLaserScanpublishLaserScan)
218  return
219 
220  self.initial_poseinitial_pose = msg.pose.pose
221 
222  # Adjust map->odom transform based on new initial pose, keeping odom->base_link the same
223  t_map_to_base_link = TransformStamped()
224  t_map_to_base_link.header = msg.header
225  t_map_to_base_link.child_frame_id = self.base_frame_idbase_frame_id
226  t_map_to_base_link.transform.translation.x = self.initial_poseinitial_pose.position.x
227  t_map_to_base_link.transform.translation.y = self.initial_poseinitial_pose.position.y
228  t_map_to_base_link.transform.translation.z = 0.0
229  t_map_to_base_link.transform.rotation = self.initial_poseinitial_pose.orientation
230  mat_map_to_base_link = transformStampedToMatrix(t_map_to_base_link)
231  mat_odom_to_base_link = transformStampedToMatrix(self.t_odom_to_base_linkt_odom_to_base_link)
232  mat_base_link_to_odom = tf_transformations.inverse_matrix(mat_odom_to_base_link)
233  mat_map_to_odom = \
234  tf_transformations.concatenate_matrices(mat_map_to_base_link, mat_base_link_to_odom)
235  self.t_map_to_odomt_map_to_odom.transform = matrixToTransform(mat_map_to_odom)
236 
237  def timerCallback(self) -> None:
238  # If no data, just republish existing transforms without change
239  one_sec = Duration(seconds=1)
240  if self.curr_cmd_velcurr_cmd_vel is None or self.get_clock().now() - self.curr_cmd_vel_timecurr_cmd_vel_time > one_sec:
241  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
242  self.curr_cmd_velcurr_cmd_vel = None
243  return
244 
245  # Update odom->base_link from cmd_vel
246  dx = self.curr_cmd_velcurr_cmd_vel.linear.x * self.update_durupdate_dur
247  dy = self.curr_cmd_velcurr_cmd_vel.linear.y * self.update_durupdate_dur
248  dth = self.curr_cmd_velcurr_cmd_vel.angular.z * self.update_durupdate_dur
249  q = [self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.x,
250  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.y,
251  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.z,
252  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.w]
253  _, _, yaw = tf_transformations.euler_from_quaternion(q)
254  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation.x += dx * math.cos(yaw) - dy * math.sin(yaw)
255  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation.y += dx * math.sin(yaw) + dy * math.cos(yaw)
256  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation = \
257  addYawToQuat(self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation, dth)
258 
259  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
260  self.publishOdometrypublishOdometry(self.t_odom_to_base_linkt_odom_to_base_link)
261 
262  def publishLaserScan(self) -> None:
263  # Publish a bogus laser scan for collision monitor
264  self.scan_msgscan_msg = LaserScan()
265  self.scan_msgscan_msg.header.stamp = (self.get_clock().now()).to_msg()
266  self.scan_msgscan_msg.header.frame_id = self.scan_frame_idscan_frame_id
267  self.scan_msgscan_msg.angle_min = self.scan_angle_minscan_angle_min
268  self.scan_msgscan_msg.angle_max = self.scan_angle_maxscan_angle_max
269  # 1.5 degrees
270  self.scan_msgscan_msg.angle_increment = self.scan_angle_incrementscan_angle_increment
271  self.scan_msgscan_msg.time_increment = 0.0
272  self.scan_msgscan_msg.scan_time = 0.1
273  self.scan_msgscan_msg.range_min = self.scan_range_minscan_range_min
274  self.scan_msgscan_msg.range_max = self.scan_range_maxscan_range_max
275  num_samples = int(
276  (self.scan_msgscan_msg.angle_max - self.scan_msgscan_msg.angle_min) /
277  self.scan_msgscan_msg.angle_increment)
278  self.scan_msgscan_msg.ranges = [0.0] * num_samples
279  self.getLaserScangetLaserScan(num_samples)
280  self.scan_pubscan_pub.publish(self.scan_msgscan_msg)
281 
282  def publishTransforms(self, map_to_odom: TransformStamped,
283  odom_to_base_link: TransformStamped) -> None:
284  map_to_odom.header.stamp = \
285  (self.get_clock().now() + Duration(seconds=self.update_durupdate_dur)).to_msg()
286  odom_to_base_link.header.stamp = self.get_clock().now().to_msg()
287  if self.publish_map_odom_tfpublish_map_odom_tf:
288  self.tf_broadcastertf_broadcaster.sendTransform(map_to_odom)
289  self.tf_broadcastertf_broadcaster.sendTransform(odom_to_base_link)
290 
291  def publishOdometry(self, odom_to_base_link: TransformStamped) -> None:
292  odom = Odometry()
293  odom.header.stamp = self.get_clock().now().to_msg()
294  odom.header.frame_id = 'odom'
295  odom.child_frame_id = 'base_link'
296  odom.pose.pose.position.x = odom_to_base_link.transform.translation.x
297  odom.pose.pose.position.y = odom_to_base_link.transform.translation.y
298  odom.pose.pose.orientation = odom_to_base_link.transform.rotation
299  if self.curr_cmd_velcurr_cmd_vel is not None:
300  odom.twist.twist = self.curr_cmd_velcurr_cmd_vel
301  self.odom_pubodom_pub.publish(odom)
302 
303  def info(self, msg: str) -> None:
304  self.get_logger().info(msg)
305  return
306 
307  def debug(self, msg: str) -> None:
308  self.get_logger().debug(msg)
309  return
310 
311  def getMap(self) -> None:
312  request = GetMap.Request()
313  if self.map_client.wait_for_service(timeout_sec=5.0):
314  # Request to get map
315  future = self.map_client.call_async(request)
316  rclpy.spin_until_future_complete(self, future)
317  result = future.result()
318  if result is not None:
319  self.mapmap = result.map
320  self.get_logger().info('Laser scan will be populated using map data')
321  else:
322  self.get_logger().warning(
323  'Failed to get map, '
324  'Laser scan will be populated using max range'
325  )
326  else:
327  self.get_logger().warning(
328  'Failed to get map, '
329  'Laser scan will be populated using max range'
330  )
331 
332  def getLaserPose(self) -> tuple[float, float, float]:
333  mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odomt_map_to_odom)
334  mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_linkt_odom_to_base_link)
335 
336  mat_map_to_laser = tf_transformations.concatenate_matrices(
337  mat_map_to_odom,
338  mat_odom_to_base,
339  self.mat_base_to_lasermat_base_to_laser
340  )
341  transform = matrixToTransform(mat_map_to_laser)
342 
343  x = transform.translation.x
344  y = transform.translation.y
345  theta = tf_transformations.euler_from_quaternion([
346  transform.rotation.x,
347  transform.rotation.y,
348  transform.rotation.z,
349  transform.rotation.w
350  ])[2]
351 
352  return x, y, theta
353 
354  def getLaserScan(self, num_samples: int) -> None:
355  if self.mapmap is None or self.initial_poseinitial_pose is None or self.mat_base_to_lasermat_base_to_laser is None:
356  if self.use_infuse_inf:
357  self.scan_msgscan_msg.ranges = [float('inf')] * num_samples
358  else:
359  self.scan_msgscan_msg.ranges = [self.scan_msgscan_msg.range_max - 0.1] * num_samples
360  return
361 
362  x0, y0, theta = self.getLaserPosegetLaserPose()
363 
364  mx0, my0 = worldToMap(x0, y0, self.mapmap)
365 
366  if not 0 < mx0 < self.mapmap.info.width or not 0 < my0 < self.mapmap.info.height:
367  # outside map
368  if self.use_infuse_inf:
369  self.scan_msgscan_msg.ranges = [float('inf')] * num_samples
370  else:
371  self.scan_msgscan_msg.ranges = [self.scan_msgscan_msg.range_max - 0.1] * num_samples
372  return
373 
374  for i in range(num_samples):
375  curr_angle = theta + self.scan_msgscan_msg.angle_min + i * self.scan_msgscan_msg.angle_increment
376  x1 = x0 + self.scan_msgscan_msg.range_max * math.cos(curr_angle)
377  y1 = y0 + self.scan_msgscan_msg.range_max * math.sin(curr_angle)
378 
379  mx1, my1 = worldToMap(x1, y1, self.mapmap)
380 
381  line_iterator = LineIterator(mx0, my0, mx1, my1, 0.5)
382 
383  while line_iterator.isValid():
384  mx, my = int(line_iterator.getX()), int(line_iterator.getY())
385 
386  if not 0 < mx < self.mapmap.info.width or not 0 < my < self.mapmap.info.height:
387  # if outside map then check next ray
388  break
389 
390  point_cost = getMapOccupancy(mx, my, self.mapmap)
391 
392  if point_cost >= 60:
393  self.scan_msgscan_msg.ranges[i] = math.sqrt(
394  (int(line_iterator.getX()) - mx0) ** 2 +
395  (int(line_iterator.getY()) - my0) ** 2
396  ) * self.mapmap.info.resolution
397  break
398 
399  line_iterator.advance()
400  if self.scan_msgscan_msg.ranges[i] == 0.0 and self.use_infuse_inf:
401  self.scan_msgscan_msg.ranges[i] = float('inf')
402 
403 
404 def main() -> None:
405  rclpy.init()
406  loopback_simulator = LoopbackSimulator()
407  rclpy.spin(loopback_simulator)
408  loopback_simulator.destroy_node()
409  rclpy.shutdown()
410  exit(0)
411 
412 
413 if __name__ == '__main__':
414  main()
None publishOdometry(self, TransformStamped odom_to_base_link)
None publishTransforms(self, TransformStamped map_to_odom, TransformStamped odom_to_base_link)
None initialPoseCallback(self, PoseWithCovarianceStamped msg)
tuple[float, float, float] getLaserPose(self)
None cmdVelStampedCallback(self, TwistStamped msg)
None getLaserScan(self, int num_samples)