ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
intra_process_manager.hpp
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <shared_mutex>
21 
22 #include <iterator>
23 #include <memory>
24 #include <stdexcept>
25 #include <unordered_map>
26 #include <utility>
27 #include <vector>
28 #include <typeinfo>
29 
30 #include "rclcpp/allocator/allocator_deleter.hpp"
31 #include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
32 #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
33 #include "rclcpp/experimental/subscription_intra_process.hpp"
34 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
35 #include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
36 #include "rclcpp/logger.hpp"
37 #include "rclcpp/logging.hpp"
38 #include "rclcpp/macros.hpp"
39 #include "rclcpp/publisher_base.hpp"
40 #include "rclcpp/type_adapter.hpp"
41 #include "rclcpp/visibility_control.hpp"
42 
43 namespace rclcpp
44 {
45 
46 namespace experimental
47 {
48 
50 
93 {
94 private:
95  RCLCPP_DISABLE_COPY(IntraProcessManager)
96 
97 public:
98  RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager)
99 
100  RCLCPP_PUBLIC
102 
103  RCLCPP_PUBLIC
104  virtual ~IntraProcessManager();
105 
107 
116  template<
117  typename ROSMessageType,
118  typename Alloc = std::allocator<ROSMessageType>
119  >
120  uint64_t
121  add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
122  {
123  std::unique_lock<std::shared_timed_mutex> lock(mutex_);
124 
125  uint64_t sub_id = IntraProcessManager::get_next_unique_id();
126 
127  subscriptions_[sub_id] = subscription;
128 
129  // adds the subscription id to all the matchable publishers
130  for (auto & pair : publishers_) {
131  auto publisher = pair.second.lock();
132  if (!publisher) {
133  continue;
134  }
135  if (can_communicate(publisher, subscription)) {
136  uint64_t pub_id = pair.first;
137  insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
138  if (publisher->is_durability_transient_local() &&
139  subscription->is_durability_transient_local())
140  {
141  do_transient_local_publish<ROSMessageType, Alloc>(
142  pub_id, sub_id,
143  subscription->use_take_shared_method());
144  }
145  }
146  }
147 
148  return sub_id;
149  }
150 
152 
157  RCLCPP_PUBLIC
158  void
159  remove_subscription(uint64_t intra_process_subscription_id);
160 
162 
175  RCLCPP_PUBLIC
176  uint64_t
178  rclcpp::PublisherBase::SharedPtr publisher,
179  rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer =
180  rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr());
181 
183 
188  RCLCPP_PUBLIC
189  void
190  remove_publisher(uint64_t intra_process_publisher_id);
191 
193 
216  template<
217  typename MessageT,
218  typename ROSMessageType,
219  typename Alloc,
220  typename Deleter = std::default_delete<MessageT>
221  >
222  void
224  uint64_t intra_process_publisher_id,
225  std::unique_ptr<MessageT, Deleter> message,
226  typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
227  {
228  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
229  using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
230 
231  std::shared_lock<std::shared_timed_mutex> lock(mutex_);
232 
233  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
234  if (publisher_it == pub_to_subs_.end()) {
235  // Publisher is either invalid or no longer exists.
236  RCLCPP_WARN(
237  rclcpp::get_logger("rclcpp"),
238  "Calling do_intra_process_publish for invalid or no longer existing publisher id");
239  return;
240  }
241  const auto & sub_ids = publisher_it->second;
242 
243  if (sub_ids.take_ownership_subscriptions.empty()) {
244  // None of the buffers require ownership, so we promote the pointer
245  std::shared_ptr<MessageT> msg = std::move(message);
246 
247  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
248  msg, sub_ids.take_shared_subscriptions);
249  } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
250  sub_ids.take_shared_subscriptions.size() <= 1)
251  {
252  // There is at maximum 1 buffer that does not require ownership.
253  // So this case is equivalent to all the buffers requiring ownership
254 
255  // Merge the two vector of ids into a unique one
256  std::vector<uint64_t> concatenated_vector(
257  sub_ids.take_shared_subscriptions.begin(), sub_ids.take_shared_subscriptions.end());
258  concatenated_vector.insert(
259  concatenated_vector.end(),
260  sub_ids.take_ownership_subscriptions.begin(),
261  sub_ids.take_ownership_subscriptions.end());
262  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
263  std::move(message),
264  concatenated_vector,
265  allocator);
266  } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
267  sub_ids.take_shared_subscriptions.size() > 1)
268  {
269  // Construct a new shared pointer from the message
270  // for the buffers that do not require ownership
271  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
272 
273  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
274  shared_msg, sub_ids.take_shared_subscriptions);
275  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
276  std::move(message), sub_ids.take_ownership_subscriptions, allocator);
277  }
278  }
279 
280  template<
281  typename MessageT,
282  typename ROSMessageType,
283  typename Alloc,
284  typename Deleter = std::default_delete<MessageT>
285  >
286  std::shared_ptr<const MessageT>
287  do_intra_process_publish_and_return_shared(
288  uint64_t intra_process_publisher_id,
289  std::unique_ptr<MessageT, Deleter> message,
290  typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
291  {
292  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
293  using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
294 
295  std::shared_lock<std::shared_timed_mutex> lock(mutex_);
296 
297  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
298  if (publisher_it == pub_to_subs_.end()) {
299  // Publisher is either invalid or no longer exists.
300  RCLCPP_WARN(
301  rclcpp::get_logger("rclcpp"),
302  "Calling do_intra_process_publish for invalid or no longer existing publisher id");
303  return nullptr;
304  }
305  const auto & sub_ids = publisher_it->second;
306 
307  if (sub_ids.take_ownership_subscriptions.empty()) {
308  // If there are no owning, just convert to shared.
309  std::shared_ptr<MessageT> shared_msg = std::move(message);
310  if (!sub_ids.take_shared_subscriptions.empty()) {
311  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
312  shared_msg, sub_ids.take_shared_subscriptions);
313  }
314  return shared_msg;
315  } else {
316  // Construct a new shared pointer from the message for the buffers that
317  // do not require ownership and to return.
318  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
319 
320  if (!sub_ids.take_shared_subscriptions.empty()) {
321  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
322  shared_msg,
323  sub_ids.take_shared_subscriptions);
324  }
325  if (!sub_ids.take_ownership_subscriptions.empty()) {
326  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
327  std::move(message),
328  sub_ids.take_ownership_subscriptions,
329  allocator);
330  }
331  return shared_msg;
332  }
333  }
334 
335  template<
336  typename MessageT,
337  typename Alloc,
338  typename Deleter,
339  typename ROSMessageType>
340  void
341  add_shared_msg_to_buffer(
342  std::shared_ptr<const MessageT> message,
343  uint64_t subscription_id)
344  {
345  add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(message, {subscription_id});
346  }
347 
348  template<
349  typename MessageT,
350  typename Alloc,
351  typename Deleter,
352  typename ROSMessageType>
353  void
354  add_owned_msg_to_buffer(
355  std::unique_ptr<MessageT, Deleter> message,
356  uint64_t subscription_id,
357  typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
358  {
359  add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
360  std::move(message), {subscription_id}, allocator);
361  }
362 
364  RCLCPP_PUBLIC
365  bool
366  matches_any_publishers(const rmw_gid_t * id) const;
367 
369  RCLCPP_PUBLIC
370  size_t
371  get_subscription_count(uint64_t intra_process_publisher_id) const;
372 
373  RCLCPP_PUBLIC
374  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
375  get_subscription_intra_process(uint64_t intra_process_subscription_id);
376 
378  RCLCPP_PUBLIC
379  size_t
380  lowest_available_capacity(const uint64_t intra_process_publisher_id) const;
381 
382 private:
383  struct SplittedSubscriptions
384  {
385  std::vector<uint64_t> take_shared_subscriptions;
386  std::vector<uint64_t> take_ownership_subscriptions;
387  };
388 
389  using SubscriptionMap =
390  std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;
391 
392  using PublisherMap =
393  std::unordered_map<uint64_t, rclcpp::PublisherBase::WeakPtr>;
394 
395  using PublisherBufferMap =
396  std::unordered_map<uint64_t, rclcpp::experimental::buffers::IntraProcessBufferBase::WeakPtr>;
397 
398  using PublisherToSubscriptionIdsMap =
399  std::unordered_map<uint64_t, SplittedSubscriptions>;
400 
401  RCLCPP_PUBLIC
402  static
403  uint64_t
404  get_next_unique_id();
405 
406  RCLCPP_PUBLIC
407  void
408  insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method);
409 
410  RCLCPP_PUBLIC
411  bool
412  can_communicate(
413  rclcpp::PublisherBase::SharedPtr pub,
414  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const;
415 
416  template<
417  typename ROSMessageType,
418  typename Alloc = std::allocator<ROSMessageType>
419  >
420  void do_transient_local_publish(
421  const uint64_t pub_id, const uint64_t sub_id,
422  const bool use_take_shared_method)
423  {
424  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
425  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
426  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
427 
428  auto publisher_buffer = publisher_buffers_[pub_id].lock();
429  if (!publisher_buffer) {
430  throw std::runtime_error("publisher buffer has unexpectedly gone out of scope");
431  }
432  auto buffer = std::dynamic_pointer_cast<
434  ROSMessageType,
435  ROSMessageTypeAllocator,
436  ROSMessageTypeDeleter
437  >
438  >(publisher_buffer);
439  if (!buffer) {
440  throw std::runtime_error(
441  "failed to dynamic cast publisher's IntraProcessBufferBase to "
442  "IntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
443  "ROSMessageTypeDeleter> which can happen when the publisher and "
444  "subscription use different allocator types, which is not supported");
445  }
446  if (use_take_shared_method) {
447  auto data_vec = buffer->get_all_data_shared();
448  for (auto shared_data : data_vec) {
449  this->template add_shared_msg_to_buffer<
450  ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
451  shared_data, sub_id);
452  }
453  } else {
454  auto data_vec = buffer->get_all_data_unique();
455  for (auto & owned_data : data_vec) {
456  auto allocator = ROSMessageTypeAllocator();
457  this->template add_owned_msg_to_buffer<
458  ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
459  std::move(owned_data), sub_id, allocator);
460  }
461  }
462  }
463 
464  template<
465  typename MessageT,
466  typename Alloc,
467  typename Deleter,
468  typename ROSMessageType>
469  void
470  add_shared_msg_to_buffers(
471  std::shared_ptr<const MessageT> message,
472  std::vector<uint64_t> subscription_ids)
473  {
474  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
475  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
476  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
477 
478  using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
479  using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
480  using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
481  using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
482 
483  for (auto id : subscription_ids) {
484  auto subscription_it = subscriptions_.find(id);
485  if (subscription_it == subscriptions_.end()) {
486  throw std::runtime_error("subscription has unexpectedly gone out of scope");
487  }
488  auto subscription_base = subscription_it->second.lock();
489  if (subscription_base == nullptr) {
490  subscriptions_.erase(id);
491  continue;
492  }
493 
494  auto subscription = std::dynamic_pointer_cast<
496  PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
497  >(subscription_base);
498  if (subscription != nullptr) {
499  subscription->provide_intra_process_data(message);
500  continue;
501  }
502 
503  auto ros_message_subscription = std::dynamic_pointer_cast<
505  ROSMessageTypeAllocator, ROSMessageTypeDeleter>
506  >(subscription_base);
507  if (nullptr == ros_message_subscription) {
508  throw std::runtime_error(
509  "failed to dynamic cast SubscriptionIntraProcessBase to "
510  "SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
511  "SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
512  "ROSMessageTypeDeleter> which can happen when the publisher and "
513  "subscription use different allocator types, which is not supported");
514  }
515 
517  ROSMessageType ros_msg;
519  ros_message_subscription->provide_intra_process_message(
520  std::make_shared<ROSMessageType>(ros_msg));
521  } else {
522  if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
523  ros_message_subscription->provide_intra_process_message(message);
524  } else {
525  if constexpr (std::is_same<typename rclcpp::TypeAdapter<MessageT,
526  ROSMessageType>::ros_message_type, ROSMessageType>::value)
527  {
528  ROSMessageType ros_msg;
530  *message, ros_msg);
531  ros_message_subscription->provide_intra_process_message(
532  std::make_shared<ROSMessageType>(ros_msg));
533  }
534  }
535  }
536  }
537  }
538 
539  template<
540  typename MessageT,
541  typename Alloc,
542  typename Deleter,
543  typename ROSMessageType>
544  void
545  add_owned_msg_to_buffers(
546  std::unique_ptr<MessageT, Deleter> message,
547  std::vector<uint64_t> subscription_ids,
548  typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
549  {
550  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
551  using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
552 
553  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
554  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
555  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
556 
557  using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
558  using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
559  using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
560  using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
561 
562  for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
563  auto subscription_it = subscriptions_.find(*it);
564  if (subscription_it == subscriptions_.end()) {
565  throw std::runtime_error("subscription has unexpectedly gone out of scope");
566  }
567  auto subscription_base = subscription_it->second.lock();
568  if (subscription_base == nullptr) {
569  subscriptions_.erase(subscription_it);
570  continue;
571  }
572 
573  auto subscription = std::dynamic_pointer_cast<
575  PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
576  >(subscription_base);
577  if (subscription != nullptr) {
578  if (std::next(it) == subscription_ids.end()) {
579  // If this is the last subscription, give up ownership
580  subscription->provide_intra_process_data(std::move(message));
581  // Last message delivered, break from for loop
582  break;
583  } else {
584  // Copy the message since we have additional subscriptions to serve
585  Deleter deleter = message.get_deleter();
586  auto ptr = MessageAllocTraits::allocate(allocator, 1);
587  MessageAllocTraits::construct(allocator, ptr, *message);
588 
589  subscription->provide_intra_process_data(MessageUniquePtr(ptr, deleter));
590  }
591 
592  continue;
593  }
594 
595  auto ros_message_subscription = std::dynamic_pointer_cast<
597  ROSMessageTypeAllocator, ROSMessageTypeDeleter>
598  >(subscription_base);
599  if (nullptr == ros_message_subscription) {
600  throw std::runtime_error(
601  "failed to dynamic cast SubscriptionIntraProcessBase to "
602  "SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
603  "SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
604  "ROSMessageTypeDeleter> which can happen when the publisher and "
605  "subscription use different allocator types, which is not supported");
606  }
607 
609  ROSMessageTypeAllocator ros_message_alloc(allocator);
610  auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1);
611  ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr);
612  ROSMessageTypeDeleter deleter;
613  allocator::set_allocator_for_deleter(&deleter, &allocator);
615  auto ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, deleter);
616  ros_message_subscription->provide_intra_process_message(std::move(ros_msg));
617  } else {
618  if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
619  if (std::next(it) == subscription_ids.end()) {
620  // If this is the last subscription, give up ownership
621  ros_message_subscription->provide_intra_process_message(std::move(message));
622  // Last message delivered, break from for loop
623  break;
624  } else {
625  // Copy the message since we have additional subscriptions to serve
626  Deleter deleter = message.get_deleter();
627  allocator::set_allocator_for_deleter(&deleter, &allocator);
628  auto ptr = MessageAllocTraits::allocate(allocator, 1);
629  MessageAllocTraits::construct(allocator, ptr, *message);
630 
631  ros_message_subscription->provide_intra_process_message(
632  MessageUniquePtr(ptr, deleter));
633  }
634  }
635  }
636  }
637  }
638 
639  PublisherToSubscriptionIdsMap pub_to_subs_;
640  SubscriptionMap subscriptions_;
641  PublisherMap publishers_;
642  PublisherBufferMap publisher_buffers_;
643 
644  mutable std::shared_timed_mutex mutex_;
645 };
646 
647 } // namespace experimental
648 } // namespace rclcpp
649 
650 #endif // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
This class performs intra process communication between nodes.
RCLCPP_PUBLIC bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
void do_intra_process_publish(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, typename allocator::AllocRebind< MessageT, Alloc >::allocator_type &allocator)
Publishes an intra-process message, passed as a unique pointer.
RCLCPP_PUBLIC void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
RCLCPP_PUBLIC uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher, rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer=rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr())
Register a publisher with the manager, returns the publisher unique id.
RCLCPP_PUBLIC size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id.
RCLCPP_PUBLIC size_t lowest_available_capacity(const uint64_t intra_process_publisher_id) const
Return the lowest available capacity for all subscription buffers for a publisher id.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:33
Template structure used to adapt custom, user-defined types to ROS types.