ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
intra_process_manager.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/experimental/intra_process_manager.hpp"
16 
17 #include <atomic>
18 #include <memory>
19 #include <mutex>
20 
21 namespace rclcpp
22 {
23 namespace experimental
24 {
25 
26 static std::atomic<uint64_t> _next_unique_id {1};
27 
28 IntraProcessManager::IntraProcessManager()
29 {}
30 
31 IntraProcessManager::~IntraProcessManager()
32 {}
33 
34 uint64_t
35 IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
36 {
37  std::unique_lock<std::shared_timed_mutex> lock(mutex_);
38 
39  uint64_t pub_id = IntraProcessManager::get_next_unique_id();
40 
41  publishers_[pub_id] = publisher;
42 
43  // Initialize the subscriptions storage for this publisher.
44  pub_to_subs_[pub_id] = SplittedSubscriptions();
45 
46  // create an entry for the publisher id and populate with already existing subscriptions
47  for (auto & pair : subscriptions_) {
48  auto subscription = pair.second.lock();
49  if (!subscription) {
50  continue;
51  }
52  if (can_communicate(publisher, subscription)) {
53  uint64_t sub_id = pair.first;
54  insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
55  }
56  }
57 
58  return pub_id;
59 }
60 
61 uint64_t
62 IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription)
63 {
64  std::unique_lock<std::shared_timed_mutex> lock(mutex_);
65 
66  uint64_t sub_id = IntraProcessManager::get_next_unique_id();
67 
68  subscriptions_[sub_id] = subscription;
69 
70  // adds the subscription id to all the matchable publishers
71  for (auto & pair : publishers_) {
72  auto publisher = pair.second.lock();
73  if (!publisher) {
74  continue;
75  }
76  if (can_communicate(publisher, subscription)) {
77  uint64_t pub_id = pair.first;
78  insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
79  }
80  }
81 
82  return sub_id;
83 }
84 
85 void
86 IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id)
87 {
88  std::unique_lock<std::shared_timed_mutex> lock(mutex_);
89 
90  subscriptions_.erase(intra_process_subscription_id);
91 
92  for (auto & pair : pub_to_subs_) {
93  pair.second.take_shared_subscriptions.erase(
94  std::remove(
95  pair.second.take_shared_subscriptions.begin(),
96  pair.second.take_shared_subscriptions.end(),
97  intra_process_subscription_id),
98  pair.second.take_shared_subscriptions.end());
99 
100  pair.second.take_ownership_subscriptions.erase(
101  std::remove(
102  pair.second.take_ownership_subscriptions.begin(),
103  pair.second.take_ownership_subscriptions.end(),
104  intra_process_subscription_id),
105  pair.second.take_ownership_subscriptions.end());
106  }
107 }
108 
109 void
110 IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
111 {
112  std::unique_lock<std::shared_timed_mutex> lock(mutex_);
113 
114  publishers_.erase(intra_process_publisher_id);
115  pub_to_subs_.erase(intra_process_publisher_id);
116 }
117 
118 bool
120 {
121  std::shared_lock<std::shared_timed_mutex> lock(mutex_);
122 
123  for (auto & publisher_pair : publishers_) {
124  auto publisher = publisher_pair.second.lock();
125  if (!publisher) {
126  continue;
127  }
128  if (*publisher.get() == id) {
129  return true;
130  }
131  }
132  return false;
133 }
134 
135 size_t
136 IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) const
137 {
138  std::shared_lock<std::shared_timed_mutex> lock(mutex_);
139 
140  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
141  if (publisher_it == pub_to_subs_.end()) {
142  // Publisher is either invalid or no longer exists.
143  RCLCPP_WARN(
144  rclcpp::get_logger("rclcpp"),
145  "Calling get_subscription_count for invalid or no longer existing publisher id");
146  return 0;
147  }
148 
149  auto count =
150  publisher_it->second.take_shared_subscriptions.size() +
151  publisher_it->second.take_ownership_subscriptions.size();
152 
153  return count;
154 }
155 
156 SubscriptionIntraProcessBase::SharedPtr
157 IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subscription_id)
158 {
159  std::shared_lock<std::shared_timed_mutex> lock(mutex_);
160 
161  auto subscription_it = subscriptions_.find(intra_process_subscription_id);
162  if (subscription_it == subscriptions_.end()) {
163  return nullptr;
164  } else {
165  auto subscription = subscription_it->second.lock();
166  if (subscription) {
167  return subscription;
168  } else {
169  subscriptions_.erase(subscription_it);
170  return nullptr;
171  }
172  }
173 }
174 
175 uint64_t
176 IntraProcessManager::get_next_unique_id()
177 {
178  auto next_id = _next_unique_id.fetch_add(1, std::memory_order_relaxed);
179  // Check for rollover (we started at 1).
180  if (0 == next_id) {
181  // This puts a technical limit on the number of times you can add a publisher or subscriber.
182  // But even if you could add (and remove) them at 1 kHz (very optimistic rate)
183  // it would still be a very long time before you could exhaust the pool of id's:
184  // 2^64 / 1000 times per sec / 60 sec / 60 min / 24 hours / 365 days = 584,942,417 years
185  // So around 585 million years. Even at 1 GHz, it would take 585 years.
186  // I think it's safe to avoid trying to handle overflow.
187  // If we roll over then it's most likely a bug.
188  // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
189  throw std::overflow_error(
190  "exhausted the unique id's for publishers and subscribers in this process "
191  "(congratulations your computer is either extremely fast or extremely old)");
192  // *INDENT-ON*
193  }
194  return next_id;
195 }
196 
197 void
198 IntraProcessManager::insert_sub_id_for_pub(
199  uint64_t sub_id,
200  uint64_t pub_id,
201  bool use_take_shared_method)
202 {
203  if (use_take_shared_method) {
204  pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id);
205  } else {
206  pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id);
207  }
208 }
209 
210 bool
211 IntraProcessManager::can_communicate(
212  rclcpp::PublisherBase::SharedPtr pub,
213  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const
214 {
215  // publisher and subscription must be on the same topic
216  if (strcmp(pub->get_topic_name(), sub->get_topic_name()) != 0) {
217  return false;
218  }
219 
220  auto check_result = rclcpp::qos_check_compatible(pub->get_actual_qos(), sub->get_actual_qos());
221  if (check_result.compatibility == rclcpp::QoSCompatibility::Error) {
222  return false;
223  }
224 
225  return true;
226 }
227 
228 } // namespace experimental
229 } // namespace rclcpp
RCLCPP_PUBLIC bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
RCLCPP_PUBLIC void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
RCLCPP_PUBLIC uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
Register a publisher with the manager, returns the publisher unique id.
RCLCPP_PUBLIC void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
RCLCPP_PUBLIC size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC QoSCheckCompatibleResult qos_check_compatible(const QoS &publisher_qos, const QoS &subscription_qos)
Check if two QoS profiles are compatible.
Definition: qos.cpp:317
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:27