ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
subscription_intra_process_buffer.hpp
1 // Copyright 2021 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
17 
18 #include <memory>
19 #include <string>
20 #include <stdexcept>
21 #include <utility>
22 
23 #include "rcl/error_handling.h"
24 #include "rcl/guard_condition.h"
25 #include "rcl/wait.h"
26 
27 #include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
28 #include "rclcpp/experimental/create_intra_process_buffer.hpp"
29 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
30 #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
31 #include "rclcpp/qos.hpp"
32 #include "rclcpp/type_support_decl.hpp"
33 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
34 
35 #include "tracetools/tracetools.h"
36 
37 namespace rclcpp
38 {
39 namespace experimental
40 {
41 
42 template<
43  typename SubscribedType,
44  typename Alloc = std::allocator<SubscribedType>,
45  typename Deleter = std::default_delete<SubscribedType>,
48  typename ROSMessageType = SubscribedType
49 >
51  typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
52  allocator::Deleter<typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
53  ROSMessageType>>
54 {
55 public:
56  RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer)
57 
58  using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, Alloc>;
59  using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type;
60  using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
61 
62  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
63  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
64  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
65 
66  using ConstMessageSharedPtr = std::shared_ptr<const ROSMessageType>;
67  using MessageUniquePtr = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
68 
69  using ConstDataSharedPtr = std::shared_ptr<const SubscribedType>;
70  using SubscribedTypeUniquePtr = std::unique_ptr<SubscribedType, SubscribedTypeDeleter>;
71 
72  using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
73  SubscribedType,
74  Alloc,
75  SubscribedTypeDeleter
76  >::UniquePtr;
77 
79  std::shared_ptr<Alloc> allocator,
80  rclcpp::Context::SharedPtr context,
81  const std::string & topic_name,
82  const rclcpp::QoS & qos_profile,
84  : SubscriptionROSMsgIntraProcessBuffer<ROSMessageType, ROSMessageTypeAllocator,
85  ROSMessageTypeDeleter>(
86  context, topic_name, qos_profile),
87  subscribed_type_allocator_(*allocator)
88  {
89  allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_);
90 
91  // Create the intra-process buffer.
92  buffer_ = rclcpp::experimental::create_intra_process_buffer<SubscribedType, Alloc,
93  SubscribedTypeDeleter>(
94  buffer_type,
95  qos_profile,
96  std::make_shared<Alloc>(subscribed_type_allocator_));
97  TRACETOOLS_TRACEPOINT(
98  rclcpp_ipb_to_subscription,
99  static_cast<const void *>(buffer_.get()),
100  static_cast<const void *>(this));
101  }
102 
103  void
104  add_to_wait_set(rcl_wait_set_t & wait_set) override
105  {
106  if (this->buffer_->has_data()) {
107  this->trigger_guard_condition();
108  }
109  detail::add_guard_condition_to_rcl_wait_set(wait_set, this->gc_);
110  }
111 
112  bool
113  is_ready(const rcl_wait_set_t & wait_set) override
114  {
115  (void) wait_set;
116  return buffer_->has_data();
117  }
118 
119  SubscribedTypeUniquePtr
120  convert_ros_message_to_subscribed_type_unique_ptr(const ROSMessageType & msg)
121  {
122  if constexpr (!std::is_same<SubscribedType, ROSMessageType>::value) {
123  auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1);
124  SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr);
126  return SubscribedTypeUniquePtr(ptr, subscribed_type_deleter_);
127  } else {
128  throw std::runtime_error(
129  "convert_ros_message_to_subscribed_type_unique_ptr "
130  "unexpectedly called without TypeAdapter");
131  }
132  }
133 
134  void
135  provide_intra_process_message(ConstMessageSharedPtr message) override
136  {
137  if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
138  buffer_->add_shared(std::move(message));
139  trigger_guard_condition();
140  } else {
141  buffer_->add_shared(convert_ros_message_to_subscribed_type_unique_ptr(*message));
142  trigger_guard_condition();
143  }
144  this->invoke_on_new_message();
145  }
146 
147  void
148  provide_intra_process_message(MessageUniquePtr message) override
149  {
150  if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
151  buffer_->add_unique(std::move(message));
152  trigger_guard_condition();
153  } else {
154  buffer_->add_unique(convert_ros_message_to_subscribed_type_unique_ptr(*message));
155  trigger_guard_condition();
156  }
157  this->invoke_on_new_message();
158  }
159 
160  void
161  provide_intra_process_data(ConstDataSharedPtr message)
162  {
163  buffer_->add_shared(std::move(message));
164  trigger_guard_condition();
165  this->invoke_on_new_message();
166  }
167 
168  void
169  provide_intra_process_data(SubscribedTypeUniquePtr message)
170  {
171  buffer_->add_unique(std::move(message));
172  trigger_guard_condition();
173  this->invoke_on_new_message();
174  }
175 
176  bool
177  use_take_shared_method() const override
178  {
179  return buffer_->use_take_shared_method();
180  }
181 
182  size_t available_capacity() const override
183  {
184  return buffer_->available_capacity();
185  }
186 
187 protected:
188  void
189  trigger_guard_condition() override
190  {
191  this->gc_.trigger();
192  }
193 
194  BufferUniquePtr buffer_;
195  SubscribedTypeAllocator subscribed_type_allocator_;
196  SubscribedTypeDeleter subscribed_type_deleter_;
197 };
198 
199 } // namespace experimental
200 } // namespace rclcpp
201 
202 #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
RCLCPP_PUBLIC void trigger()
Signal that the condition has been met, notifying both the wait set and listeners,...
Encapsulation of Quality of Service settings.
Definition: qos.hpp:116
void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add the Waitable to a wait set.
bool is_ready(const rcl_wait_set_t &wait_set) override
Check if the Waitable is ready.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42
Template structure used to adapt custom, user-defined types to ROS types.