17 from typing
import Optional, Union
19 from geometry_msgs.msg import (Pose, PoseWithCovarianceStamped, Quaternion, TransformStamped,
20 Twist, TwistStamped, Vector3)
21 from nav2_loopback_sim.utils
import (addYawToQuat, getMapOccupancy, matrixToTransform,
22 transformStampedToMatrix, worldToMap)
25 from nav_msgs.srv
import GetMap
28 from rclpy.client
import Client
29 from rclpy.duration
import Duration
30 from rclpy.node
import Node
31 from rclpy.qos
import DurabilityPolicy, QoSProfile, ReliabilityPolicy
32 from rclpy.subscription
import Subscription
33 from rclpy.timer
import Timer
34 from rosgraph_msgs.msg
import Clock
35 from sensor_msgs.msg
import LaserScan
36 from tf2_ros
import Buffer, TransformBroadcaster, TransformListener
37 import tf_transformations
40 This is a loopback simulator that replaces a physics simulator to create a
41 frictionless, inertialess, and collisionless simulation environment. It
42 accepts cmd_vel messages and publishes odometry & TF messages based on the
43 cumulative velocities received to mimic global localization and simulation.
44 It also accepts initialpose messages to set the initial pose of the robot
51 def __init__(self) -> None:
52 super().__init__(node_name=
'loopback_simulator')
56 self.
timertimer: Optional[Timer] =
None
58 self.
mapmap: Optional[OccupancyGrid] =
None
59 self.
mat_base_to_lasermat_base_to_laser: Optional[np.ndarray[np.float64, np.dtype[np.float64]]] =
None
61 self.declare_parameter(
'update_duration', 0.01)
62 self.
update_durupdate_dur = self.get_parameter(
'update_duration').get_parameter_value().double_value
64 self.declare_parameter(
'base_frame_id',
'base_footprint')
65 self.
base_frame_idbase_frame_id = self.get_parameter(
'base_frame_id').get_parameter_value().string_value
67 self.declare_parameter(
'map_frame_id',
'map')
68 self.
map_frame_idmap_frame_id = self.get_parameter(
'map_frame_id').get_parameter_value().string_value
70 self.declare_parameter(
'odom_frame_id',
'odom')
71 self.
odom_frame_idodom_frame_id = self.get_parameter(
'odom_frame_id').get_parameter_value().string_value
73 self.declare_parameter(
'scan_frame_id',
'base_scan')
74 self.
scan_frame_idscan_frame_id = self.get_parameter(
'scan_frame_id').get_parameter_value().string_value
76 self.declare_parameter(
'enable_stamped_cmd_vel',
True)
77 use_stamped = self.get_parameter(
'enable_stamped_cmd_vel').get_parameter_value().bool_value
79 self.declare_parameter(
'scan_publish_dur', 0.1)
81 'scan_publish_dur').get_parameter_value().double_value
83 self.declare_parameter(
'publish_map_odom_tf',
True)
85 'publish_map_odom_tf').get_parameter_value().bool_value
87 self.declare_parameter(
'publish_clock',
True)
88 self.
publish_clockpublish_clock = self.get_parameter(
'publish_clock').get_parameter_value().bool_value
90 self.declare_parameter(
'scan_range_min', 0.05)
92 self.get_parameter(
'scan_range_min').get_parameter_value().double_value
94 self.declare_parameter(
'scan_range_max', 30.0)
96 self.get_parameter(
'scan_range_max').get_parameter_value().double_value
98 self.declare_parameter(
'scan_angle_min', -math.pi)
100 self.get_parameter(
'scan_angle_min').get_parameter_value().double_value
102 self.declare_parameter(
'scan_angle_max', math.pi)
104 self.get_parameter(
'scan_angle_max').get_parameter_value().double_value
106 self.declare_parameter(
'scan_angle_increment', 0.0261)
108 self.get_parameter(
'scan_angle_increment').get_parameter_value().double_value
110 self.declare_parameter(
'scan_use_inf',
True)
112 self.get_parameter(
'scan_use_inf').get_parameter_value().bool_value
124 PoseWithCovarianceStamped,
127 self.
cmd_vel_subcmd_vel_sub: Union[Subscription[Twist], Subscription[TwistStamped]]
130 self.
cmd_vel_subcmd_vel_sub = self.create_subscription(
134 self.
cmd_vel_subcmd_vel_sub = self.create_subscription(
137 self.
odom_pubodom_pub = self.create_publisher(Odometry,
'odom', 10)
139 sensor_qos = QoSProfile(
140 reliability=ReliabilityPolicy.BEST_EFFORT,
141 durability=DurabilityPolicy.VOLATILE,
143 self.
scan_pubscan_pub = self.create_publisher(LaserScan,
'scan', sensor_qos)
147 self.
clock_pubclock_pub = self.create_publisher(Clock,
'/clock', 10)
151 self.map_client: Client[GetMap.Request, GetMap.Response] = \
152 self.create_client(GetMap,
'/map_server/map')
159 self.
infoinfo(
'Loopback simulator initialized')
161 def getBaseToLaserTf(self) -> None:
164 transform = self.
tf_buffertf_buffer.lookup_transform(
168 except Exception
as ex:
169 self.get_logger().error(f
'Transform lookup failed: {str(ex)}')
171 def setupTimerCallback(self) -> None:
177 def clockTimerCallback(self) -> None:
179 msg.clock = self.get_clock().now().to_msg()
182 def cmdVelCallback(self, msg: Twist) ->
None:
183 self.
debugdebug(
'Received cmd_vel')
190 def cmdVelStampedCallback(self, msg: TwistStamped) ->
None:
191 self.
debugdebug(
'Received cmd_vel')
196 self.
curr_cmd_vel_timecurr_cmd_vel_time = rclpy.time.Time.from_msg(msg.header.stamp)
198 def initialPoseCallback(self, msg: PoseWithCovarianceStamped) ->
None:
199 self.
infoinfo(
'Received initial pose!')
205 self.
t_map_to_odomt_map_to_odom.transform.translation.z = 0.0
223 t_map_to_base_link = TransformStamped()
224 t_map_to_base_link.header = msg.header
225 t_map_to_base_link.child_frame_id = self.
base_frame_idbase_frame_id
226 t_map_to_base_link.transform.translation.x = self.
initial_poseinitial_pose.position.x
227 t_map_to_base_link.transform.translation.y = self.
initial_poseinitial_pose.position.y
228 t_map_to_base_link.transform.translation.z = 0.0
229 t_map_to_base_link.transform.rotation = self.
initial_poseinitial_pose.orientation
230 mat_map_to_base_link = transformStampedToMatrix(t_map_to_base_link)
231 mat_odom_to_base_link = transformStampedToMatrix(self.
t_odom_to_base_linkt_odom_to_base_link)
232 mat_base_link_to_odom = tf_transformations.inverse_matrix(mat_odom_to_base_link)
234 tf_transformations.concatenate_matrices(mat_map_to_base_link, mat_base_link_to_odom)
235 self.
t_map_to_odomt_map_to_odom.transform = matrixToTransform(mat_map_to_odom)
237 def timerCallback(self) -> None:
239 one_sec = Duration(seconds=1)
253 _, _, yaw = tf_transformations.euler_from_quaternion(q)
254 self.
t_odom_to_base_linkt_odom_to_base_link.transform.translation.x += dx * math.cos(yaw) - dy * math.sin(yaw)
255 self.
t_odom_to_base_linkt_odom_to_base_link.transform.translation.y += dx * math.sin(yaw) + dy * math.cos(yaw)
262 def publishLaserScan(self) -> None:
265 self.
scan_msgscan_msg.header.stamp = (self.get_clock().now()).to_msg()
271 self.
scan_msgscan_msg.time_increment = 0.0
272 self.
scan_msgscan_msg.scan_time = 0.1
277 self.
scan_msgscan_msg.angle_increment)
278 self.
scan_msgscan_msg.ranges = [0.0] * num_samples
282 def publishTransforms(self, map_to_odom: TransformStamped,
283 odom_to_base_link: TransformStamped) ->
None:
284 map_to_odom.header.stamp = \
285 (self.get_clock().now() + Duration(seconds=self.
update_durupdate_dur)).to_msg()
286 odom_to_base_link.header.stamp = self.get_clock().now().to_msg()
289 self.
tf_broadcastertf_broadcaster.sendTransform(odom_to_base_link)
291 def publishOdometry(self, odom_to_base_link: TransformStamped) ->
None:
293 odom.header.stamp = self.get_clock().now().to_msg()
294 odom.header.frame_id =
'odom'
295 odom.child_frame_id =
'base_link'
296 odom.pose.pose.position.x = odom_to_base_link.transform.translation.x
297 odom.pose.pose.position.y = odom_to_base_link.transform.translation.y
298 odom.pose.pose.orientation = odom_to_base_link.transform.rotation
303 def info(self, msg: str) ->
None:
304 self.get_logger().info(msg)
307 def debug(self, msg: str) ->
None:
308 self.get_logger().debug(msg)
311 def getMap(self) -> None:
312 request = GetMap.Request()
313 if self.map_client.wait_for_service(timeout_sec=5.0):
315 future = self.map_client.call_async(request)
316 rclpy.spin_until_future_complete(self, future)
317 result = future.result()
318 if result
is not None:
319 self.
mapmap = result.map
320 self.get_logger().info(
'Laser scan will be populated using map data')
322 self.get_logger().warning(
323 'Failed to get map, '
324 'Laser scan will be populated using max range'
327 self.get_logger().warning(
328 'Failed to get map, '
329 'Laser scan will be populated using max range'
332 def getLaserPose(self) -> tuple[float, float, float]:
333 mat_map_to_odom = transformStampedToMatrix(self.
t_map_to_odomt_map_to_odom)
336 mat_map_to_laser = tf_transformations.concatenate_matrices(
341 transform = matrixToTransform(mat_map_to_laser)
343 x = transform.translation.x
344 y = transform.translation.y
345 theta = tf_transformations.euler_from_quaternion([
346 transform.rotation.x,
347 transform.rotation.y,
348 transform.rotation.z,
354 def getLaserScan(self, num_samples: int) ->
None:
357 self.
scan_msgscan_msg.ranges = [float(
'inf')] * num_samples
359 self.
scan_msgscan_msg.ranges = [self.
scan_msgscan_msg.range_max - 0.1] * num_samples
364 mx0, my0 = worldToMap(x0, y0, self.
mapmap)
366 if not 0 < mx0 < self.
mapmap.info.width
or not 0 < my0 < self.
mapmap.info.height:
369 self.
scan_msgscan_msg.ranges = [float(
'inf')] * num_samples
371 self.
scan_msgscan_msg.ranges = [self.
scan_msgscan_msg.range_max - 0.1] * num_samples
374 for i
in range(num_samples):
375 curr_angle = theta + self.
scan_msgscan_msg.angle_min + i * self.
scan_msgscan_msg.angle_increment
376 x1 = x0 + self.
scan_msgscan_msg.range_max * math.cos(curr_angle)
377 y1 = y0 + self.
scan_msgscan_msg.range_max * math.sin(curr_angle)
379 mx1, my1 = worldToMap(x1, y1, self.
mapmap)
383 while line_iterator.isValid():
384 mx, my = int(line_iterator.getX()), int(line_iterator.getY())
386 if not 0 < mx < self.
mapmap.info.width
or not 0 < my < self.
mapmap.info.height:
390 point_cost = getMapOccupancy(mx, my, self.
mapmap)
393 self.
scan_msgscan_msg.ranges[i] = math.sqrt(
394 (int(line_iterator.getX()) - mx0) ** 2 +
395 (int(line_iterator.getY()) - my0) ** 2
396 ) * self.
mapmap.info.resolution
399 line_iterator.advance()
401 self.
scan_msgscan_msg.ranges[i] = float(
'inf')
407 rclpy.spin(loopback_simulator)
408 loopback_simulator.destroy_node()
413 if __name__ ==
'__main__':
None publishOdometry(self, TransformStamped odom_to_base_link)
None getBaseToLaserTf(self)
None publishTransforms(self, TransformStamped map_to_odom, TransformStamped odom_to_base_link)
None initialPoseCallback(self, PoseWithCovarianceStamped msg)
None clockTimerCallback(self)
None setupTimerCallback(self)
None debug(self, str msg)
None cmdVelCallback(self, Twist msg)
None publishLaserScan(self)
tuple[float, float, float] getLaserPose(self)
None cmdVelStampedCallback(self, TwistStamped msg)
None getLaserScan(self, int num_samples)