17 from typing
import Optional
19 from geometry_msgs.msg import (PoseWithCovarianceStamped, Quaternion, TransformStamped, Twist,
20 TwistStamped, Vector3)
21 from nav2_loopback_sim.utils
import (addYawToQuat, getMapOccupancy, matrixToTransform,
22 transformStampedToMatrix, worldToMap)
25 from nav_msgs.srv
import GetMap
28 from rclpy.client
import Client
29 from rclpy.duration
import Duration
30 from rclpy.node
import Node
31 from rclpy.qos
import DurabilityPolicy, QoSProfile, ReliabilityPolicy
32 from rclpy.timer
import Timer
33 from rosgraph_msgs.msg
import Clock
34 from sensor_msgs.msg
import LaserScan
35 from tf2_ros
import Buffer, TransformBroadcaster, TransformListener
36 import tf_transformations
39 This is a loopback simulator that replaces a physics simulator to create a
40 frictionless, inertialess, and collisionless simulation environment. It
41 accepts cmd_vel messages and publishes odometry & TF messages based on the
42 cumulative velocities received to mimic global localization and simulation.
43 It also accepts initialpose messages to set the initial pose of the robot
50 def __init__(self) -> None:
51 super().__init__(node_name=
'loopback_simulator')
54 self.
initial_poseinitial_pose: PoseWithCovarianceStamped =
None
55 self.
timertimer: Optional[Timer] =
None
58 self.
mat_base_to_lasermat_base_to_laser: Optional[np.ndarray[np.float64, np.dtype[np.float64]]] =
None
60 self.declare_parameter(
'update_duration', 0.01)
61 self.
update_durupdate_dur = self.get_parameter(
'update_duration').get_parameter_value().double_value
63 self.declare_parameter(
'base_frame_id',
'base_footprint')
64 self.
base_frame_idbase_frame_id = self.get_parameter(
'base_frame_id').get_parameter_value().string_value
66 self.declare_parameter(
'map_frame_id',
'map')
67 self.
map_frame_idmap_frame_id = self.get_parameter(
'map_frame_id').get_parameter_value().string_value
69 self.declare_parameter(
'odom_frame_id',
'odom')
70 self.
odom_frame_idodom_frame_id = self.get_parameter(
'odom_frame_id').get_parameter_value().string_value
72 self.declare_parameter(
'scan_frame_id',
'base_scan')
73 self.
scan_frame_idscan_frame_id = self.get_parameter(
'scan_frame_id').get_parameter_value().string_value
75 self.declare_parameter(
'enable_stamped_cmd_vel',
True)
76 use_stamped = self.get_parameter(
'enable_stamped_cmd_vel').get_parameter_value().bool_value
78 self.declare_parameter(
'scan_publish_dur', 0.1)
80 'scan_publish_dur').get_parameter_value().double_value
82 self.declare_parameter(
'publish_map_odom_tf',
True)
84 'publish_map_odom_tf').get_parameter_value().bool_value
86 self.declare_parameter(
'publish_clock',
True)
87 self.
publish_clockpublish_clock = self.get_parameter(
'publish_clock').get_parameter_value().bool_value
89 self.declare_parameter(
'scan_range_min', 0.05)
91 self.get_parameter(
'scan_range_min').get_parameter_value().double_value
93 self.declare_parameter(
'scan_range_max', 30.0)
95 self.get_parameter(
'scan_range_max').get_parameter_value().double_value
97 self.declare_parameter(
'scan_angle_min', -math.pi)
99 self.get_parameter(
'scan_angle_min').get_parameter_value().double_value
101 self.declare_parameter(
'scan_angle_max', math.pi)
103 self.get_parameter(
'scan_angle_max').get_parameter_value().double_value
105 self.declare_parameter(
'scan_angle_increment', 0.0261)
107 self.get_parameter(
'scan_angle_increment').get_parameter_value().double_value
109 self.declare_parameter(
'scan_use_inf',
True)
111 self.get_parameter(
'scan_use_inf').get_parameter_value().bool_value
123 PoseWithCovarianceStamped,
126 self.
cmd_vel_subcmd_vel_sub = self.create_subscription(
130 self.
cmd_vel_subcmd_vel_sub = self.create_subscription(
133 self.
odom_pubodom_pub = self.create_publisher(Odometry,
'odom', 10)
135 sensor_qos = QoSProfile(
136 reliability=ReliabilityPolicy.BEST_EFFORT,
137 durability=DurabilityPolicy.VOLATILE,
139 self.
scan_pubscan_pub = self.create_publisher(LaserScan,
'scan', sensor_qos)
143 self.
clock_pubclock_pub = self.create_publisher(Clock,
'/clock', 10)
147 self.map_client: Client[GetMap.Request, GetMap.Response] = \
148 self.create_client(GetMap,
'/map_server/map')
155 self.
infoinfo(
'Loopback simulator initialized')
157 def getBaseToLaserTf(self) -> None:
160 transform = self.
tf_buffertf_buffer.lookup_transform(
164 except Exception
as ex:
165 self.get_logger().error(f
'Transform lookup failed: {str(ex)}')
167 def setupTimerCallback(self) -> None:
173 def clockTimerCallback(self) -> None:
175 msg.clock = self.get_clock().now().to_msg()
178 def cmdVelCallback(self, msg: Twist) ->
None:
179 self.
debugdebug(
'Received cmd_vel')
186 def cmdVelStampedCallback(self, msg: TwistStamped) ->
None:
187 self.
debugdebug(
'Received cmd_vel')
192 self.
curr_cmd_vel_timecurr_cmd_vel_time = rclpy.time.Time.from_msg(msg.header.stamp)
194 def initialPoseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
195 self.
infoinfo(
'Received initial pose!')
201 self.
t_map_to_odomt_map_to_odom.transform.translation.z = 0.0
219 t_map_to_base_link = TransformStamped()
220 t_map_to_base_link.header = msg.header
221 t_map_to_base_link.child_frame_id = self.
base_frame_idbase_frame_id
222 t_map_to_base_link.transform.translation.x = self.
initial_poseinitial_pose.position.x
223 t_map_to_base_link.transform.translation.y = self.
initial_poseinitial_pose.position.y
224 t_map_to_base_link.transform.translation.z = 0.0
225 t_map_to_base_link.transform.rotation = self.
initial_poseinitial_pose.orientation
226 mat_map_to_base_link = transformStampedToMatrix(t_map_to_base_link)
227 mat_odom_to_base_link = transformStampedToMatrix(self.
t_odom_to_base_linkt_odom_to_base_link)
228 mat_base_link_to_odom = tf_transformations.inverse_matrix(mat_odom_to_base_link)
230 tf_transformations.concatenate_matrices(mat_map_to_base_link, mat_base_link_to_odom)
231 self.
t_map_to_odomt_map_to_odom.transform = matrixToTransform(mat_map_to_odom)
233 def timerCallback(self) -> None:
235 one_sec = Duration(seconds=1)
249 _, _, yaw = tf_transformations.euler_from_quaternion(q)
250 self.
t_odom_to_base_linkt_odom_to_base_link.transform.translation.x += dx * math.cos(yaw) - dy * math.sin(yaw)
251 self.
t_odom_to_base_linkt_odom_to_base_link.transform.translation.y += dx * math.sin(yaw) + dy * math.cos(yaw)
258 def publishLaserScan(self) -> None:
261 self.
scan_msgscan_msg.header.stamp = (self.get_clock().now()).to_msg()
267 self.
scan_msgscan_msg.time_increment = 0.0
268 self.
scan_msgscan_msg.scan_time = 0.1
273 self.
scan_msgscan_msg.angle_increment)
274 self.
scan_msgscan_msg.ranges = [0.0] * num_samples
278 def publishTransforms(self, map_to_odom: TransformStamped,
279 odom_to_base_link: TransformStamped) ->
None:
280 map_to_odom.header.stamp = \
281 (self.get_clock().now() + Duration(seconds=self.
update_durupdate_dur)).to_msg()
282 odom_to_base_link.header.stamp = self.get_clock().now().to_msg()
285 self.
tf_broadcastertf_broadcaster.sendTransform(odom_to_base_link)
287 def publishOdometry(self, odom_to_base_link: TransformStamped) ->
None:
289 odom.header.stamp = self.get_clock().now().to_msg()
290 odom.header.frame_id =
'odom'
291 odom.child_frame_id =
'base_link'
292 odom.pose.pose.position.x = odom_to_base_link.transform.translation.x
293 odom.pose.pose.position.y = odom_to_base_link.transform.translation.y
294 odom.pose.pose.orientation = odom_to_base_link.transform.rotation
298 def info(self, msg: str) ->
None:
299 self.get_logger().info(msg)
302 def debug(self, msg: str) ->
None:
303 self.get_logger().debug(msg)
306 def getMap(self) -> None:
307 request = GetMap.Request()
308 if self.map_client.wait_for_service(timeout_sec=5.0):
310 future = self.map_client.call_async(request)
311 rclpy.spin_until_future_complete(self, future)
312 result = future.result()
313 if result
is not None:
314 self.
mapmap = result.map
315 self.get_logger().info(
'Laser scan will be populated using map data')
317 self.get_logger().warning(
318 'Failed to get map, '
319 'Laser scan will be populated using max range'
322 self.get_logger().warning(
323 'Failed to get map, '
324 'Laser scan will be populated using max range'
327 def getLaserPose(self) -> tuple[float, float, float]:
328 mat_map_to_odom = transformStampedToMatrix(self.
t_map_to_odomt_map_to_odom)
331 mat_map_to_laser = tf_transformations.concatenate_matrices(
336 transform = matrixToTransform(mat_map_to_laser)
338 x = transform.translation.x
339 y = transform.translation.y
340 theta = tf_transformations.euler_from_quaternion([
341 transform.rotation.x,
342 transform.rotation.y,
343 transform.rotation.z,
349 def getLaserScan(self, num_samples: int) ->
None:
352 self.
scan_msgscan_msg.ranges = [float(
'inf')] * num_samples
354 self.
scan_msgscan_msg.ranges = [self.
scan_msgscan_msg.range_max - 0.1] * num_samples
359 mx0, my0 = worldToMap(x0, y0, self.
mapmap)
361 if not 0 < mx0 < self.
mapmap.info.width
or not 0 < my0 < self.
mapmap.info.height:
364 self.
scan_msgscan_msg.ranges = [float(
'inf')] * num_samples
366 self.
scan_msgscan_msg.ranges = [self.
scan_msgscan_msg.range_max - 0.1] * num_samples
369 for i
in range(num_samples):
370 curr_angle = theta + self.
scan_msgscan_msg.angle_min + i * self.
scan_msgscan_msg.angle_increment
371 x1 = x0 + self.
scan_msgscan_msg.range_max * math.cos(curr_angle)
372 y1 = y0 + self.
scan_msgscan_msg.range_max * math.sin(curr_angle)
374 mx1, my1 = worldToMap(x1, y1, self.
mapmap)
378 while line_iterator.isValid():
379 mx, my = int(line_iterator.getX()), int(line_iterator.getY())
381 if not 0 < mx < self.
mapmap.info.width
or not 0 < my < self.
mapmap.info.height:
385 point_cost = getMapOccupancy(mx, my, self.
mapmap)
388 self.
scan_msgscan_msg.ranges[i] = math.sqrt(
389 (int(line_iterator.getX()) - mx0) ** 2 +
390 (int(line_iterator.getY()) - my0) ** 2
391 ) * self.
mapmap.info.resolution
394 line_iterator.advance()
396 self.
scan_msgscan_msg.ranges[i] = float(
'inf')
402 rclpy.spin(loopback_simulator)
403 loopback_simulator.destroy_node()
408 if __name__ ==
'__main__':
None publishOdometry(self, TransformStamped odom_to_base_link)
None getBaseToLaserTf(self)
None publishTransforms(self, TransformStamped map_to_odom, TransformStamped odom_to_base_link)
None initialPoseCallback(self, PoseWithCovarianceStamped msg)
None clockTimerCallback(self)
None setupTimerCallback(self)
None debug(self, str msg)
None cmdVelCallback(self, Twist msg)
None publishLaserScan(self)
tuple[float, float, float] getLaserPose(self)
None cmdVelStampedCallback(self, TwistStamped msg)
None getLaserScan(self, int num_samples)