15 #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
18 #include <rmw/types.h>
20 #include <shared_mutex>
25 #include <unordered_map>
30 #include "rclcpp/allocator/allocator_deleter.hpp"
31 #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
32 #include "rclcpp/experimental/subscription_intra_process.hpp"
33 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
34 #include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
35 #include "rclcpp/logger.hpp"
36 #include "rclcpp/logging.hpp"
37 #include "rclcpp/macros.hpp"
38 #include "rclcpp/publisher_base.hpp"
39 #include "rclcpp/type_adapter.hpp"
40 #include "rclcpp/visibility_control.hpp"
45 namespace experimental
117 add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
179 typename ROSMessageType,
181 typename Deleter = std::default_delete<MessageT>
185 uint64_t intra_process_publisher_id,
186 std::unique_ptr<MessageT, Deleter> message,
187 typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
189 using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
190 using MessageAllocatorT =
typename MessageAllocTraits::allocator_type;
192 std::shared_lock<std::shared_timed_mutex> lock(mutex_);
194 auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
195 if (publisher_it == pub_to_subs_.end()) {
199 "Calling do_intra_process_publish for invalid or no longer existing publisher id");
202 const auto & sub_ids = publisher_it->second;
204 if (sub_ids.take_ownership_subscriptions.empty()) {
206 std::shared_ptr<MessageT> msg = std::move(message);
208 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
209 msg, sub_ids.take_shared_subscriptions);
210 }
else if (!sub_ids.take_ownership_subscriptions.empty() &&
211 sub_ids.take_shared_subscriptions.size() <= 1)
217 std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
218 concatenated_vector.insert(
219 concatenated_vector.end(),
220 sub_ids.take_ownership_subscriptions.begin(),
221 sub_ids.take_ownership_subscriptions.end());
222 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
226 }
else if (!sub_ids.take_ownership_subscriptions.empty() &&
227 sub_ids.take_shared_subscriptions.size() > 1)
231 auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
233 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
234 shared_msg, sub_ids.take_shared_subscriptions);
235 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
236 std::move(message), sub_ids.take_ownership_subscriptions, allocator);
242 typename ROSMessageType,
244 typename Deleter = std::default_delete<MessageT>
246 std::shared_ptr<const MessageT>
247 do_intra_process_publish_and_return_shared(
248 uint64_t intra_process_publisher_id,
249 std::unique_ptr<MessageT, Deleter> message,
250 typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
252 using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
253 using MessageAllocatorT =
typename MessageAllocTraits::allocator_type;
255 std::shared_lock<std::shared_timed_mutex> lock(mutex_);
257 auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
258 if (publisher_it == pub_to_subs_.end()) {
262 "Calling do_intra_process_publish for invalid or no longer existing publisher id");
265 const auto & sub_ids = publisher_it->second;
267 if (sub_ids.take_ownership_subscriptions.empty()) {
269 std::shared_ptr<MessageT> shared_msg = std::move(message);
270 if (!sub_ids.take_shared_subscriptions.empty()) {
271 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
272 shared_msg, sub_ids.take_shared_subscriptions);
278 auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
280 if (!sub_ids.take_shared_subscriptions.empty()) {
281 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
283 sub_ids.take_shared_subscriptions);
285 if (!sub_ids.take_ownership_subscriptions.empty()) {
286 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
288 sub_ids.take_ownership_subscriptions,
306 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
307 get_subscription_intra_process(uint64_t intra_process_subscription_id);
310 struct SplittedSubscriptions
312 std::vector<uint64_t> take_shared_subscriptions;
313 std::vector<uint64_t> take_ownership_subscriptions;
316 using SubscriptionMap =
317 std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;
320 std::unordered_map<uint64_t, rclcpp::PublisherBase::WeakPtr>;
322 using PublisherToSubscriptionIdsMap =
323 std::unordered_map<uint64_t, SplittedSubscriptions>;
328 get_next_unique_id();
332 insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id,
bool use_take_shared_method);
337 rclcpp::PublisherBase::SharedPtr pub,
338 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub)
const;
344 typename ROSMessageType>
346 add_shared_msg_to_buffers(
347 std::shared_ptr<const MessageT> message,
348 std::vector<uint64_t> subscription_ids)
350 using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
351 using ROSMessageTypeAllocator =
typename ROSMessageTypeAllocatorTraits::allocator_type;
352 using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
354 using PublishedType =
typename rclcpp::TypeAdapter<MessageT>::custom_type;
355 using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
356 using PublishedTypeAllocator =
typename PublishedTypeAllocatorTraits::allocator_type;
357 using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
359 for (
auto id : subscription_ids) {
360 auto subscription_it = subscriptions_.find(
id);
361 if (subscription_it == subscriptions_.end()) {
362 throw std::runtime_error(
"subscription has unexpectedly gone out of scope");
364 auto subscription_base = subscription_it->second.lock();
365 if (subscription_base ==
nullptr) {
366 subscriptions_.erase(
id);
370 auto subscription = std::dynamic_pointer_cast<
372 PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
373 >(subscription_base);
374 if (subscription !=
nullptr) {
375 subscription->provide_intra_process_data(message);
379 auto ros_message_subscription = std::dynamic_pointer_cast<
381 ROSMessageTypeAllocator, ROSMessageTypeDeleter>
382 >(subscription_base);
383 if (
nullptr == ros_message_subscription) {
384 throw std::runtime_error(
385 "failed to dynamic cast SubscriptionIntraProcessBase to "
386 "SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
387 "SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
388 "ROSMessageTypeDeleter> which can happen when the publisher and "
389 "subscription use different allocator types, which is not supported");
393 ROSMessageType ros_msg;
395 ros_message_subscription->provide_intra_process_message(
396 std::make_shared<ROSMessageType>(ros_msg));
398 if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
399 ros_message_subscription->provide_intra_process_message(message);
402 ROSMessageType>::ros_message_type, ROSMessageType>::value)
404 ROSMessageType ros_msg;
407 ros_message_subscription->provide_intra_process_message(
408 std::make_shared<ROSMessageType>(ros_msg));
419 typename ROSMessageType>
421 add_owned_msg_to_buffers(
422 std::unique_ptr<MessageT, Deleter> message,
423 std::vector<uint64_t> subscription_ids,
424 typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
426 using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
427 using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
429 using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
430 using ROSMessageTypeAllocator =
typename ROSMessageTypeAllocatorTraits::allocator_type;
431 using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
433 using PublishedType =
typename rclcpp::TypeAdapter<MessageT>::custom_type;
434 using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
435 using PublishedTypeAllocator =
typename PublishedTypeAllocatorTraits::allocator_type;
436 using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
438 for (
auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
439 auto subscription_it = subscriptions_.find(*it);
440 if (subscription_it == subscriptions_.end()) {
441 throw std::runtime_error(
"subscription has unexpectedly gone out of scope");
443 auto subscription_base = subscription_it->second.lock();
444 if (subscription_base ==
nullptr) {
445 subscriptions_.erase(subscription_it);
449 auto subscription = std::dynamic_pointer_cast<
451 PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
452 >(subscription_base);
453 if (subscription !=
nullptr) {
454 if (std::next(it) == subscription_ids.end()) {
456 subscription->provide_intra_process_data(std::move(message));
461 Deleter deleter = message.get_deleter();
462 auto ptr = MessageAllocTraits::allocate(allocator, 1);
463 MessageAllocTraits::construct(allocator, ptr, *message);
465 subscription->provide_intra_process_data(std::move(MessageUniquePtr(ptr, deleter)));
471 auto ros_message_subscription = std::dynamic_pointer_cast<
473 ROSMessageTypeAllocator, ROSMessageTypeDeleter>
474 >(subscription_base);
475 if (
nullptr == ros_message_subscription) {
476 throw std::runtime_error(
477 "failed to dynamic cast SubscriptionIntraProcessBase to "
478 "SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, or to "
479 "SubscriptionROSMsgIntraProcessBuffer<ROSMessageType,ROSMessageTypeAllocator,"
480 "ROSMessageTypeDeleter> which can happen when the publisher and "
481 "subscription use different allocator types, which is not supported");
485 ROSMessageTypeAllocator ros_message_alloc(allocator);
486 auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1);
487 ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr);
488 ROSMessageTypeDeleter deleter;
489 allocator::set_allocator_for_deleter(&deleter, &allocator);
491 auto ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, deleter);
492 ros_message_subscription->provide_intra_process_message(std::move(ros_msg));
494 if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
495 if (std::next(it) == subscription_ids.end()) {
497 ros_message_subscription->provide_intra_process_message(std::move(message));
502 Deleter deleter = message.get_deleter();
503 allocator::set_allocator_for_deleter(&deleter, &allocator);
504 auto ptr = MessageAllocTraits::allocate(allocator, 1);
505 MessageAllocTraits::construct(allocator, ptr, *message);
507 ros_message_subscription->provide_intra_process_message(
508 std::move(MessageUniquePtr(ptr, deleter)));
515 PublisherToSubscriptionIdsMap pub_to_subs_;
516 SubscriptionMap subscriptions_;
517 PublisherMap publishers_;
519 mutable std::shared_timed_mutex mutex_;
This class performs intra process communication between nodes.
RCLCPP_PUBLIC bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
RCLCPP_PUBLIC void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
void do_intra_process_publish(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, typename allocator::AllocRebind< MessageT, Alloc >::allocator_type &allocator)
Publishes an intra-process message, passed as a unique pointer.
RCLCPP_PUBLIC uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
Register a publisher with the manager, returns the publisher unique id.
RCLCPP_PUBLIC void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
RCLCPP_PUBLIC size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Template structure used to adapt custom, user-defined types to ROS types.