ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
intra_process_manager.hpp
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <shared_mutex>
21 
22 #include <iterator>
23 #include <memory>
24 #include <stdexcept>
25 #include <unordered_map>
26 #include <utility>
27 #include <vector>
28 #include <typeinfo>
29 
30 #include "rclcpp/allocator/allocator_deleter.hpp"
31 #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
32 #include "rclcpp/experimental/subscription_intra_process.hpp"
33 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
34 #include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
35 #include "rclcpp/logger.hpp"
36 #include "rclcpp/logging.hpp"
37 #include "rclcpp/macros.hpp"
38 #include "rclcpp/publisher_base.hpp"
39 #include "rclcpp/type_adapter.hpp"
40 #include "rclcpp/visibility_control.hpp"
41 
42 namespace rclcpp
43 {
44 
45 namespace experimental
46 {
47 
49 
92 {
93 private:
94  RCLCPP_DISABLE_COPY(IntraProcessManager)
95 
96 public:
97  RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager)
98 
99  RCLCPP_PUBLIC
101 
102  RCLCPP_PUBLIC
103  virtual ~IntraProcessManager();
104 
106 
115  RCLCPP_PUBLIC
116  uint64_t
117  add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
118 
120 
125  RCLCPP_PUBLIC
126  void
127  remove_subscription(uint64_t intra_process_subscription_id);
128 
130 
139  RCLCPP_PUBLIC
140  uint64_t
141  add_publisher(rclcpp::PublisherBase::SharedPtr publisher);
142 
144 
149  RCLCPP_PUBLIC
150  void
151  remove_publisher(uint64_t intra_process_publisher_id);
152 
154 
177  template<
178  typename MessageT,
179  typename ROSMessageType,
180  typename Alloc,
181  typename Deleter = std::default_delete<MessageT>
182  >
183  void
185  uint64_t intra_process_publisher_id,
186  std::unique_ptr<MessageT, Deleter> message,
187  typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
188  {
189  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
190  using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
191 
192  std::shared_lock<std::shared_timed_mutex> lock(mutex_);
193 
194  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
195  if (publisher_it == pub_to_subs_.end()) {
196  // Publisher is either invalid or no longer exists.
197  RCLCPP_WARN(
198  rclcpp::get_logger("rclcpp"),
199  "Calling do_intra_process_publish for invalid or no longer existing publisher id");
200  return;
201  }
202  const auto & sub_ids = publisher_it->second;
203 
204  if (sub_ids.take_ownership_subscriptions.empty()) {
205  // None of the buffers require ownership, so we promote the pointer
206  std::shared_ptr<MessageT> msg = std::move(message);
207 
208  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
209  msg, sub_ids.take_shared_subscriptions);
210  } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
211  sub_ids.take_shared_subscriptions.size() <= 1)
212  {
213  // There is at maximum 1 buffer that does not require ownership.
214  // So this case is equivalent to all the buffers requiring ownership
215 
216  // Merge the two vector of ids into a unique one
217  std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
218  concatenated_vector.insert(
219  concatenated_vector.end(),
220  sub_ids.take_ownership_subscriptions.begin(),
221  sub_ids.take_ownership_subscriptions.end());
222  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
223  std::move(message),
224  concatenated_vector,
225  allocator);
226  } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
227  sub_ids.take_shared_subscriptions.size() > 1)
228  {
229  // Construct a new shared pointer from the message
230  // for the buffers that do not require ownership
231  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
232 
233  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
234  shared_msg, sub_ids.take_shared_subscriptions);
235  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
236  std::move(message), sub_ids.take_ownership_subscriptions, allocator);
237  }
238  }
239 
240  template<
241  typename MessageT,
242  typename ROSMessageType,
243  typename Alloc,
244  typename Deleter = std::default_delete<MessageT>
245  >
246  std::shared_ptr<const MessageT>
247  do_intra_process_publish_and_return_shared(
248  uint64_t intra_process_publisher_id,
249  std::unique_ptr<MessageT, Deleter> message,
250  typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
251  {
252  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
253  using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
254 
255  std::shared_lock<std::shared_timed_mutex> lock(mutex_);
256 
257  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
258  if (publisher_it == pub_to_subs_.end()) {
259  // Publisher is either invalid or no longer exists.
260  RCLCPP_WARN(
261  rclcpp::get_logger("rclcpp"),
262  "Calling do_intra_process_publish for invalid or no longer existing publisher id");
263  return nullptr;
264  }
265  const auto & sub_ids = publisher_it->second;
266 
267  if (sub_ids.take_ownership_subscriptions.empty()) {
268  // If there are no owning, just convert to shared.
269  std::shared_ptr<MessageT> shared_msg = std::move(message);
270  if (!sub_ids.take_shared_subscriptions.empty()) {
271  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
272  shared_msg, sub_ids.take_shared_subscriptions);
273  }
274  return shared_msg;
275  } else {
276  // Construct a new shared pointer from the message for the buffers that
277  // do not require ownership and to return.
278  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
279 
280  if (!sub_ids.take_shared_subscriptions.empty()) {
281  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
282  shared_msg,
283  sub_ids.take_shared_subscriptions);
284  }
285  if (!sub_ids.take_ownership_subscriptions.empty()) {
286  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
287  std::move(message),
288  sub_ids.take_ownership_subscriptions,
289  allocator);
290  }
291  return shared_msg;
292  }
293  }
294 
296  RCLCPP_PUBLIC
297  bool
298  matches_any_publishers(const rmw_gid_t * id) const;
299 
301  RCLCPP_PUBLIC
302  size_t
303  get_subscription_count(uint64_t intra_process_publisher_id) const;
304 
305  RCLCPP_PUBLIC
306  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
307  get_subscription_intra_process(uint64_t intra_process_subscription_id);
308 
309 private:
310  struct SplittedSubscriptions
311  {
312  std::vector<uint64_t> take_shared_subscriptions;
313  std::vector<uint64_t> take_ownership_subscriptions;
314  };
315 
316  using SubscriptionMap =
317  std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;
318 
319  using PublisherMap =
320  std::unordered_map<uint64_t, rclcpp::PublisherBase::WeakPtr>;
321 
322  using PublisherToSubscriptionIdsMap =
323  std::unordered_map<uint64_t, SplittedSubscriptions>;
324 
325  RCLCPP_PUBLIC
326  static
327  uint64_t
328  get_next_unique_id();
329 
330  RCLCPP_PUBLIC
331  void
332  insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method);
333 
334  RCLCPP_PUBLIC
335  bool
336  can_communicate(
337  rclcpp::PublisherBase::SharedPtr pub,
338  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const;
339 
340  template<
341  typename MessageT,
342  typename Alloc,
343  typename Deleter,
344  typename ROSMessageType>
345  void
346  add_shared_msg_to_buffers(
347  std::shared_ptr<const MessageT> message,
348  std::vector<uint64_t> subscription_ids)
349  {
350  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
351  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
352  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
353 
354  using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
355  using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
356  using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
357  using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
358 
359  for (auto id : subscription_ids) {
360  auto subscription_it = subscriptions_.find(id);
361  if (subscription_it == subscriptions_.end()) {
362  throw std::runtime_error("subscription has unexpectedly gone out of scope");
363  }
364  auto subscription_base = subscription_it->second.lock();
365  if (subscription_base == nullptr) {
366  subscriptions_.erase(id);
367  continue;
368  }
369 
370  auto subscription = std::dynamic_pointer_cast<
372  PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
373  >(subscription_base);
374  if (subscription != nullptr) {
375  subscription->provide_intra_process_data(message);
376  continue;
377  }
378 
379  auto ros_message_subscription = std::dynamic_pointer_cast<
381  ROSMessageTypeAllocator, ROSMessageTypeDeleter>
382  >(subscription_base);
383  if (nullptr == ros_message_subscription) {
384  throw std::runtime_error(
385  "failed to dynamic cast SubscriptionIntraProcessBase to "
386  "SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
387  "SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
388  "ROSMessageTypeDeleter> which can happen when the publisher and "
389  "subscription use different allocator types, which is not supported");
390  }
391 
393  ROSMessageType ros_msg;
395  ros_message_subscription->provide_intra_process_message(
396  std::make_shared<ROSMessageType>(ros_msg));
397  } else {
398  if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
399  ros_message_subscription->provide_intra_process_message(message);
400  } else {
401  if constexpr (std::is_same<typename rclcpp::TypeAdapter<MessageT,
402  ROSMessageType>::ros_message_type, ROSMessageType>::value)
403  {
404  ROSMessageType ros_msg;
406  *message, ros_msg);
407  ros_message_subscription->provide_intra_process_message(
408  std::make_shared<ROSMessageType>(ros_msg));
409  }
410  }
411  }
412  }
413  }
414 
415  template<
416  typename MessageT,
417  typename Alloc,
418  typename Deleter,
419  typename ROSMessageType>
420  void
421  add_owned_msg_to_buffers(
422  std::unique_ptr<MessageT, Deleter> message,
423  std::vector<uint64_t> subscription_ids,
424  typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
425  {
426  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
427  using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
428 
429  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
430  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
431  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
432 
433  using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
434  using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
435  using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
436  using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
437 
438  for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
439  auto subscription_it = subscriptions_.find(*it);
440  if (subscription_it == subscriptions_.end()) {
441  throw std::runtime_error("subscription has unexpectedly gone out of scope");
442  }
443  auto subscription_base = subscription_it->second.lock();
444  if (subscription_base == nullptr) {
445  subscriptions_.erase(subscription_it);
446  continue;
447  }
448 
449  auto subscription = std::dynamic_pointer_cast<
451  PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
452  >(subscription_base);
453  if (subscription != nullptr) {
454  if (std::next(it) == subscription_ids.end()) {
455  // If this is the last subscription, give up ownership
456  subscription->provide_intra_process_data(std::move(message));
457  // Last message delivered, break from for loop
458  break;
459  } else {
460  // Copy the message since we have additional subscriptions to serve
461  Deleter deleter = message.get_deleter();
462  auto ptr = MessageAllocTraits::allocate(allocator, 1);
463  MessageAllocTraits::construct(allocator, ptr, *message);
464 
465  subscription->provide_intra_process_data(std::move(MessageUniquePtr(ptr, deleter)));
466  }
467 
468  continue;
469  }
470 
471  auto ros_message_subscription = std::dynamic_pointer_cast<
473  ROSMessageTypeAllocator, ROSMessageTypeDeleter>
474  >(subscription_base);
475  if (nullptr == ros_message_subscription) {
476  throw std::runtime_error(
477  "failed to dynamic cast SubscriptionIntraProcessBase to "
478  "SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
479  "SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
480  "ROSMessageTypeDeleter> which can happen when the publisher and "
481  "subscription use different allocator types, which is not supported");
482  }
483 
485  ROSMessageTypeAllocator ros_message_alloc(allocator);
486  auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1);
487  ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr);
488  ROSMessageTypeDeleter deleter;
489  allocator::set_allocator_for_deleter(&deleter, &allocator);
491  auto ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, deleter);
492  ros_message_subscription->provide_intra_process_message(std::move(ros_msg));
493  } else {
494  if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
495  if (std::next(it) == subscription_ids.end()) {
496  // If this is the last subscription, give up ownership
497  ros_message_subscription->provide_intra_process_message(std::move(message));
498  // Last message delivered, break from for loop
499  break;
500  } else {
501  // Copy the message since we have additional subscriptions to serve
502  Deleter deleter = message.get_deleter();
503  allocator::set_allocator_for_deleter(&deleter, &allocator);
504  auto ptr = MessageAllocTraits::allocate(allocator, 1);
505  MessageAllocTraits::construct(allocator, ptr, *message);
506 
507  ros_message_subscription->provide_intra_process_message(
508  std::move(MessageUniquePtr(ptr, deleter)));
509  }
510  }
511  }
512  }
513  }
514 
515  PublisherToSubscriptionIdsMap pub_to_subs_;
516  SubscriptionMap subscriptions_;
517  PublisherMap publishers_;
518 
519  mutable std::shared_timed_mutex mutex_;
520 };
521 
522 } // namespace experimental
523 } // namespace rclcpp
524 
525 #endif // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
This class performs intra process communication between nodes.
RCLCPP_PUBLIC bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
RCLCPP_PUBLIC void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
void do_intra_process_publish(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, typename allocator::AllocRebind< MessageT, Alloc >::allocator_type &allocator)
Publishes an intra-process message, passed as a unique pointer.
RCLCPP_PUBLIC uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
Register a publisher with the manager, returns the publisher unique id.
RCLCPP_PUBLIC void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
RCLCPP_PUBLIC size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:27
Template structure used to adapt custom, user-defined types to ROS types.