18 from geometry_msgs.msg
import PoseWithCovarianceStamped, Twist, TwistStamped
19 from geometry_msgs.msg
import Quaternion, TransformStamped, Vector3
21 from nav_msgs.msg
import Odometry
22 from nav_msgs.srv
import GetMap
24 from rclpy.duration
import Duration
25 from rclpy.node
import Node
26 from rclpy.qos
import DurabilityPolicy, QoSProfile, ReliabilityPolicy
27 from rosgraph_msgs.msg
import Clock
28 from sensor_msgs.msg
import LaserScan
29 from tf2_ros
import Buffer, TransformBroadcaster, TransformListener
30 import tf_transformations
36 transformStampedToMatrix,
41 This is a loopback simulator that replaces a physics simulator to create a
42 frictionless, inertialess, and collisionless simulation environment. It
43 accepts cmd_vel messages and publishes odometry & TF messages based on the
44 cumulative velocities received to mimick global localization and simulation.
45 It also accepts initialpose messages to set the initial pose of the robot
53 super().__init__(node_name=
'loopback_simulator')
57 self.
timertimer =
None
62 self.declare_parameter(
'update_duration', 0.01)
63 self.
update_durupdate_dur = self.get_parameter(
'update_duration').get_parameter_value().double_value
65 self.declare_parameter(
'base_frame_id',
'base_footprint')
66 self.
base_frame_idbase_frame_id = self.get_parameter(
'base_frame_id').get_parameter_value().string_value
68 self.declare_parameter(
'map_frame_id',
'map')
69 self.
map_frame_idmap_frame_id = self.get_parameter(
'map_frame_id').get_parameter_value().string_value
71 self.declare_parameter(
'odom_frame_id',
'odom')
72 self.
odom_frame_idodom_frame_id = self.get_parameter(
'odom_frame_id').get_parameter_value().string_value
74 self.declare_parameter(
'scan_frame_id',
'base_scan')
75 self.
scan_frame_idscan_frame_id = self.get_parameter(
'scan_frame_id').get_parameter_value().string_value
77 self.declare_parameter(
'enable_stamped_cmd_vel',
False)
78 use_stamped = self.get_parameter(
'enable_stamped_cmd_vel').get_parameter_value().bool_value
80 self.declare_parameter(
'scan_publish_dur', 0.1)
82 'scan_publish_dur').get_parameter_value().double_value
84 self.declare_parameter(
'publish_map_odom_tf',
True)
86 'publish_map_odom_tf').get_parameter_value().bool_value
88 self.declare_parameter(
'publish_clock',
True)
89 self.
publish_clockpublish_clock = self.get_parameter(
'publish_clock').get_parameter_value().bool_value
91 self.declare_parameter(
'scan_range_min', 0.05)
93 self.get_parameter(
'scan_range_min').get_parameter_value().double_value
95 self.declare_parameter(
'scan_range_max', 30.0)
97 self.get_parameter(
'scan_range_max').get_parameter_value().double_value
99 self.declare_parameter(
'scan_angle_min', -math.pi)
101 self.get_parameter(
'scan_angle_min').get_parameter_value().double_value
103 self.declare_parameter(
'scan_angle_max', math.pi)
105 self.get_parameter(
'scan_angle_max').get_parameter_value().double_value
107 self.declare_parameter(
'scan_angle_increment', 0.0261)
109 self.get_parameter(
'scan_angle_increment').get_parameter_value().double_value
111 self.declare_parameter(
'scan_use_inf',
True)
113 self.get_parameter(
'scan_use_inf').get_parameter_value().bool_value
125 PoseWithCovarianceStamped,
128 self.
cmd_vel_subcmd_vel_sub = self.create_subscription(
132 self.
cmd_vel_subcmd_vel_sub = self.create_subscription(
135 self.
odom_pubodom_pub = self.create_publisher(Odometry,
'odom', 10)
137 sensor_qos = QoSProfile(
138 reliability=ReliabilityPolicy.BEST_EFFORT,
139 durability=DurabilityPolicy.VOLATILE,
141 self.
scan_pubscan_pub = self.create_publisher(LaserScan,
'scan', sensor_qos)
145 self.
clock_pubclock_pub = self.create_publisher(Clock,
'/clock', 10)
149 self.
map_clientmap_client = self.create_client(GetMap,
'/map_server/map')
156 self.
infoinfo(
'Loopback simulator initialized')
158 def getBaseToLaserTf(self):
161 transform = self.
tf_buffertf_buffer.lookup_transform(
165 except Exception
as ex:
166 self.get_logger().error(
'Transform lookup failed: %s' % str(ex))
168 def setupTimerCallback(self):
174 def clockTimerCallback(self):
176 msg.clock = self.get_clock().now().to_msg()
179 def cmdVelCallback(self, msg):
180 self.
debugdebug(
'Received cmd_vel')
187 def cmdVelStampedCallback(self, msg):
188 self.
debugdebug(
'Received cmd_vel')
193 self.
curr_cmd_vel_timecurr_cmd_vel_time = rclpy.time.Time.from_msg(msg.header.stamp)
195 def initialPoseCallback(self, msg):
196 self.
infoinfo(
'Received initial pose!')
202 self.
t_map_to_odomt_map_to_odom.transform.translation.z = 0.0
220 t_map_to_base_link = TransformStamped()
221 t_map_to_base_link.header = msg.header
222 t_map_to_base_link.child_frame_id = self.
base_frame_idbase_frame_id
223 t_map_to_base_link.transform.translation.x = self.
initial_poseinitial_pose.position.x
224 t_map_to_base_link.transform.translation.y = self.
initial_poseinitial_pose.position.y
225 t_map_to_base_link.transform.translation.z = 0.0
226 t_map_to_base_link.transform.rotation = self.
initial_poseinitial_pose.orientation
227 mat_map_to_base_link = transformStampedToMatrix(t_map_to_base_link)
228 mat_odom_to_base_link = transformStampedToMatrix(self.
t_odom_to_base_linkt_odom_to_base_link)
229 mat_base_link_to_odom = tf_transformations.inverse_matrix(mat_odom_to_base_link)
231 tf_transformations.concatenate_matrices(mat_map_to_base_link, mat_base_link_to_odom)
232 self.
t_map_to_odomt_map_to_odom.transform = matrixToTransform(mat_map_to_odom)
234 def timerCallback(self):
236 one_sec = Duration(seconds=1)
250 _, _, yaw = tf_transformations.euler_from_quaternion(q)
251 self.
t_odom_to_base_linkt_odom_to_base_link.transform.translation.x += dx * math.cos(yaw) - dy * math.sin(yaw)
252 self.
t_odom_to_base_linkt_odom_to_base_link.transform.translation.y += dx * math.sin(yaw) + dy * math.cos(yaw)
259 def publishLaserScan(self):
262 self.
scan_msgscan_msg.header.stamp = (self.get_clock().now()).to_msg()
268 self.
scan_msgscan_msg.time_increment = 0.0
269 self.
scan_msgscan_msg.scan_time = 0.1
274 self.
scan_msgscan_msg.angle_increment)
275 self.
scan_msgscan_msg.ranges = [0.0] * num_samples
279 def publishTransforms(self, map_to_odom, odom_to_base_link):
280 map_to_odom.header.stamp = \
281 (self.get_clock().now() + Duration(seconds=self.
update_durupdate_dur)).to_msg()
282 odom_to_base_link.header.stamp = self.get_clock().now().to_msg()
285 self.
tf_broadcastertf_broadcaster.sendTransform(odom_to_base_link)
287 def publishOdometry(self, odom_to_base_link):
289 odom.header.stamp = self.get_clock().now().to_msg()
290 odom.header.frame_id =
'odom'
291 odom.child_frame_id =
'base_link'
292 odom.pose.pose.position.x = odom_to_base_link.transform.translation.x
293 odom.pose.pose.position.y = odom_to_base_link.transform.translation.y
294 odom.pose.pose.orientation = odom_to_base_link.transform.rotation
299 self.get_logger().info(msg)
302 def debug(self, msg):
303 self.get_logger().debug(msg)
307 request = GetMap.Request()
308 if self.
map_clientmap_client.wait_for_service(timeout_sec=5.0):
310 future = self.
map_clientmap_client.call_async(request)
311 rclpy.spin_until_future_complete(self, future)
312 if future.result()
is not None:
313 self.
mapmap = future.result().map
314 self.get_logger().info(
'Laser scan will be populated using map data')
316 self.get_logger().warn(
317 'Failed to get map, '
318 'Laser scan will be populated using max range'
321 self.get_logger().warn(
322 'Failed to get map, '
323 'Laser scan will be populated using max range'
326 def getLaserPose(self):
327 mat_map_to_odom = transformStampedToMatrix(self.
t_map_to_odomt_map_to_odom)
330 mat_map_to_laser = tf_transformations.concatenate_matrices(
335 transform = matrixToTransform(mat_map_to_laser)
337 x = transform.translation.x
338 y = transform.translation.y
339 theta = tf_transformations.euler_from_quaternion([
340 transform.rotation.x,
341 transform.rotation.y,
342 transform.rotation.z,
348 def getLaserScan(self, num_samples):
351 self.
scan_msgscan_msg.ranges = [float(
'inf')] * num_samples
353 self.
scan_msgscan_msg.ranges = [self.
scan_msgscan_msg.range_max - 0.1] * num_samples
358 mx0, my0 = worldToMap(x0, y0, self.
mapmap)
360 if not 0 < mx0 < self.
mapmap.info.width
or not 0 < my0 < self.
mapmap.info.height:
363 self.
scan_msgscan_msg.ranges = [float(
'inf')] * num_samples
365 self.
scan_msgscan_msg.ranges = [self.
scan_msgscan_msg.range_max - 0.1] * num_samples
368 for i
in range(num_samples):
369 curr_angle = theta + self.
scan_msgscan_msg.angle_min + i * self.
scan_msgscan_msg.angle_increment
370 x1 = x0 + self.
scan_msgscan_msg.range_max * math.cos(curr_angle)
371 y1 = y0 + self.
scan_msgscan_msg.range_max * math.sin(curr_angle)
373 mx1, my1 = worldToMap(x1, y1, self.
mapmap)
377 while line_iterator.isValid():
378 mx, my = int(line_iterator.getX()), int(line_iterator.getY())
380 if not 0 < mx < self.
mapmap.info.width
or not 0 < my < self.
mapmap.info.height:
384 point_cost = getMapOccupancy(mx, my, self.
mapmap)
387 self.
scan_msgscan_msg.ranges[i] = math.sqrt(
388 (int(line_iterator.getX()) - mx0) ** 2 +
389 (int(line_iterator.getY()) - my0) ** 2
390 ) * self.
mapmap.info.resolution
393 line_iterator.advance()
395 self.
scan_msgscan_msg.ranges[i] = float(
'inf')
401 rclpy.spin(loopback_simulator)
402 loopback_simulator.destroy_node()
407 if __name__ ==
'__main__':
def publishOdometry(self, odom_to_base_link)
def publishLaserScan(self)
def clockTimerCallback(self)
def setupTimerCallback(self)
def getBaseToLaserTf(self)
def getLaserScan(self, num_samples)
def publishTransforms(self, map_to_odom, odom_to_base_link)
def initialPoseCallback(self, msg)
def cmdVelCallback(self, msg)
def cmdVelStampedCallback(self, msg)