ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
intra_process_manager.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/experimental/intra_process_manager.hpp"
16 
17 #include <atomic>
18 #include <memory>
19 #include <mutex>
20 
21 namespace rclcpp
22 {
23 namespace experimental
24 {
25 
26 static std::atomic<uint64_t> _next_unique_id {1};
27 
28 IntraProcessManager::IntraProcessManager()
29 {}
30 
31 IntraProcessManager::~IntraProcessManager()
32 {}
33 
34 uint64_t
36  rclcpp::PublisherBase::SharedPtr publisher,
37  rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer)
38 {
39  std::unique_lock<std::shared_timed_mutex> lock(mutex_);
40 
41  uint64_t pub_id = IntraProcessManager::get_next_unique_id();
42 
43  publishers_[pub_id] = publisher;
44  if (publisher->is_durability_transient_local()) {
45  if (buffer) {
46  publisher_buffers_[pub_id] = buffer;
47  } else {
48  throw std::runtime_error(
49  "transient_local publisher needs to pass"
50  "a valid publisher buffer ptr when calling add_publisher()");
51  }
52  }
53 
54  // Initialize the subscriptions storage for this publisher.
55  pub_to_subs_[pub_id] = SplittedSubscriptions();
56 
57  // create an entry for the publisher id and populate with already existing subscriptions
58  for (auto & pair : subscriptions_) {
59  auto subscription = pair.second.lock();
60  if (!subscription) {
61  continue;
62  }
63  if (can_communicate(publisher, subscription)) {
64  uint64_t sub_id = pair.first;
65  insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
66  }
67  }
68 
69  return pub_id;
70 }
71 
72 void
73 IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id)
74 {
75  std::unique_lock<std::shared_timed_mutex> lock(mutex_);
76 
77  subscriptions_.erase(intra_process_subscription_id);
78 
79  for (auto & pair : pub_to_subs_) {
80  pair.second.take_shared_subscriptions.erase(
81  std::remove(
82  pair.second.take_shared_subscriptions.begin(),
83  pair.second.take_shared_subscriptions.end(),
84  intra_process_subscription_id),
85  pair.second.take_shared_subscriptions.end());
86 
87  pair.second.take_ownership_subscriptions.erase(
88  std::remove(
89  pair.second.take_ownership_subscriptions.begin(),
90  pair.second.take_ownership_subscriptions.end(),
91  intra_process_subscription_id),
92  pair.second.take_ownership_subscriptions.end());
93  }
94 }
95 
96 void
97 IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
98 {
99  std::unique_lock<std::shared_timed_mutex> lock(mutex_);
100 
101  publishers_.erase(intra_process_publisher_id);
102  publisher_buffers_.erase(intra_process_publisher_id);
103  pub_to_subs_.erase(intra_process_publisher_id);
104 }
105 
106 bool
108 {
109  std::shared_lock<std::shared_timed_mutex> lock(mutex_);
110 
111  for (auto & publisher_pair : publishers_) {
112  auto publisher = publisher_pair.second.lock();
113  if (!publisher) {
114  continue;
115  }
116  if (*publisher.get() == id) {
117  return true;
118  }
119  }
120  return false;
121 }
122 
123 size_t
124 IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) const
125 {
126  std::shared_lock<std::shared_timed_mutex> lock(mutex_);
127 
128  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
129  if (publisher_it == pub_to_subs_.end()) {
130  // Publisher is either invalid or no longer exists.
131  RCLCPP_WARN(
132  rclcpp::get_logger("rclcpp"),
133  "Calling get_subscription_count for invalid or no longer existing publisher id");
134  return 0;
135  }
136 
137  auto count =
138  publisher_it->second.take_shared_subscriptions.size() +
139  publisher_it->second.take_ownership_subscriptions.size();
140 
141  return count;
142 }
143 
144 SubscriptionIntraProcessBase::SharedPtr
145 IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subscription_id)
146 {
147  std::shared_lock<std::shared_timed_mutex> lock(mutex_);
148 
149  auto subscription_it = subscriptions_.find(intra_process_subscription_id);
150  if (subscription_it == subscriptions_.end()) {
151  return nullptr;
152  } else {
153  auto subscription = subscription_it->second.lock();
154  if (subscription) {
155  return subscription;
156  } else {
157  subscriptions_.erase(subscription_it);
158  return nullptr;
159  }
160  }
161 }
162 
163 uint64_t
164 IntraProcessManager::get_next_unique_id()
165 {
166  auto next_id = _next_unique_id.fetch_add(1, std::memory_order_relaxed);
167  // Check for rollover (we started at 1).
168  if (0 == next_id) {
169  // This puts a technical limit on the number of times you can add a publisher or subscriber.
170  // But even if you could add (and remove) them at 1 kHz (very optimistic rate)
171  // it would still be a very long time before you could exhaust the pool of id's:
172  // 2^64 / 1000 times per sec / 60 sec / 60 min / 24 hours / 365 days = 584,942,417 years
173  // So around 585 million years. Even at 1 GHz, it would take 585 years.
174  // I think it's safe to avoid trying to handle overflow.
175  // If we roll over then it's most likely a bug.
176  // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
177  throw std::overflow_error(
178  "exhausted the unique id's for publishers and subscribers in this process "
179  "(congratulations your computer is either extremely fast or extremely old)");
180  // *INDENT-ON*
181  }
182  return next_id;
183 }
184 
185 void
186 IntraProcessManager::insert_sub_id_for_pub(
187  uint64_t sub_id,
188  uint64_t pub_id,
189  bool use_take_shared_method)
190 {
191  if (use_take_shared_method) {
192  pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id);
193  } else {
194  pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id);
195  }
196 }
197 
198 bool
199 IntraProcessManager::can_communicate(
200  rclcpp::PublisherBase::SharedPtr pub,
201  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const
202 {
203  // publisher and subscription must be on the same topic
204  if (strcmp(pub->get_topic_name(), sub->get_topic_name()) != 0) {
205  return false;
206  }
207 
208  auto check_result = rclcpp::qos_check_compatible(pub->get_actual_qos(), sub->get_actual_qos());
209  if (check_result.compatibility == rclcpp::QoSCompatibility::Error) {
210  return false;
211  }
212 
213  return true;
214 }
215 
216 size_t
217 IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publisher_id) const
218 {
219  size_t capacity = std::numeric_limits<size_t>::max();
220 
221  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
222  if (publisher_it == pub_to_subs_.end()) {
223  // Publisher is either invalid or no longer exists.
224  RCLCPP_WARN(
225  rclcpp::get_logger("rclcpp"),
226  "Calling lowest_available_capacity for invalid or no longer existing publisher id");
227  return 0u;
228  }
229 
230  if (publisher_it->second.take_shared_subscriptions.empty() &&
231  publisher_it->second.take_ownership_subscriptions.empty())
232  {
233  // no subscriptions available
234  return 0u;
235  }
236 
237  auto available_capacity = [this, &capacity](const uint64_t intra_process_subscription_id)
238  {
239  auto subscription_it = subscriptions_.find(intra_process_subscription_id);
240  if (subscription_it != subscriptions_.end()) {
241  auto subscription = subscription_it->second.lock();
242  if (subscription) {
243  capacity = std::min(capacity, subscription->available_capacity());
244  }
245  } else {
246  // Subscription is either invalid or no longer exists.
247  RCLCPP_WARN(
248  rclcpp::get_logger("rclcpp"),
249  "Calling available_capacity for invalid or no longer existing subscription id");
250  }
251  };
252 
253  for (const auto sub_id : publisher_it->second.take_shared_subscriptions) {
254  available_capacity(sub_id);
255  }
256 
257  for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) {
258  available_capacity(sub_id);
259  }
260 
261  return capacity;
262 }
263 } // namespace experimental
264 } // namespace rclcpp
RCLCPP_PUBLIC bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
RCLCPP_PUBLIC void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
RCLCPP_PUBLIC uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher, rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer=rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr())
Register a publisher with the manager, returns the publisher unique id.
RCLCPP_PUBLIC size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id.
RCLCPP_PUBLIC size_t lowest_available_capacity(const uint64_t intra_process_publisher_id) const
Return the lowest available capacity for all subscription buffers for a publisher id.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC QoSCheckCompatibleResult qos_check_compatible(const QoS &publisher_qos, const QoS &subscription_qos)
Check if two QoS profiles are compatible.
Definition: qos.cpp:351
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:33