ROS 2 rclcpp + rcl - rolling  rolling-a919a6e5
ROS 2 C++ Client Library with ROS Client Library
subscription_intra_process.hpp
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <memory>
21 #include <stdexcept>
22 #include <string>
23 #include <type_traits>
24 #include <utility>
25 
26 #include "rcl/types.h"
27 
28 #include "rclcpp/any_subscription_callback.hpp"
29 #include "rclcpp/context.hpp"
30 #include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
31 #include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
32 #include "rclcpp/qos.hpp"
33 #include "rclcpp/type_support_decl.hpp"
34 #include "tracetools/tracetools.h"
35 
36 namespace rclcpp
37 {
38 namespace experimental
39 {
40 
41 template<
42  typename MessageT,
43  typename SubscribedType,
44  typename SubscribedTypeAlloc = std::allocator<SubscribedType>,
45  typename SubscribedTypeDeleter = std::default_delete<SubscribedType>,
46  typename ROSMessageType = SubscribedType,
47  typename Alloc = std::allocator<void>
48 >
51  SubscribedType,
52  SubscribedTypeAlloc,
53  SubscribedTypeDeleter,
54  ROSMessageType
55  >
56 {
58  SubscribedType,
59  SubscribedTypeAlloc,
60  SubscribedTypeDeleter,
61  ROSMessageType
62  >;
63 
64 public:
65  RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess)
66 
67  using MessageAllocTraits =
68  typename SubscriptionIntraProcessBufferT::SubscribedTypeAllocatorTraits;
69  using MessageAlloc = typename SubscriptionIntraProcessBufferT::SubscribedTypeAllocator;
70  using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr;
71  using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr;
72  using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;
73 
76  std::shared_ptr<Alloc> allocator,
77  rclcpp::Context::SharedPtr context,
78  const std::string & topic_name,
79  const rclcpp::QoS & qos_profile,
81  : SubscriptionIntraProcessBuffer<SubscribedType, SubscribedTypeAlloc,
82  SubscribedTypeDeleter, ROSMessageType>(
83  std::make_shared<SubscribedTypeAlloc>(*allocator),
84  context,
85  topic_name,
86  qos_profile,
87  buffer_type),
88  any_callback_(callback)
89  {
90  TRACETOOLS_TRACEPOINT(
91  rclcpp_subscription_callback_added,
92  static_cast<const void *>(this),
93  static_cast<const void *>(&any_callback_));
94  // The callback object gets copied, so if registration is done too early/before this point
95  // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
96  // in subsequent tracepoints.
97 #ifndef TRACETOOLS_DISABLED
98  any_callback_.register_callback_for_tracing();
99 #endif
100  }
101 
102  virtual ~SubscriptionIntraProcess() = default;
103 
104  void
105  add_to_wait_set(rcl_wait_set_t & wait_set) override
106  {
107  // This block is necessary when the guard condition wakes the wait set, but
108  // the intra process waitable was not handled before the wait set is waited
109  // on again.
110  // Basically we're keeping the guard condition triggered so long as there is
111  // data in the buffer.
112  if (this->buffer_->has_data()) {
113  // If there is data still to be processed, indicate to the
114  // executor or waitset by triggering the guard condition.
115  this->trigger_guard_condition();
116  }
117  // Let the parent classes handle the rest of the work:
119  }
120 
121  std::shared_ptr<void>
122  take_data() override
123  {
124  ConstMessageSharedPtr shared_msg;
125  MessageUniquePtr unique_msg;
126 
127  if (any_callback_.use_take_shared_method()) {
128  shared_msg = this->buffer_->consume_shared();
129  if (!shared_msg) {
130  return nullptr;
131  }
132  } else {
133  unique_msg = this->buffer_->consume_unique();
134  if (!unique_msg) {
135  return nullptr;
136  }
137  }
138 
139  if (this->buffer_->has_data()) {
140  // If there is data still to be processed, indicate to the
141  // executor or waitset by triggering the guard condition.
142  this->trigger_guard_condition();
143  }
144 
145  return std::static_pointer_cast<void>(
146  std::make_shared<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
147  std::pair<ConstMessageSharedPtr, MessageUniquePtr>(
148  shared_msg, std::move(unique_msg)))
149  );
150  }
151 
152  void execute(const std::shared_ptr<void> & data) override
153  {
154  execute_impl<SubscribedType>(data);
155  }
156 
157 protected:
158  template<typename T>
159  typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
160  execute_impl(const std::shared_ptr<void> &)
161  {
162  throw std::runtime_error("Subscription intra-process can't handle serialized messages");
163  }
164 
165  template<class T>
166  typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
167  execute_impl(const std::shared_ptr<void> & data)
168  {
169  if (nullptr == data) {
170  return;
171  }
172 
173  rmw_message_info_t msg_info;
174  msg_info.publisher_gid = {0, {0}};
175  msg_info.from_intra_process = true;
176 
177  auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
178  data);
179 
180  if (any_callback_.use_take_shared_method()) {
181  ConstMessageSharedPtr shared_msg = shared_ptr->first;
182  any_callback_.dispatch_intra_process(shared_msg, msg_info);
183  } else {
184  MessageUniquePtr unique_msg = std::move(shared_ptr->second);
185  any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info);
186  }
187  shared_ptr.reset();
188  }
189 
190  AnySubscriptionCallback<MessageT, Alloc> any_callback_;
191 };
192 
193 } // namespace experimental
194 } // namespace rclcpp
195 
196 #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
Encapsulation of Quality of Service settings.
Definition: qos.hpp:116
void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add the Waitable to a wait set.
std::shared_ptr< void > take_data() override
Take the data so that it can be consumed with execute.
void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add the Waitable to a wait set.
void execute(const std::shared_ptr< void > &data) override
Execute data that is passed in.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42