15 #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
18 #include <rmw/types.h>
20 #include <shared_mutex>
25 #include <unordered_map>
30 #include "rclcpp/allocator/allocator_deleter.hpp"
31 #include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
32 #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
33 #include "rclcpp/experimental/subscription_intra_process.hpp"
34 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
35 #include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
36 #include "rclcpp/logger.hpp"
37 #include "rclcpp/logging.hpp"
38 #include "rclcpp/macros.hpp"
39 #include "rclcpp/publisher_base.hpp"
40 #include "rclcpp/type_adapter.hpp"
41 #include "rclcpp/visibility_control.hpp"
46 namespace experimental
117 typename ROSMessageType,
118 typename Alloc = std::allocator<ROSMessageType>
121 add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
123 std::unique_lock<std::shared_timed_mutex> lock(mutex_);
125 uint64_t sub_id = IntraProcessManager::get_next_unique_id();
127 subscriptions_[sub_id] = subscription;
130 for (
auto & pair : publishers_) {
131 auto publisher = pair.second.lock();
135 if (can_communicate(publisher, subscription)) {
136 uint64_t pub_id = pair.first;
137 insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method());
138 if (publisher->is_durability_transient_local() &&
139 subscription->is_durability_transient_local())
141 do_transient_local_publish<ROSMessageType, Alloc>(
143 subscription->use_take_shared_method());
178 rclcpp::PublisherBase::SharedPtr publisher,
179 rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer =
180 rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr());
218 typename ROSMessageType,
220 typename Deleter = std::default_delete<MessageT>
224 uint64_t intra_process_publisher_id,
225 std::unique_ptr<MessageT, Deleter> message,
226 typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
228 using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
229 using MessageAllocatorT =
typename MessageAllocTraits::allocator_type;
231 std::shared_lock<std::shared_timed_mutex> lock(mutex_);
233 auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
234 if (publisher_it == pub_to_subs_.end()) {
238 "Calling do_intra_process_publish for invalid or no longer existing publisher id");
241 const auto & sub_ids = publisher_it->second;
243 if (sub_ids.take_ownership_subscriptions.empty()) {
245 std::shared_ptr<MessageT> msg = std::move(message);
247 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
248 msg, sub_ids.take_shared_subscriptions);
249 }
else if (!sub_ids.take_ownership_subscriptions.empty() &&
250 sub_ids.take_shared_subscriptions.size() <= 1)
256 std::vector<uint64_t> concatenated_vector(
257 sub_ids.take_shared_subscriptions.begin(), sub_ids.take_shared_subscriptions.end());
258 concatenated_vector.insert(
259 concatenated_vector.end(),
260 sub_ids.take_ownership_subscriptions.begin(),
261 sub_ids.take_ownership_subscriptions.end());
262 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
266 }
else if (!sub_ids.take_ownership_subscriptions.empty() &&
267 sub_ids.take_shared_subscriptions.size() > 1)
271 auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
273 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
274 shared_msg, sub_ids.take_shared_subscriptions);
275 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
276 std::move(message), sub_ids.take_ownership_subscriptions, allocator);
282 typename ROSMessageType,
284 typename Deleter = std::default_delete<MessageT>
286 std::shared_ptr<const MessageT>
287 do_intra_process_publish_and_return_shared(
288 uint64_t intra_process_publisher_id,
289 std::unique_ptr<MessageT, Deleter> message,
290 typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
292 using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
293 using MessageAllocatorT =
typename MessageAllocTraits::allocator_type;
295 std::shared_lock<std::shared_timed_mutex> lock(mutex_);
297 auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
298 if (publisher_it == pub_to_subs_.end()) {
302 "Calling do_intra_process_publish for invalid or no longer existing publisher id");
305 const auto & sub_ids = publisher_it->second;
307 if (sub_ids.take_ownership_subscriptions.empty()) {
309 std::shared_ptr<MessageT> shared_msg = std::move(message);
310 if (!sub_ids.take_shared_subscriptions.empty()) {
311 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
312 shared_msg, sub_ids.take_shared_subscriptions);
318 auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
320 if (!sub_ids.take_shared_subscriptions.empty()) {
321 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
323 sub_ids.take_shared_subscriptions);
325 if (!sub_ids.take_ownership_subscriptions.empty()) {
326 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
328 sub_ids.take_ownership_subscriptions,
339 typename ROSMessageType>
341 add_shared_msg_to_buffer(
342 std::shared_ptr<const MessageT> message,
343 uint64_t subscription_id)
345 add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(message, {subscription_id});
352 typename ROSMessageType>
354 add_owned_msg_to_buffer(
355 std::unique_ptr<MessageT, Deleter> message,
356 uint64_t subscription_id,
357 typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
359 add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
360 std::move(message), {subscription_id}, allocator);
374 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
375 get_subscription_intra_process(uint64_t intra_process_subscription_id);
383 struct SplittedSubscriptions
385 std::vector<uint64_t> take_shared_subscriptions;
386 std::vector<uint64_t> take_ownership_subscriptions;
389 using SubscriptionMap =
390 std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;
393 std::unordered_map<uint64_t, rclcpp::PublisherBase::WeakPtr>;
395 using PublisherBufferMap =
396 std::unordered_map<uint64_t, rclcpp::experimental::buffers::IntraProcessBufferBase::WeakPtr>;
398 using PublisherToSubscriptionIdsMap =
399 std::unordered_map<uint64_t, SplittedSubscriptions>;
404 get_next_unique_id();
408 insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id,
bool use_take_shared_method);
413 rclcpp::PublisherBase::SharedPtr pub,
414 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub)
const;
417 typename ROSMessageType,
418 typename Alloc = std::allocator<ROSMessageType>
420 void do_transient_local_publish(
421 const uint64_t pub_id,
const uint64_t sub_id,
422 const bool use_take_shared_method)
424 using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
425 using ROSMessageTypeAllocator =
typename ROSMessageTypeAllocatorTraits::allocator_type;
426 using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
428 auto publisher_buffer = publisher_buffers_[pub_id].lock();
429 if (!publisher_buffer) {
430 throw std::runtime_error(
"publisher buffer has unexpectedly gone out of scope");
432 auto buffer = std::dynamic_pointer_cast<
435 ROSMessageTypeAllocator,
436 ROSMessageTypeDeleter
440 throw std::runtime_error(
441 "failed to dynamic cast publisher's IntraProcessBufferBase to "
442 "IntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
443 "ROSMessageTypeDeleter> which can happen when the publisher and "
444 "subscription use different allocator types, which is not supported");
446 if (use_take_shared_method) {
447 auto data_vec = buffer->get_all_data_shared();
448 for (
auto shared_data : data_vec) {
449 this->
template add_shared_msg_to_buffer<
450 ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
451 shared_data, sub_id);
454 auto data_vec = buffer->get_all_data_unique();
455 for (
auto & owned_data : data_vec) {
456 auto allocator = ROSMessageTypeAllocator();
457 this->
template add_owned_msg_to_buffer<
458 ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>(
459 std::move(owned_data), sub_id, allocator);
468 typename ROSMessageType>
470 add_shared_msg_to_buffers(
471 std::shared_ptr<const MessageT> message,
472 std::vector<uint64_t> subscription_ids)
474 using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
475 using ROSMessageTypeAllocator =
typename ROSMessageTypeAllocatorTraits::allocator_type;
476 using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
478 using PublishedType =
typename rclcpp::TypeAdapter<MessageT>::custom_type;
479 using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
480 using PublishedTypeAllocator =
typename PublishedTypeAllocatorTraits::allocator_type;
481 using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
483 for (
auto id : subscription_ids) {
484 auto subscription_it = subscriptions_.find(
id);
485 if (subscription_it == subscriptions_.end()) {
486 throw std::runtime_error(
"subscription has unexpectedly gone out of scope");
488 auto subscription_base = subscription_it->second.lock();
489 if (subscription_base ==
nullptr) {
490 subscriptions_.erase(
id);
494 auto subscription = std::dynamic_pointer_cast<
496 PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
497 >(subscription_base);
498 if (subscription !=
nullptr) {
499 subscription->provide_intra_process_data(message);
503 auto ros_message_subscription = std::dynamic_pointer_cast<
505 ROSMessageTypeAllocator, ROSMessageTypeDeleter>
506 >(subscription_base);
507 if (
nullptr == ros_message_subscription) {
508 throw std::runtime_error(
509 "failed to dynamic cast SubscriptionIntraProcessBase to "
510 "SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
511 "SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
512 "ROSMessageTypeDeleter> which can happen when the publisher and "
513 "subscription use different allocator types, which is not supported");
517 ROSMessageType ros_msg;
519 ros_message_subscription->provide_intra_process_message(
520 std::make_shared<ROSMessageType>(ros_msg));
522 if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
523 ros_message_subscription->provide_intra_process_message(message);
526 ROSMessageType>::ros_message_type, ROSMessageType>::value)
528 ROSMessageType ros_msg;
531 ros_message_subscription->provide_intra_process_message(
532 std::make_shared<ROSMessageType>(ros_msg));
543 typename ROSMessageType>
545 add_owned_msg_to_buffers(
546 std::unique_ptr<MessageT, Deleter> message,
547 std::vector<uint64_t> subscription_ids,
548 typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
550 using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
551 using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
553 using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
554 using ROSMessageTypeAllocator =
typename ROSMessageTypeAllocatorTraits::allocator_type;
555 using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
557 using PublishedType =
typename rclcpp::TypeAdapter<MessageT>::custom_type;
558 using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
559 using PublishedTypeAllocator =
typename PublishedTypeAllocatorTraits::allocator_type;
560 using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
562 for (
auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
563 auto subscription_it = subscriptions_.find(*it);
564 if (subscription_it == subscriptions_.end()) {
565 throw std::runtime_error(
"subscription has unexpectedly gone out of scope");
567 auto subscription_base = subscription_it->second.lock();
568 if (subscription_base ==
nullptr) {
569 subscriptions_.erase(subscription_it);
573 auto subscription = std::dynamic_pointer_cast<
575 PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
576 >(subscription_base);
577 if (subscription !=
nullptr) {
578 if (std::next(it) == subscription_ids.end()) {
580 subscription->provide_intra_process_data(std::move(message));
585 Deleter deleter = message.get_deleter();
586 auto ptr = MessageAllocTraits::allocate(allocator, 1);
587 MessageAllocTraits::construct(allocator, ptr, *message);
589 subscription->provide_intra_process_data(MessageUniquePtr(ptr, deleter));
595 auto ros_message_subscription = std::dynamic_pointer_cast<
597 ROSMessageTypeAllocator, ROSMessageTypeDeleter>
598 >(subscription_base);
599 if (
nullptr == ros_message_subscription) {
600 throw std::runtime_error(
601 "failed to dynamic cast SubscriptionIntraProcessBase to "
602 "SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
603 "SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
604 "ROSMessageTypeDeleter> which can happen when the publisher and "
605 "subscription use different allocator types, which is not supported");
609 ROSMessageTypeAllocator ros_message_alloc(allocator);
610 auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1);
611 ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr);
612 ROSMessageTypeDeleter deleter;
613 allocator::set_allocator_for_deleter(&deleter, &allocator);
615 auto ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, deleter);
616 ros_message_subscription->provide_intra_process_message(std::move(ros_msg));
618 if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
619 if (std::next(it) == subscription_ids.end()) {
621 ros_message_subscription->provide_intra_process_message(std::move(message));
626 Deleter deleter = message.get_deleter();
627 allocator::set_allocator_for_deleter(&deleter, &allocator);
628 auto ptr = MessageAllocTraits::allocate(allocator, 1);
629 MessageAllocTraits::construct(allocator, ptr, *message);
631 ros_message_subscription->provide_intra_process_message(
632 MessageUniquePtr(ptr, deleter));
639 PublisherToSubscriptionIdsMap pub_to_subs_;
640 SubscriptionMap subscriptions_;
641 PublisherMap publishers_;
642 PublisherBufferMap publisher_buffers_;
644 mutable std::shared_timed_mutex mutex_;
This class performs intra process communication between nodes.
RCLCPP_PUBLIC bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
void do_intra_process_publish(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, typename allocator::AllocRebind< MessageT, Alloc >::allocator_type &allocator)
Publishes an intra-process message, passed as a unique pointer.
RCLCPP_PUBLIC void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
RCLCPP_PUBLIC uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher, rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer=rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr())
Register a publisher with the manager, returns the publisher unique id.
RCLCPP_PUBLIC size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id.
RCLCPP_PUBLIC size_t lowest_available_capacity(const uint64_t intra_process_publisher_id) const
Return the lowest available capacity for all subscription buffers for a publisher id.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Template structure used to adapt custom, user-defined types to ROS types.