Nav2 Navigation Stack - kilted  kilted
ROS 2 Navigation Stack
loopback_simulator.py
1 # Copyright (c) 2024 Open Navigation LLC
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 
16 import math
17 from typing import Optional
18 
19 from geometry_msgs.msg import (PoseWithCovarianceStamped, Quaternion, TransformStamped, Twist,
20  TwistStamped, Vector3)
21 from nav2_simple_commander.line_iterator import LineIterator
22 from nav_msgs.msg import Odometry
23 from nav_msgs.srv import GetMap
24 import numpy as np
25 import rclpy
26 from rclpy.duration import Duration
27 from rclpy.node import Node
28 from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
29 from rclpy.timer import Timer
30 from rosgraph_msgs.msg import Clock
31 from sensor_msgs.msg import LaserScan
32 from tf2_ros import Buffer, TransformBroadcaster, TransformListener
33 import tf_transformations
34 
35 from .utils import (addYawToQuat, getMapOccupancy, matrixToTransform, transformStampedToMatrix,
36  worldToMap)
37 
38 """
39 This is a loopback simulator that replaces a physics simulator to create a
40 frictionless, inertialess, and collisionless simulation environment. It
41 accepts cmd_vel messages and publishes odometry & TF messages based on the
42 cumulative velocities received to mimic global localization and simulation.
43 It also accepts initialpose messages to set the initial pose of the robot
44 to place anywhere.
45 """
46 
47 
48 class LoopbackSimulator(Node):
49 
50  def __init__(self) -> None:
51  super().__init__(node_name='loopback_simulator')
52  self.curr_cmd_velcurr_cmd_vel = None
53  self.curr_cmd_vel_timecurr_cmd_vel_time = self.get_clock().now()
54  self.initial_poseinitial_pose: PoseWithCovarianceStamped = None
55  self.timertimer: Optional[Timer] = None
56  self.setupTimersetupTimer = None
57  self.mapmap = None
58  self.mat_base_to_lasermat_base_to_laser: Optional[np.ndarray[np.float64, np.dtype[np.float64]]] = None
59 
60  self.declare_parameter('update_duration', 0.01)
61  self.update_durupdate_dur = self.get_parameter('update_duration').get_parameter_value().double_value
62 
63  self.declare_parameter('base_frame_id', 'base_footprint')
64  self.base_frame_idbase_frame_id = self.get_parameter('base_frame_id').get_parameter_value().string_value
65 
66  self.declare_parameter('map_frame_id', 'map')
67  self.map_frame_idmap_frame_id = self.get_parameter('map_frame_id').get_parameter_value().string_value
68 
69  self.declare_parameter('odom_frame_id', 'odom')
70  self.odom_frame_idodom_frame_id = self.get_parameter('odom_frame_id').get_parameter_value().string_value
71 
72  self.declare_parameter('scan_frame_id', 'base_scan')
73  self.scan_frame_idscan_frame_id = self.get_parameter('scan_frame_id').get_parameter_value().string_value
74 
75  self.declare_parameter('enable_stamped_cmd_vel', True)
76  use_stamped = self.get_parameter('enable_stamped_cmd_vel').get_parameter_value().bool_value
77 
78  self.declare_parameter('scan_publish_dur', 0.1)
79  self.scan_publish_durscan_publish_dur = self.get_parameter(
80  'scan_publish_dur').get_parameter_value().double_value
81 
82  self.declare_parameter('publish_map_odom_tf', True)
83  self.publish_map_odom_tfpublish_map_odom_tf = self.get_parameter(
84  'publish_map_odom_tf').get_parameter_value().bool_value
85 
86  self.declare_parameter('publish_clock', True)
87  self.publish_clockpublish_clock = self.get_parameter('publish_clock').get_parameter_value().bool_value
88 
89  self.declare_parameter('scan_range_min', 0.05)
90  self.scan_range_minscan_range_min = \
91  self.get_parameter('scan_range_min').get_parameter_value().double_value
92 
93  self.declare_parameter('scan_range_max', 30.0)
94  self.scan_range_maxscan_range_max = \
95  self.get_parameter('scan_range_max').get_parameter_value().double_value
96 
97  self.declare_parameter('scan_angle_min', -math.pi)
98  self.scan_angle_minscan_angle_min = \
99  self.get_parameter('scan_angle_min').get_parameter_value().double_value
100 
101  self.declare_parameter('scan_angle_max', math.pi)
102  self.scan_angle_maxscan_angle_max = \
103  self.get_parameter('scan_angle_max').get_parameter_value().double_value
104 
105  self.declare_parameter('scan_angle_increment', 0.0261) # 0.0261 rad = 1.5 degrees
106  self.scan_angle_incrementscan_angle_increment = \
107  self.get_parameter('scan_angle_increment').get_parameter_value().double_value
108 
109  self.declare_parameter('scan_use_inf', True)
110  self.use_infuse_inf = \
111  self.get_parameter('scan_use_inf').get_parameter_value().bool_value
112 
113  self.t_map_to_odomt_map_to_odom = TransformStamped()
114  self.t_map_to_odomt_map_to_odom.header.frame_id = self.map_frame_idmap_frame_id
115  self.t_map_to_odomt_map_to_odom.child_frame_id = self.odom_frame_idodom_frame_id
116  self.t_odom_to_base_linkt_odom_to_base_link = TransformStamped()
117  self.t_odom_to_base_linkt_odom_to_base_link.header.frame_id = self.odom_frame_idodom_frame_id
118  self.t_odom_to_base_linkt_odom_to_base_link.child_frame_id = self.base_frame_idbase_frame_id
119 
120  self.tf_broadcastertf_broadcaster = TransformBroadcaster(self)
121 
122  self.initial_pose_subinitial_pose_sub = self.create_subscription(
123  PoseWithCovarianceStamped,
124  'initialpose', self.initialPoseCallbackinitialPoseCallback, 10)
125  if not use_stamped:
126  self.cmd_vel_subcmd_vel_sub = self.create_subscription(
127  Twist,
128  'cmd_vel', self.cmdVelCallbackcmdVelCallback, 10)
129  else:
130  self.cmd_vel_subcmd_vel_sub = self.create_subscription(
131  TwistStamped,
132  'cmd_vel', self.cmdVelStampedCallbackcmdVelStampedCallback, 10)
133  self.odom_pubodom_pub = self.create_publisher(Odometry, 'odom', 10)
134 
135  sensor_qos = QoSProfile(
136  reliability=ReliabilityPolicy.BEST_EFFORT,
137  durability=DurabilityPolicy.VOLATILE,
138  depth=10)
139  self.scan_pubscan_pub = self.create_publisher(LaserScan, 'scan', sensor_qos)
140 
141  if self.publish_clockpublish_clock:
142  self.clock_timerclock_timer = self.create_timer(0.1, self.clockTimerCallbackclockTimerCallback)
143  self.clock_pubclock_pub = self.create_publisher(Clock, '/clock', 10)
144 
145  self.setupTimersetupTimer = self.create_timer(0.1, self.setupTimerCallbacksetupTimerCallback)
146 
147  self.map_clientmap_client = self.create_client(GetMap, '/map_server/map')
148 
149  self.tf_buffertf_buffer = Buffer()
150  self.tf_listenertf_listener = TransformListener(self.tf_buffertf_buffer, self)
151 
152  self.getMapgetMap()
153 
154  self.infoinfo('Loopback simulator initialized')
155 
156  def getBaseToLaserTf(self) -> None:
157  try:
158  # Wait for transform and lookup
159  transform = self.tf_buffertf_buffer.lookup_transform(
160  self.base_frame_idbase_frame_id, self.scan_frame_idscan_frame_id, rclpy.time.Time())
161  self.mat_base_to_lasermat_base_to_laser = transformStampedToMatrix(transform)
162 
163  except Exception as ex:
164  self.get_logger().error(f'Transform lookup failed: {str(ex)}')
165 
166  def setupTimerCallback(self) -> None:
167  # Publish initial identity odom transform & laser scan to warm up system
168  self.tf_broadcastertf_broadcaster.sendTransform(self.t_odom_to_base_linkt_odom_to_base_link)
169  if self.mat_base_to_lasermat_base_to_laser is None:
170  self.getBaseToLaserTfgetBaseToLaserTf()
171 
172  def clockTimerCallback(self) -> None:
173  msg = Clock()
174  msg.clock = self.get_clock().now().to_msg()
175  self.clock_pubclock_pub.publish(msg)
176 
177  def cmdVelCallback(self, msg: Twist) -> None:
178  self.debugdebug('Received cmd_vel')
179  if self.initial_poseinitial_pose is None:
180  # Don't consider velocities before the initial pose is set
181  return
182  self.curr_cmd_velcurr_cmd_vel = msg
183  self.curr_cmd_vel_timecurr_cmd_vel_time = self.get_clock().now()
184 
185  def cmdVelStampedCallback(self, msg: TwistStamped) -> None:
186  self.debugdebug('Received cmd_vel')
187  if self.initial_poseinitial_pose is None:
188  # Don't consider velocities before the initial pose is set
189  return
190  self.curr_cmd_velcurr_cmd_vel = msg.twist
191  self.curr_cmd_vel_timecurr_cmd_vel_time = rclpy.time.Time.from_msg(msg.header.stamp)
192 
193  def initialPoseCallback(self, msg: PoseWithCovarianceStamped) -> None:
194  self.infoinfo('Received initial pose!')
195  if self.initial_poseinitial_pose is None:
196  # Initialize transforms (map->odom as input pose, odom->base_link start from identity)
197  self.initial_poseinitial_pose = msg.pose.pose
198  self.t_map_to_odomt_map_to_odom.transform.translation.x = self.initial_poseinitial_pose.position.x
199  self.t_map_to_odomt_map_to_odom.transform.translation.y = self.initial_poseinitial_pose.position.y
200  self.t_map_to_odomt_map_to_odom.transform.translation.z = 0.0
201  self.t_map_to_odomt_map_to_odom.transform.rotation = self.initial_poseinitial_pose.orientation
202  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation = Vector3()
203  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation = Quaternion()
204  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
205 
206  # Start republication timer and velocity processing
207  if self.setupTimersetupTimer is not None:
208  self.setupTimersetupTimer.cancel()
209  self.setupTimersetupTimer.destroy()
210  self.setupTimersetupTimer = None
211  self.timertimer = self.create_timer(self.update_durupdate_dur, self.timerCallbacktimerCallback)
212  self.timer_lasertimer_laser = self.create_timer(self.scan_publish_durscan_publish_dur, self.publishLaserScanpublishLaserScan)
213  return
214 
215  self.initial_poseinitial_pose = msg.pose.pose
216 
217  # Adjust map->odom transform based on new initial pose, keeping odom->base_link the same
218  t_map_to_base_link = TransformStamped()
219  t_map_to_base_link.header = msg.header
220  t_map_to_base_link.child_frame_id = self.base_frame_idbase_frame_id
221  t_map_to_base_link.transform.translation.x = self.initial_poseinitial_pose.position.x
222  t_map_to_base_link.transform.translation.y = self.initial_poseinitial_pose.position.y
223  t_map_to_base_link.transform.translation.z = 0.0
224  t_map_to_base_link.transform.rotation = self.initial_poseinitial_pose.orientation
225  mat_map_to_base_link = transformStampedToMatrix(t_map_to_base_link)
226  mat_odom_to_base_link = transformStampedToMatrix(self.t_odom_to_base_linkt_odom_to_base_link)
227  mat_base_link_to_odom = tf_transformations.inverse_matrix(mat_odom_to_base_link)
228  mat_map_to_odom = \
229  tf_transformations.concatenate_matrices(mat_map_to_base_link, mat_base_link_to_odom)
230  self.t_map_to_odomt_map_to_odom.transform = matrixToTransform(mat_map_to_odom)
231 
232  def timerCallback(self) -> None:
233  # If no data, just republish existing transforms without change
234  one_sec = Duration(seconds=1)
235  if self.curr_cmd_velcurr_cmd_vel is None or self.get_clock().now() - self.curr_cmd_vel_timecurr_cmd_vel_time > one_sec:
236  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
237  self.curr_cmd_velcurr_cmd_vel = None
238  return
239 
240  # Update odom->base_link from cmd_vel
241  dx = self.curr_cmd_velcurr_cmd_vel.linear.x * self.update_durupdate_dur
242  dy = self.curr_cmd_velcurr_cmd_vel.linear.y * self.update_durupdate_dur
243  dth = self.curr_cmd_velcurr_cmd_vel.angular.z * self.update_durupdate_dur
244  q = [self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.x,
245  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.y,
246  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.z,
247  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation.w]
248  _, _, yaw = tf_transformations.euler_from_quaternion(q)
249  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation.x += dx * math.cos(yaw) - dy * math.sin(yaw)
250  self.t_odom_to_base_linkt_odom_to_base_link.transform.translation.y += dx * math.sin(yaw) + dy * math.cos(yaw)
251  self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation = \
252  addYawToQuat(self.t_odom_to_base_linkt_odom_to_base_link.transform.rotation, dth)
253 
254  self.publishTransformspublishTransforms(self.t_map_to_odomt_map_to_odom, self.t_odom_to_base_linkt_odom_to_base_link)
255  self.publishOdometrypublishOdometry(self.t_odom_to_base_linkt_odom_to_base_link)
256 
257  def publishLaserScan(self) -> None:
258  # Publish a bogus laser scan for collision monitor
259  self.scan_msgscan_msg = LaserScan()
260  self.scan_msgscan_msg.header.stamp = (self.get_clock().now()).to_msg()
261  self.scan_msgscan_msg.header.frame_id = self.scan_frame_idscan_frame_id
262  self.scan_msgscan_msg.angle_min = self.scan_angle_minscan_angle_min
263  self.scan_msgscan_msg.angle_max = self.scan_angle_maxscan_angle_max
264  # 1.5 degrees
265  self.scan_msgscan_msg.angle_increment = self.scan_angle_incrementscan_angle_increment
266  self.scan_msgscan_msg.time_increment = 0.0
267  self.scan_msgscan_msg.scan_time = 0.1
268  self.scan_msgscan_msg.range_min = self.scan_range_minscan_range_min
269  self.scan_msgscan_msg.range_max = self.scan_range_maxscan_range_max
270  num_samples = int(
271  (self.scan_msgscan_msg.angle_max - self.scan_msgscan_msg.angle_min) /
272  self.scan_msgscan_msg.angle_increment)
273  self.scan_msgscan_msg.ranges = [0.0] * num_samples
274  self.getLaserScangetLaserScan(num_samples)
275  self.scan_pubscan_pub.publish(self.scan_msgscan_msg)
276 
277  def publishTransforms(self, map_to_odom: TransformStamped,
278  odom_to_base_link: TransformStamped) -> None:
279  map_to_odom.header.stamp = \
280  (self.get_clock().now() + Duration(seconds=self.update_durupdate_dur)).to_msg()
281  odom_to_base_link.header.stamp = self.get_clock().now().to_msg()
282  if self.publish_map_odom_tfpublish_map_odom_tf:
283  self.tf_broadcastertf_broadcaster.sendTransform(map_to_odom)
284  self.tf_broadcastertf_broadcaster.sendTransform(odom_to_base_link)
285 
286  def publishOdometry(self, odom_to_base_link: TransformStamped) -> None:
287  odom = Odometry()
288  odom.header.stamp = self.get_clock().now().to_msg()
289  odom.header.frame_id = 'odom'
290  odom.child_frame_id = 'base_link'
291  odom.pose.pose.position.x = odom_to_base_link.transform.translation.x
292  odom.pose.pose.position.y = odom_to_base_link.transform.translation.y
293  odom.pose.pose.orientation = odom_to_base_link.transform.rotation
294  odom.twist.twist = self.curr_cmd_velcurr_cmd_vel
295  self.odom_pubodom_pub.publish(odom)
296 
297  def info(self, msg: str) -> None:
298  self.get_logger().info(msg)
299  return
300 
301  def debug(self, msg: str) -> None:
302  self.get_logger().debug(msg)
303  return
304 
305  def getMap(self) -> None:
306  request = GetMap.Request()
307  if self.map_clientmap_client.wait_for_service(timeout_sec=5.0):
308  # Request to get map
309  future = self.map_clientmap_client.call_async(request)
310  rclpy.spin_until_future_complete(self, future)
311  result = future.result()
312  if result is not None:
313  self.mapmap = result.map
314  self.get_logger().info('Laser scan will be populated using map data')
315  else:
316  self.get_logger().warn(
317  'Failed to get map, '
318  'Laser scan will be populated using max range'
319  )
320  else:
321  self.get_logger().warn(
322  'Failed to get map, '
323  'Laser scan will be populated using max range'
324  )
325 
326  def getLaserPose(self) -> tuple[float, float, float]:
327  mat_map_to_odom = transformStampedToMatrix(self.t_map_to_odomt_map_to_odom)
328  mat_odom_to_base = transformStampedToMatrix(self.t_odom_to_base_linkt_odom_to_base_link)
329 
330  mat_map_to_laser = tf_transformations.concatenate_matrices(
331  mat_map_to_odom,
332  mat_odom_to_base,
333  self.mat_base_to_lasermat_base_to_laser
334  )
335  transform = matrixToTransform(mat_map_to_laser)
336 
337  x = transform.translation.x
338  y = transform.translation.y
339  theta = tf_transformations.euler_from_quaternion([
340  transform.rotation.x,
341  transform.rotation.y,
342  transform.rotation.z,
343  transform.rotation.w
344  ])[2]
345 
346  return x, y, theta
347 
348  def getLaserScan(self, num_samples: int) -> None:
349  if self.mapmap is None or self.initial_poseinitial_pose is None or self.mat_base_to_lasermat_base_to_laser is None:
350  if self.use_infuse_inf:
351  self.scan_msgscan_msg.ranges = [float('inf')] * num_samples
352  else:
353  self.scan_msgscan_msg.ranges = [self.scan_msgscan_msg.range_max - 0.1] * num_samples
354  return
355 
356  x0, y0, theta = self.getLaserPosegetLaserPose()
357 
358  mx0, my0 = worldToMap(x0, y0, self.mapmap)
359 
360  if not 0 < mx0 < self.mapmap.info.width or not 0 < my0 < self.mapmap.info.height:
361  # outside map
362  if self.use_infuse_inf:
363  self.scan_msgscan_msg.ranges = [float('inf')] * num_samples
364  else:
365  self.scan_msgscan_msg.ranges = [self.scan_msgscan_msg.range_max - 0.1] * num_samples
366  return
367 
368  for i in range(num_samples):
369  curr_angle = theta + self.scan_msgscan_msg.angle_min + i * self.scan_msgscan_msg.angle_increment
370  x1 = x0 + self.scan_msgscan_msg.range_max * math.cos(curr_angle)
371  y1 = y0 + self.scan_msgscan_msg.range_max * math.sin(curr_angle)
372 
373  mx1, my1 = worldToMap(x1, y1, self.mapmap)
374 
375  line_iterator = LineIterator(mx0, my0, mx1, my1, 0.5)
376 
377  while line_iterator.isValid():
378  mx, my = int(line_iterator.getX()), int(line_iterator.getY())
379 
380  if not 0 < mx < self.mapmap.info.width or not 0 < my < self.mapmap.info.height:
381  # if outside map then check next ray
382  break
383 
384  point_cost = getMapOccupancy(mx, my, self.mapmap)
385 
386  if point_cost >= 60:
387  self.scan_msgscan_msg.ranges[i] = math.sqrt(
388  (int(line_iterator.getX()) - mx0) ** 2 +
389  (int(line_iterator.getY()) - my0) ** 2
390  ) * self.mapmap.info.resolution
391  break
392 
393  line_iterator.advance()
394  if self.scan_msgscan_msg.ranges[i] == 0.0 and self.use_infuse_inf:
395  self.scan_msgscan_msg.ranges[i] = float('inf')
396 
397 
398 def main() -> None:
399  rclpy.init()
400  loopback_simulator = LoopbackSimulator()
401  rclpy.spin(loopback_simulator)
402  loopback_simulator.destroy_node()
403  rclpy.shutdown()
404  exit(0)
405 
406 
407 if __name__ == '__main__':
408  main()
None publishOdometry(self, TransformStamped odom_to_base_link)
None publishTransforms(self, TransformStamped map_to_odom, TransformStamped odom_to_base_link)
None initialPoseCallback(self, PoseWithCovarianceStamped msg)
tuple[float, float, float] getLaserPose(self)
None cmdVelStampedCallback(self, TwistStamped msg)
None getLaserScan(self, int num_samples)