15 #include "rclcpp/experimental/intra_process_manager.hpp"
23 namespace experimental
26 static std::atomic<uint64_t> _next_unique_id {1};
28 IntraProcessManager::IntraProcessManager()
31 IntraProcessManager::~IntraProcessManager()
37 std::unique_lock<std::shared_timed_mutex> lock(mutex_);
39 uint64_t pub_id = IntraProcessManager::get_next_unique_id();
41 publishers_[pub_id] = publisher;
44 pub_to_subs_[pub_id] = SplittedSubscriptions();
47 for (
auto & pair : subscriptions_) {
48 auto subscription = pair.second.lock();
52 if (can_communicate(publisher, subscription)) {
53 uint64_t sub_id = pair.first;
54 insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
64 std::unique_lock<std::shared_timed_mutex> lock(mutex_);
66 uint64_t sub_id = IntraProcessManager::get_next_unique_id();
68 subscriptions_[sub_id] = subscription;
71 for (
auto & pair : publishers_) {
72 auto publisher = pair.second.lock();
76 if (can_communicate(publisher, subscription)) {
77 uint64_t pub_id = pair.first;
78 insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
88 std::unique_lock<std::shared_timed_mutex> lock(mutex_);
90 subscriptions_.erase(intra_process_subscription_id);
92 for (
auto & pair : pub_to_subs_) {
93 pair.second.take_shared_subscriptions.erase(
95 pair.second.take_shared_subscriptions.begin(),
96 pair.second.take_shared_subscriptions.end(),
97 intra_process_subscription_id),
98 pair.second.take_shared_subscriptions.end());
100 pair.second.take_ownership_subscriptions.erase(
102 pair.second.take_ownership_subscriptions.begin(),
103 pair.second.take_ownership_subscriptions.end(),
104 intra_process_subscription_id),
105 pair.second.take_ownership_subscriptions.end());
112 std::unique_lock<std::shared_timed_mutex> lock(mutex_);
114 publishers_.erase(intra_process_publisher_id);
115 pub_to_subs_.erase(intra_process_publisher_id);
121 std::shared_lock<std::shared_timed_mutex> lock(mutex_);
123 for (
auto & publisher_pair : publishers_) {
124 auto publisher = publisher_pair.second.lock();
128 if (*publisher.get() ==
id) {
138 std::shared_lock<std::shared_timed_mutex> lock(mutex_);
140 auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
141 if (publisher_it == pub_to_subs_.end()) {
145 "Calling get_subscription_count for invalid or no longer existing publisher id");
150 publisher_it->second.take_shared_subscriptions.size() +
151 publisher_it->second.take_ownership_subscriptions.size();
156 SubscriptionIntraProcessBase::SharedPtr
157 IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subscription_id)
159 std::shared_lock<std::shared_timed_mutex> lock(mutex_);
161 auto subscription_it = subscriptions_.find(intra_process_subscription_id);
162 if (subscription_it == subscriptions_.end()) {
165 auto subscription = subscription_it->second.lock();
169 subscriptions_.erase(subscription_it);
176 IntraProcessManager::get_next_unique_id()
178 auto next_id = _next_unique_id.fetch_add(1, std::memory_order_relaxed);
189 throw std::overflow_error(
190 "exhausted the unique id's for publishers and subscribers in this process "
191 "(congratulations your computer is either extremely fast or extremely old)");
198 IntraProcessManager::insert_sub_id_for_pub(
201 bool use_take_shared_method)
203 if (use_take_shared_method) {
204 pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id);
206 pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id);
211 IntraProcessManager::can_communicate(
212 rclcpp::PublisherBase::SharedPtr pub,
213 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub)
const
216 if (strcmp(pub->get_topic_name(), sub->get_topic_name()) != 0) {
221 if (check_result.compatibility == rclcpp::QoSCompatibility::Error) {
RCLCPP_PUBLIC bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
RCLCPP_PUBLIC void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
RCLCPP_PUBLIC uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
Register a publisher with the manager, returns the publisher unique id.
RCLCPP_PUBLIC void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
RCLCPP_PUBLIC size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC QoSCheckCompatibleResult qos_check_compatible(const QoS &publisher_qos, const QoS &subscription_qos)
Check if two QoS profiles are compatible.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.