15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
23 #include "rcl/error_handling.h"
27 #include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
28 #include "rclcpp/experimental/create_intra_process_buffer.hpp"
29 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
30 #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
31 #include "rclcpp/qos.hpp"
32 #include "rclcpp/type_support_decl.hpp"
33 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
35 #include "tracetools/tracetools.h"
39 namespace experimental
43 typename SubscribedType,
44 typename Alloc = std::allocator<SubscribedType>,
45 typename Deleter = std::default_delete<SubscribedType>,
48 typename ROSMessageType = SubscribedType
51 typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
52 allocator::Deleter<typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
58 using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, Alloc>;
59 using SubscribedTypeAllocator =
typename SubscribedTypeAllocatorTraits::allocator_type;
60 using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
62 using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
63 using ROSMessageTypeAllocator =
typename ROSMessageTypeAllocatorTraits::allocator_type;
64 using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
66 using ConstMessageSharedPtr = std::shared_ptr<const ROSMessageType>;
67 using MessageUniquePtr = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
69 using ConstDataSharedPtr = std::shared_ptr<const SubscribedType>;
70 using SubscribedTypeUniquePtr = std::unique_ptr<SubscribedType, SubscribedTypeDeleter>;
79 std::shared_ptr<Alloc> allocator,
80 rclcpp::Context::SharedPtr context,
81 const std::string & topic_name,
85 ROSMessageTypeDeleter>(
86 context, topic_name, qos_profile),
87 subscribed_type_allocator_(*allocator)
89 allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_);
92 buffer_ = rclcpp::experimental::create_intra_process_buffer<SubscribedType, Alloc,
93 SubscribedTypeDeleter>(
96 std::make_shared<Alloc>(subscribed_type_allocator_));
97 TRACETOOLS_TRACEPOINT(
98 rclcpp_ipb_to_subscription,
99 static_cast<const void *
>(buffer_.get()),
100 static_cast<const void *
>(
this));
106 if (this->buffer_->has_data()) {
107 this->trigger_guard_condition();
109 detail::add_guard_condition_to_rcl_wait_set(wait_set, this->gc_);
113 is_ready([[maybe_unused]]
const rcl_wait_set_t & wait_set)
override
115 return buffer_->has_data();
118 SubscribedTypeUniquePtr
119 convert_ros_message_to_subscribed_type_unique_ptr(
const ROSMessageType & msg)
121 if constexpr (!std::is_same<SubscribedType, ROSMessageType>::value) {
122 auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1);
123 SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr);
125 return SubscribedTypeUniquePtr(ptr, subscribed_type_deleter_);
127 throw std::runtime_error(
128 "convert_ros_message_to_subscribed_type_unique_ptr "
129 "unexpectedly called without TypeAdapter");
134 provide_intra_process_message(ConstMessageSharedPtr message)
override
136 if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
137 buffer_->add_shared(std::move(message));
138 trigger_guard_condition();
140 buffer_->add_shared(convert_ros_message_to_subscribed_type_unique_ptr(*message));
141 trigger_guard_condition();
143 this->invoke_on_new_message();
147 provide_intra_process_message(MessageUniquePtr message)
override
149 if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
150 buffer_->add_unique(std::move(message));
151 trigger_guard_condition();
153 buffer_->add_unique(convert_ros_message_to_subscribed_type_unique_ptr(*message));
154 trigger_guard_condition();
156 this->invoke_on_new_message();
160 provide_intra_process_data(ConstDataSharedPtr message)
162 buffer_->add_shared(std::move(message));
163 trigger_guard_condition();
164 this->invoke_on_new_message();
168 provide_intra_process_data(SubscribedTypeUniquePtr message)
170 buffer_->add_unique(std::move(message));
171 trigger_guard_condition();
172 this->invoke_on_new_message();
176 use_take_shared_method()
const override
178 return buffer_->use_take_shared_method();
181 size_t available_capacity()
const override
183 return buffer_->available_capacity();
188 trigger_guard_condition()
override
193 BufferUniquePtr buffer_;
194 SubscribedTypeAllocator subscribed_type_allocator_;
195 SubscribedTypeDeleter subscribed_type_deleter_;
RCLCPP_PUBLIC void trigger()
Signal that the condition has been met, notifying both the wait set and listeners,...
Encapsulation of Quality of Service settings.
void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add the Waitable to a wait set.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
Template structure used to adapt custom, user-defined types to ROS types.