15 #include "rclcpp/experimental/intra_process_manager.hpp"
23 namespace experimental
// File-local atomic counter that hands out ids; it starts at 1 and is advanced
// with fetch_add by IntraProcessManager::get_next_unique_id().
26 static std::atomic<uint64_t> _next_unique_id {1};
// Constructor definition; its body is not visible in this excerpt.
28 IntraProcessManager::IntraProcessManager()
// Destructor definition; its body is not visible in this excerpt.
31 IntraProcessManager::~IntraProcessManager()
36 rclcpp::PublisherBase::SharedPtr publisher,
37 rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer)
39 std::unique_lock<std::shared_timed_mutex> lock(mutex_);
41 uint64_t pub_id = IntraProcessManager::get_next_unique_id();
43 publishers_[pub_id] = publisher;
44 if (publisher->is_durability_transient_local()) {
46 publisher_buffers_[pub_id] = buffer;
48 throw std::runtime_error(
49 "transient_local publisher needs to pass"
50 "a valid publisher buffer ptr when calling add_publisher()");
55 pub_to_subs_[pub_id] = SplittedSubscriptions();
58 for (
auto & pair : subscriptions_) {
59 auto subscription = pair.second.lock();
63 if (can_communicate(publisher, subscription)) {
64 uint64_t sub_id = pair.first;
65 insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
// Body fragment that unregisters a subscription id. The signature line is not
// part of this excerpt; presumably this is
// IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id)
// — TODO confirm.
//
// Exclusive lock: the statements below modify subscriptions_ and pub_to_subs_.
75 std::unique_lock<std::shared_timed_mutex> lock(mutex_);
// Drop the subscription entry itself.
77 subscriptions_.erase(intra_process_subscription_id);
// Then remove the id from both id lists of every publisher entry.
79 for (
auto & pair : pub_to_subs_) {
// NOTE(review): the first argument of erase() starts on a line that is not
// visible here; given the (begin, end, id) arguments it is presumably
// std::remove(...), i.e. the erase-remove idiom — confirm.
80 pair.second.take_shared_subscriptions.erase(
82 pair.second.take_shared_subscriptions.begin(),
83 pair.second.take_shared_subscriptions.end(),
84 intra_process_subscription_id),
85 pair.second.take_shared_subscriptions.end());
// Same removal for the take-ownership list.
87 pair.second.take_ownership_subscriptions.erase(
89 pair.second.take_ownership_subscriptions.begin(),
90 pair.second.take_ownership_subscriptions.end(),
91 intra_process_subscription_id),
92 pair.second.take_ownership_subscriptions.end());
// Body fragment that unregisters a publisher id. The signature line is not part
// of this excerpt; presumably this is
// IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
// — TODO confirm.
//
// Exclusive lock: all three containers below are modified.
99 std::unique_lock<std::shared_timed_mutex> lock(mutex_);
// Erase the id from every per-publisher container so no stale state remains.
101 publishers_.erase(intra_process_publisher_id);
102 publisher_buffers_.erase(intra_process_publisher_id);
103 pub_to_subs_.erase(intra_process_publisher_id);
// Body fragment that scans the stored publishers for one equal to `id`. The
// signature line is not part of this excerpt; presumably this is
// IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
// — TODO confirm.
//
// Shared (reader) lock: the visible statements only read publishers_.
109 std::shared_lock<std::shared_timed_mutex> lock(mutex_);
111 for (
auto & publisher_pair : publishers_) {
// The stored handle is promoted with lock(); presumably a std::weak_ptr, in
// which case the result is empty once the publisher has been destroyed.
112 auto publisher = publisher_pair.second.lock();
// NOTE(review): the lines between lock() and this dereference are not visible;
// confirm that an empty `publisher` is skipped before it is dereferenced.
// The comparison uses the publisher type's operator== against `id`.
116 if (*publisher.get() ==
id) {
// Body fragment of get_subscription_count (name taken from the log message
// below; the signature line is not part of this excerpt).
//
// Shared (reader) lock: the visible statements only read pub_to_subs_.
126 std::shared_lock<std::shared_timed_mutex> lock(mutex_);
// Look up the publisher's entry; an unknown id takes the warning branch below.
128 auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
129 if (publisher_it == pub_to_subs_.end()) {
// Message text passed to a logging call whose first line is not visible here.
133 "Calling get_subscription_count for invalid or no longer existing publisher id");
// The count is the sum of both subscription id lists of the publisher entry.
138 publisher_it->second.take_shared_subscriptions.size() +
139 publisher_it->second.take_ownership_subscriptions.size();
144 SubscriptionIntraProcessBase::SharedPtr
145 IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subscription_id)
147 std::shared_lock<std::shared_timed_mutex> lock(mutex_);
149 auto subscription_it = subscriptions_.find(intra_process_subscription_id);
150 if (subscription_it == subscriptions_.end()) {
153 auto subscription = subscription_it->second.lock();
157 subscriptions_.erase(subscription_it);
// Hands out the next id from the file-level _next_unique_id counter.
// The return-type line and the rollover condition are not visible in this excerpt.
164 IntraProcessManager::get_next_unique_id()
// fetch_add returns the value held before the increment; relaxed ordering is
// requested, so the call itself imposes no ordering on other memory accesses.
166 auto next_id = _next_unique_id.fetch_add(1, std::memory_order_relaxed);
// Raised on the exhaustion path. NOTE(review): the guarding condition is not
// visible here; since the counter starts at 1 it is presumably a check for
// next_id having wrapped around to 0 — confirm.
177 throw std::overflow_error(
178 "exhausted the unique id's for publishers and subscribers in this process "
179 "(congratulations your computer is either extremely fast or extremely old)");
// Records sub_id in one of the two id lists kept for pub_id.
// Only the last parameter line is visible in this excerpt; the call site in
// add_publisher passes (sub_id, pub_id, use_take_shared_method).
// No lock is taken in the visible lines; the visible caller (add_publisher)
// already holds the exclusive lock on mutex_.
186 IntraProcessManager::insert_sub_id_for_pub(
189 bool use_take_shared_method)
// The flag selects the list: take_shared_subscriptions when true,
// take_ownership_subscriptions otherwise.
191 if (use_take_shared_method) {
// NOTE(review): pub_to_subs_ is accessed with operator[]; if it is a standard
// map this default-creates an entry for an unknown pub_id — confirm intended.
192 pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id);
194 pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id);
// Decides whether a publisher and an intra-process subscription can be matched.
// The visible checks are a topic-name comparison and a QoS compatibility test;
// the return statements are not visible in this excerpt.
199 IntraProcessManager::can_communicate(
200 rclcpp::PublisherBase::SharedPtr pub,
201 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub)
const
// Topic names are compared as C strings; a non-zero strcmp result means they differ.
204 if (strcmp(pub->get_topic_name(), sub->get_topic_name()) != 0) {
// NOTE(review): the line that computes check_result is not visible here;
// presumably it comes from rclcpp::qos_check_compatible(...) — confirm.
// Only the Error compatibility level takes this branch.
209 if (check_result.compatibility == rclcpp::QoSCompatibility::Error) {
// Body fragment of lowest_available_capacity (name taken from the log message
// below; the signature line is not part of this excerpt).
// NOTE(review): unlike the other accessors in this file, no lock on mutex_ is
// visible before pub_to_subs_ and subscriptions_ are read — confirm whether the
// caller is expected to hold it.
//
// Start from the largest size_t so that std::min below can only lower it.
219 size_t capacity = std::numeric_limits<size_t>::max();
// Look up the publisher's entry; an unknown id takes the warning branch below.
221 auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
222 if (publisher_it == pub_to_subs_.end()) {
// Message text passed to a logging call whose first line is not visible here.
226 "Calling lowest_available_capacity for invalid or no longer existing publisher id");
// Special case: the publisher has no matched subscriptions in either list
// (the statement executed in this branch is not visible in this excerpt).
230 if (publisher_it->second.take_shared_subscriptions.empty() &&
231 publisher_it->second.take_ownership_subscriptions.empty())
// Helper that folds one subscription's available capacity into `capacity`,
// which it captures by reference.
237 auto available_capacity = [
this, &capacity](
const uint64_t intra_process_subscription_id)
239 auto subscription_it = subscriptions_.find(intra_process_subscription_id);
240 if (subscription_it != subscriptions_.end()) {
// Promote the stored handle; the guard around the use below is not visible here.
241 auto subscription = subscription_it->second.lock();
// Keep the minimum seen so far.
243 capacity = std::min(capacity, subscription->available_capacity());
// Message text for the case where the id is missing from subscriptions_.
249 "Calling available_capacity for invalid or no longer existing subscription id");
// Apply the helper to every id in both lists of the publisher entry.
253 for (
const auto sub_id : publisher_it->second.take_shared_subscriptions) {
254 available_capacity(sub_id);
257 for (
const auto sub_id : publisher_it->second.take_ownership_subscriptions) {
258 available_capacity(sub_id);
RCLCPP_PUBLIC bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
RCLCPP_PUBLIC void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
RCLCPP_PUBLIC uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher, rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer=rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr())
Register a publisher with the manager, returns the publisher unique id.
RCLCPP_PUBLIC size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id.
RCLCPP_PUBLIC size_t lowest_available_capacity(const uint64_t intra_process_publisher_id) const
Return the lowest available capacity for all subscription buffers for a publisher id.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC QoSCheckCompatibleResult qos_check_compatible(const QoS &publisher_qos, const QoS &subscription_qos)
Check if two QoS profiles are compatible.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.