17 from typing
import Optional
19 from geometry_msgs.msg import (PoseWithCovarianceStamped, Quaternion, TransformStamped, Twist,
20 TwistStamped, Vector3)
23 from nav_msgs.srv
import GetMap
26 from rclpy.duration
import Duration
27 from rclpy.node
import Node
28 from rclpy.qos
import DurabilityPolicy, QoSProfile, ReliabilityPolicy
29 from rclpy.timer
import Timer
30 from rosgraph_msgs.msg
import Clock
31 from sensor_msgs.msg
import LaserScan
32 from tf2_ros
import Buffer, TransformBroadcaster, TransformListener
33 import tf_transformations
35 from .utils
import (addYawToQuat, getMapOccupancy, matrixToTransform, transformStampedToMatrix,
39 This is a loopback simulator that replaces a physics simulator to create a
40 frictionless, inertialess, and collisionless simulation environment. It
41 accepts cmd_vel messages and publishes odometry & TF messages based on the
42 cumulative velocities received to mimic global localization and simulation.
43 It also accepts initialpose messages to set the initial pose of the robot
50 def __init__(self) -> None:
51 super().__init__(node_name=
'loopback_simulator')
54 self.
initial_poseinitial_pose: PoseWithCovarianceStamped =
None
55 self.
timertimer: Optional[Timer] =
None
58 self.
mat_base_to_lasermat_base_to_laser: Optional[np.ndarray[np.float64, np.dtype[np.float64]]] =
None
60 self.declare_parameter(
'update_duration', 0.01)
61 self.
update_durupdate_dur = self.get_parameter(
'update_duration').get_parameter_value().double_value
63 self.declare_parameter(
'base_frame_id',
'base_footprint')
64 self.
base_frame_idbase_frame_id = self.get_parameter(
'base_frame_id').get_parameter_value().string_value
66 self.declare_parameter(
'map_frame_id',
'map')
67 self.
map_frame_idmap_frame_id = self.get_parameter(
'map_frame_id').get_parameter_value().string_value
69 self.declare_parameter(
'odom_frame_id',
'odom')
70 self.
odom_frame_idodom_frame_id = self.get_parameter(
'odom_frame_id').get_parameter_value().string_value
72 self.declare_parameter(
'scan_frame_id',
'base_scan')
73 self.
scan_frame_idscan_frame_id = self.get_parameter(
'scan_frame_id').get_parameter_value().string_value
75 self.declare_parameter(
'enable_stamped_cmd_vel',
True)
76 use_stamped = self.get_parameter(
'enable_stamped_cmd_vel').get_parameter_value().bool_value
78 self.declare_parameter(
'scan_publish_dur', 0.1)
80 'scan_publish_dur').get_parameter_value().double_value
82 self.declare_parameter(
'publish_map_odom_tf',
True)
84 'publish_map_odom_tf').get_parameter_value().bool_value
86 self.declare_parameter(
'publish_clock',
True)
87 self.
publish_clockpublish_clock = self.get_parameter(
'publish_clock').get_parameter_value().bool_value
89 self.declare_parameter(
'scan_range_min', 0.05)
91 self.get_parameter(
'scan_range_min').get_parameter_value().double_value
93 self.declare_parameter(
'scan_range_max', 30.0)
95 self.get_parameter(
'scan_range_max').get_parameter_value().double_value
97 self.declare_parameter(
'scan_angle_min', -math.pi)
99 self.get_parameter(
'scan_angle_min').get_parameter_value().double_value
101 self.declare_parameter(
'scan_angle_max', math.pi)
103 self.get_parameter(
'scan_angle_max').get_parameter_value().double_value
105 self.declare_parameter(
'scan_angle_increment', 0.0261)
107 self.get_parameter(
'scan_angle_increment').get_parameter_value().double_value
109 self.declare_parameter(
'scan_use_inf',
True)
111 self.get_parameter(
'scan_use_inf').get_parameter_value().bool_value
123 PoseWithCovarianceStamped,
126 self.
cmd_vel_subcmd_vel_sub = self.create_subscription(
130 self.
cmd_vel_subcmd_vel_sub = self.create_subscription(
133 self.
odom_pubodom_pub = self.create_publisher(Odometry,
'odom', 10)
135 sensor_qos = QoSProfile(
136 reliability=ReliabilityPolicy.BEST_EFFORT,
137 durability=DurabilityPolicy.VOLATILE,
139 self.
scan_pubscan_pub = self.create_publisher(LaserScan,
'scan', sensor_qos)
143 self.
clock_pubclock_pub = self.create_publisher(Clock,
'/clock', 10)
147 self.
map_clientmap_client = self.create_client(GetMap,
'/map_server/map')
154 self.
infoinfo(
'Loopback simulator initialized')
156 def getBaseToLaserTf(self) -> None:
159 transform = self.
tf_buffertf_buffer.lookup_transform(
163 except Exception
as ex:
164 self.get_logger().error(f
'Transform lookup failed: {str(ex)}')
166 def setupTimerCallback(self) -> None:
172 def clockTimerCallback(self) -> None:
174 msg.clock = self.get_clock().now().to_msg()
177 def cmdVelCallback(self, msg: Twist) ->
None:
178 self.
debugdebug(
'Received cmd_vel')
185 def cmdVelStampedCallback(self, msg: TwistStamped) ->
None:
186 self.
debugdebug(
'Received cmd_vel')
191 self.
curr_cmd_vel_timecurr_cmd_vel_time = rclpy.time.Time.from_msg(msg.header.stamp)
193 def initialPoseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
194 self.
infoinfo(
'Received initial pose!')
200 self.
t_map_to_odomt_map_to_odom.transform.translation.z = 0.0
218 t_map_to_base_link = TransformStamped()
219 t_map_to_base_link.header = msg.header
220 t_map_to_base_link.child_frame_id = self.
base_frame_idbase_frame_id
221 t_map_to_base_link.transform.translation.x = self.
initial_poseinitial_pose.position.x
222 t_map_to_base_link.transform.translation.y = self.
initial_poseinitial_pose.position.y
223 t_map_to_base_link.transform.translation.z = 0.0
224 t_map_to_base_link.transform.rotation = self.
initial_poseinitial_pose.orientation
225 mat_map_to_base_link = transformStampedToMatrix(t_map_to_base_link)
226 mat_odom_to_base_link = transformStampedToMatrix(self.
t_odom_to_base_linkt_odom_to_base_link)
227 mat_base_link_to_odom = tf_transformations.inverse_matrix(mat_odom_to_base_link)
229 tf_transformations.concatenate_matrices(mat_map_to_base_link, mat_base_link_to_odom)
230 self.
t_map_to_odomt_map_to_odom.transform = matrixToTransform(mat_map_to_odom)
232 def timerCallback(self) -> None:
234 one_sec = Duration(seconds=1)
248 _, _, yaw = tf_transformations.euler_from_quaternion(q)
249 self.
t_odom_to_base_linkt_odom_to_base_link.transform.translation.x += dx * math.cos(yaw) - dy * math.sin(yaw)
250 self.
t_odom_to_base_linkt_odom_to_base_link.transform.translation.y += dx * math.sin(yaw) + dy * math.cos(yaw)
257 def publishLaserScan(self) -> None:
260 self.
scan_msgscan_msg.header.stamp = (self.get_clock().now()).to_msg()
266 self.
scan_msgscan_msg.time_increment = 0.0
267 self.
scan_msgscan_msg.scan_time = 0.1
272 self.
scan_msgscan_msg.angle_increment)
273 self.
scan_msgscan_msg.ranges = [0.0] * num_samples
277 def publishTransforms(self, map_to_odom: TransformStamped,
278 odom_to_base_link: TransformStamped) ->
None:
279 map_to_odom.header.stamp = \
280 (self.get_clock().now() + Duration(seconds=self.
update_durupdate_dur)).to_msg()
281 odom_to_base_link.header.stamp = self.get_clock().now().to_msg()
284 self.
tf_broadcastertf_broadcaster.sendTransform(odom_to_base_link)
286 def publishOdometry(self, odom_to_base_link: TransformStamped) ->
None:
288 odom.header.stamp = self.get_clock().now().to_msg()
289 odom.header.frame_id =
'odom'
290 odom.child_frame_id =
'base_link'
291 odom.pose.pose.position.x = odom_to_base_link.transform.translation.x
292 odom.pose.pose.position.y = odom_to_base_link.transform.translation.y
293 odom.pose.pose.orientation = odom_to_base_link.transform.rotation
297 def info(self, msg: str) ->
None:
298 self.get_logger().info(msg)
301 def debug(self, msg: str) ->
None:
302 self.get_logger().debug(msg)
305 def getMap(self) -> None:
306 request = GetMap.Request()
307 if self.
map_clientmap_client.wait_for_service(timeout_sec=5.0):
309 future = self.
map_clientmap_client.call_async(request)
310 rclpy.spin_until_future_complete(self, future)
311 result = future.result()
312 if result
is not None:
313 self.
mapmap = result.map
314 self.get_logger().info(
'Laser scan will be populated using map data')
316 self.get_logger().warn(
317 'Failed to get map, '
318 'Laser scan will be populated using max range'
321 self.get_logger().warn(
322 'Failed to get map, '
323 'Laser scan will be populated using max range'
326 def getLaserPose(self) -> tuple[float, float, float]:
327 mat_map_to_odom = transformStampedToMatrix(self.
t_map_to_odomt_map_to_odom)
330 mat_map_to_laser = tf_transformations.concatenate_matrices(
335 transform = matrixToTransform(mat_map_to_laser)
337 x = transform.translation.x
338 y = transform.translation.y
339 theta = tf_transformations.euler_from_quaternion([
340 transform.rotation.x,
341 transform.rotation.y,
342 transform.rotation.z,
348 def getLaserScan(self, num_samples: int) ->
None:
351 self.
scan_msgscan_msg.ranges = [float(
'inf')] * num_samples
353 self.
scan_msgscan_msg.ranges = [self.
scan_msgscan_msg.range_max - 0.1] * num_samples
358 mx0, my0 = worldToMap(x0, y0, self.
mapmap)
360 if not 0 < mx0 < self.
mapmap.info.width
or not 0 < my0 < self.
mapmap.info.height:
363 self.
scan_msgscan_msg.ranges = [float(
'inf')] * num_samples
365 self.
scan_msgscan_msg.ranges = [self.
scan_msgscan_msg.range_max - 0.1] * num_samples
368 for i
in range(num_samples):
369 curr_angle = theta + self.
scan_msgscan_msg.angle_min + i * self.
scan_msgscan_msg.angle_increment
370 x1 = x0 + self.
scan_msgscan_msg.range_max * math.cos(curr_angle)
371 y1 = y0 + self.
scan_msgscan_msg.range_max * math.sin(curr_angle)
373 mx1, my1 = worldToMap(x1, y1, self.
mapmap)
377 while line_iterator.isValid():
378 mx, my = int(line_iterator.getX()), int(line_iterator.getY())
380 if not 0 < mx < self.
mapmap.info.width
or not 0 < my < self.
mapmap.info.height:
384 point_cost = getMapOccupancy(mx, my, self.
mapmap)
387 self.
scan_msgscan_msg.ranges[i] = math.sqrt(
388 (int(line_iterator.getX()) - mx0) ** 2 +
389 (int(line_iterator.getY()) - my0) ** 2
390 ) * self.
mapmap.info.resolution
393 line_iterator.advance()
395 self.
scan_msgscan_msg.ranges[i] = float(
'inf')
401 rclpy.spin(loopback_simulator)
402 loopback_simulator.destroy_node()
407 if __name__ ==
'__main__':
None publishOdometry(self, TransformStamped odom_to_base_link)
None getBaseToLaserTf(self)
None publishTransforms(self, TransformStamped map_to_odom, TransformStamped odom_to_base_link)
None initialPoseCallback(self, PoseWithCovarianceStamped msg)
None clockTimerCallback(self)
None setupTimerCallback(self)
None debug(self, str msg)
None cmdVelCallback(self, Twist msg)
None publishLaserScan(self)
tuple[float, float, float] getLaserPose(self)
None cmdVelStampedCallback(self, TwistStamped msg)
None getLaserScan(self, int num_samples)