ROS 2 rclcpp + rcl - kilted  kilted
ROS 2 C++ Client Library with ROS Client Library
subscription_intra_process_buffer.hpp
1 // Copyright 2021 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
17 
18 #include <memory>
19 #include <string>
20 #include <stdexcept>
21 #include <utility>
22 
23 #include "rcl/error_handling.h"
24 #include "rcl/guard_condition.h"
25 #include "rcl/wait.h"
26 
27 #include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
28 #include "rclcpp/experimental/create_intra_process_buffer.hpp"
29 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
30 #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
31 #include "rclcpp/qos.hpp"
32 #include "rclcpp/type_support_decl.hpp"
33 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
34 
35 #include "tracetools/tracetools.h"
36 
37 namespace rclcpp
38 {
39 namespace experimental
40 {
41 
42 template<
43  typename SubscribedType,
44  typename Alloc = std::allocator<SubscribedType>,
45  typename Deleter = std::default_delete<SubscribedType>,
48  typename ROSMessageType = SubscribedType
49 >
51  typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
52  allocator::Deleter<typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
53  ROSMessageType>>
54 {
55 public:
56  RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer)
57 
58  using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, Alloc>;
59  using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type;
60  using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
61 
62  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
63  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
64  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
65 
66  using ConstMessageSharedPtr = std::shared_ptr<const ROSMessageType>;
67  using MessageUniquePtr = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
68 
69  using ConstDataSharedPtr = std::shared_ptr<const SubscribedType>;
70  using SubscribedTypeUniquePtr = std::unique_ptr<SubscribedType, SubscribedTypeDeleter>;
71 
72  using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
73  SubscribedType,
74  Alloc,
75  SubscribedTypeDeleter
76  >::UniquePtr;
77 
79  std::shared_ptr<Alloc> allocator,
80  rclcpp::Context::SharedPtr context,
81  const std::string & topic_name,
82  const rclcpp::QoS & qos_profile,
84  : SubscriptionROSMsgIntraProcessBuffer<ROSMessageType, ROSMessageTypeAllocator,
85  ROSMessageTypeDeleter>(
86  context, topic_name, qos_profile),
87  subscribed_type_allocator_(*allocator)
88  {
89  allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_);
90 
91  // Create the intra-process buffer.
92  buffer_ = rclcpp::experimental::create_intra_process_buffer<SubscribedType, Alloc,
93  SubscribedTypeDeleter>(
94  buffer_type,
95  qos_profile,
96  std::make_shared<Alloc>(subscribed_type_allocator_));
97  TRACETOOLS_TRACEPOINT(
98  rclcpp_ipb_to_subscription,
99  static_cast<const void *>(buffer_.get()),
100  static_cast<const void *>(this));
101  }
102 
103  void
104  add_to_wait_set(rcl_wait_set_t & wait_set) override
105  {
106  if (this->buffer_->has_data()) {
107  this->trigger_guard_condition();
108  }
109  detail::add_guard_condition_to_rcl_wait_set(wait_set, this->gc_);
110  }
111 
112  bool
113  is_ready([[maybe_unused]] const rcl_wait_set_t & wait_set) override
114  {
115  return buffer_->has_data();
116  }
117 
118  SubscribedTypeUniquePtr
119  convert_ros_message_to_subscribed_type_unique_ptr(const ROSMessageType & msg)
120  {
121  if constexpr (!std::is_same<SubscribedType, ROSMessageType>::value) {
122  auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1);
123  SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr);
125  return SubscribedTypeUniquePtr(ptr, subscribed_type_deleter_);
126  } else {
127  throw std::runtime_error(
128  "convert_ros_message_to_subscribed_type_unique_ptr "
129  "unexpectedly called without TypeAdapter");
130  }
131  }
132 
133  void
134  provide_intra_process_message(ConstMessageSharedPtr message) override
135  {
136  if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
137  buffer_->add_shared(std::move(message));
138  trigger_guard_condition();
139  } else {
140  buffer_->add_shared(convert_ros_message_to_subscribed_type_unique_ptr(*message));
141  trigger_guard_condition();
142  }
143  this->invoke_on_new_message();
144  }
145 
146  void
147  provide_intra_process_message(MessageUniquePtr message) override
148  {
149  if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
150  buffer_->add_unique(std::move(message));
151  trigger_guard_condition();
152  } else {
153  buffer_->add_unique(convert_ros_message_to_subscribed_type_unique_ptr(*message));
154  trigger_guard_condition();
155  }
156  this->invoke_on_new_message();
157  }
158 
159  void
160  provide_intra_process_data(ConstDataSharedPtr message)
161  {
162  buffer_->add_shared(std::move(message));
163  trigger_guard_condition();
164  this->invoke_on_new_message();
165  }
166 
167  void
168  provide_intra_process_data(SubscribedTypeUniquePtr message)
169  {
170  buffer_->add_unique(std::move(message));
171  trigger_guard_condition();
172  this->invoke_on_new_message();
173  }
174 
175  bool
176  use_take_shared_method() const override
177  {
178  return buffer_->use_take_shared_method();
179  }
180 
181  size_t available_capacity() const override
182  {
183  return buffer_->available_capacity();
184  }
185 
186 protected:
187  void
188  trigger_guard_condition() override
189  {
190  this->gc_.trigger();
191  }
192 
193  BufferUniquePtr buffer_;
194  SubscribedTypeAllocator subscribed_type_allocator_;
195  SubscribedTypeDeleter subscribed_type_deleter_;
196 };
197 
198 } // namespace experimental
199 } // namespace rclcpp
200 
201 #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
RCLCPP_PUBLIC void trigger()
Signal that the condition has been met, notifying both the wait set and listeners,...
Encapsulation of Quality of Service settings.
Definition: qos.hpp:116
void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add the Waitable to a wait set.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42
Template structure used to adapt custom, user-defined types to ROS types.