15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
23 #include "rcl/error_handling.h"
27 #include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
28 #include "rclcpp/experimental/create_intra_process_buffer.hpp"
29 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
30 #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
31 #include "rclcpp/qos.hpp"
32 #include "rclcpp/type_support_decl.hpp"
33 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
35 #include "tracetools/tracetools.h"
39 namespace experimental
43 typename SubscribedType,
44 typename Alloc = std::allocator<SubscribedType>,
45 typename Deleter = std::default_delete<SubscribedType>,
48 typename ROSMessageType = SubscribedType
51 typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
52 allocator::Deleter<typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
58 using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, Alloc>;
59 using SubscribedTypeAllocator =
typename SubscribedTypeAllocatorTraits::allocator_type;
60 using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
62 using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
63 using ROSMessageTypeAllocator =
typename ROSMessageTypeAllocatorTraits::allocator_type;
64 using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
66 using ConstMessageSharedPtr = std::shared_ptr<const ROSMessageType>;
67 using MessageUniquePtr = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
69 using ConstDataSharedPtr = std::shared_ptr<const SubscribedType>;
70 using SubscribedTypeUniquePtr = std::unique_ptr<SubscribedType, SubscribedTypeDeleter>;
79 std::shared_ptr<Alloc> allocator,
80 rclcpp::Context::SharedPtr context,
81 const std::string & topic_name,
85 ROSMessageTypeDeleter>(
86 context, topic_name, qos_profile),
87 subscribed_type_allocator_(*allocator)
89 allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_);
92 buffer_ = rclcpp::experimental::create_intra_process_buffer<SubscribedType, Alloc,
93 SubscribedTypeDeleter>(
96 std::make_shared<Alloc>(subscribed_type_allocator_));
97 TRACETOOLS_TRACEPOINT(
98 rclcpp_ipb_to_subscription,
99 static_cast<const void *
>(buffer_.get()),
100 static_cast<const void *
>(
this));
106 if (this->buffer_->has_data()) {
107 this->trigger_guard_condition();
109 detail::add_guard_condition_to_rcl_wait_set(wait_set, this->gc_);
116 return buffer_->has_data();
119 SubscribedTypeUniquePtr
120 convert_ros_message_to_subscribed_type_unique_ptr(
const ROSMessageType & msg)
122 if constexpr (!std::is_same<SubscribedType, ROSMessageType>::value) {
123 auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1);
124 SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr);
126 return SubscribedTypeUniquePtr(ptr, subscribed_type_deleter_);
128 throw std::runtime_error(
129 "convert_ros_message_to_subscribed_type_unique_ptr "
130 "unexpectedly called without TypeAdapter");
135 provide_intra_process_message(ConstMessageSharedPtr message)
override
137 if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
138 buffer_->add_shared(std::move(message));
139 trigger_guard_condition();
141 buffer_->add_shared(convert_ros_message_to_subscribed_type_unique_ptr(*message));
142 trigger_guard_condition();
144 this->invoke_on_new_message();
148 provide_intra_process_message(MessageUniquePtr message)
override
150 if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
151 buffer_->add_unique(std::move(message));
152 trigger_guard_condition();
154 buffer_->add_unique(convert_ros_message_to_subscribed_type_unique_ptr(*message));
155 trigger_guard_condition();
157 this->invoke_on_new_message();
161 provide_intra_process_data(ConstDataSharedPtr message)
163 buffer_->add_shared(std::move(message));
164 trigger_guard_condition();
165 this->invoke_on_new_message();
169 provide_intra_process_data(SubscribedTypeUniquePtr message)
171 buffer_->add_unique(std::move(message));
172 trigger_guard_condition();
173 this->invoke_on_new_message();
177 use_take_shared_method()
const override
179 return buffer_->use_take_shared_method();
182 size_t available_capacity()
const override
184 return buffer_->available_capacity();
189 trigger_guard_condition()
override
194 BufferUniquePtr buffer_;
195 SubscribedTypeAllocator subscribed_type_allocator_;
196 SubscribedTypeDeleter subscribed_type_deleter_;
RCLCPP_PUBLIC void trigger()
Signal that the condition has been met, notifying both the wait set and listeners,...
Encapsulation of Quality of Service settings.
void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add the Waitable to a wait set.
bool is_ready(const rcl_wait_set_t &wait_set) override
Check if the Waitable is ready.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
Template structure used to adapt custom, user-defined types to ROS types.