ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
subscription_intra_process_buffer.hpp
1 // Copyright 2021 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
17 
18 #include <memory>
19 #include <string>
20 #include <stdexcept>
21 #include <utility>
22 
23 #include "rcl/error_handling.h"
24 #include "rcl/guard_condition.h"
25 #include "rcl/wait.h"
26 
27 #include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
28 #include "rclcpp/experimental/create_intra_process_buffer.hpp"
29 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
30 #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp"
31 #include "rclcpp/qos.hpp"
32 #include "rclcpp/type_support_decl.hpp"
33 
34 namespace rclcpp
35 {
36 namespace experimental
37 {
38 
39 template<
40  typename SubscribedType,
41  typename Alloc = std::allocator<SubscribedType>,
42  typename Deleter = std::default_delete<SubscribedType>,
45  typename ROSMessageType = SubscribedType
46 >
48  typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
49  allocator::Deleter<typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
50  ROSMessageType>>
51 {
52 public:
53  RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer)
54 
55  using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, Alloc>;
56  using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type;
57  using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
58 
59  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
60  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
61  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
62 
63  using ConstMessageSharedPtr = std::shared_ptr<const ROSMessageType>;
64  using MessageUniquePtr = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
65 
66  using ConstDataSharedPtr = std::shared_ptr<const SubscribedType>;
67  using SubscribedTypeUniquePtr = std::unique_ptr<SubscribedType, SubscribedTypeDeleter>;
68 
69  using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
70  SubscribedType,
71  Alloc,
72  SubscribedTypeDeleter
73  >::UniquePtr;
74 
76  std::shared_ptr<Alloc> allocator,
77  rclcpp::Context::SharedPtr context,
78  const std::string & topic_name,
79  const rclcpp::QoS & qos_profile,
81  : SubscriptionROSMsgIntraProcessBuffer<ROSMessageType, ROSMessageTypeAllocator,
82  ROSMessageTypeDeleter>(
83  context, topic_name, qos_profile),
84  subscribed_type_allocator_(*allocator)
85  {
86  allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_);
87 
88  // Create the intra-process buffer.
89  buffer_ = rclcpp::experimental::create_intra_process_buffer<SubscribedType, Alloc,
90  SubscribedTypeDeleter>(
91  buffer_type,
92  qos_profile,
93  std::make_shared<Alloc>(subscribed_type_allocator_));
94  }
95 
96  bool
97  is_ready(rcl_wait_set_t * wait_set) override
98  {
99  (void) wait_set;
100  return buffer_->has_data();
101  }
102 
103  SubscribedTypeUniquePtr
104  convert_ros_message_to_subscribed_type_unique_ptr(const ROSMessageType & msg)
105  {
106  if constexpr (!std::is_same<SubscribedType, ROSMessageType>::value) {
107  auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1);
108  SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr);
110  return SubscribedTypeUniquePtr(ptr, subscribed_type_deleter_);
111  } else {
112  throw std::runtime_error(
113  "convert_ros_message_to_subscribed_type_unique_ptr "
114  "unexpectedly called without TypeAdapter");
115  }
116  }
117 
118  void
119  provide_intra_process_message(ConstMessageSharedPtr message) override
120  {
121  if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
122  buffer_->add_shared(std::move(message));
123  trigger_guard_condition();
124  } else {
125  buffer_->add_shared(convert_ros_message_to_subscribed_type_unique_ptr(*message));
126  trigger_guard_condition();
127  }
128  this->invoke_on_new_message();
129  }
130 
131  void
132  provide_intra_process_message(MessageUniquePtr message) override
133  {
134  if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
135  buffer_->add_unique(std::move(message));
136  trigger_guard_condition();
137  } else {
138  buffer_->add_unique(convert_ros_message_to_subscribed_type_unique_ptr(*message));
139  trigger_guard_condition();
140  }
141  this->invoke_on_new_message();
142  }
143 
144  void
145  provide_intra_process_data(ConstDataSharedPtr message)
146  {
147  buffer_->add_shared(std::move(message));
148  trigger_guard_condition();
149  this->invoke_on_new_message();
150  }
151 
152  void
153  provide_intra_process_data(SubscribedTypeUniquePtr message)
154  {
155  buffer_->add_unique(std::move(message));
156  trigger_guard_condition();
157  this->invoke_on_new_message();
158  }
159 
160  bool
161  use_take_shared_method() const override
162  {
163  return buffer_->use_take_shared_method();
164  }
165 
166 protected:
167  void
168  trigger_guard_condition() override
169  {
170  this->gc_.trigger();
171  }
172 
173  BufferUniquePtr buffer_;
174  SubscribedTypeAllocator subscribed_type_allocator_;
175  SubscribedTypeDeleter subscribed_type_deleter_;
176 };
177 
178 } // namespace experimental
179 } // namespace rclcpp
180 
181 #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_
RCLCPP_PUBLIC void trigger()
Notify the wait set waiting on this condition, if any, that the condition had been met.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:111
bool is_ready(rcl_wait_set_t *wait_set) override
Check if the Waitable is ready.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42
Template structure used to adapt custom, user-defined types to ROS types.