ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
subscription_intra_process.hpp
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <memory>
21 #include <stdexcept>
22 #include <string>
23 #include <type_traits>
24 #include <utility>
25 
26 #include "rcl/types.h"
27 
28 #include "rclcpp/any_subscription_callback.hpp"
29 #include "rclcpp/context.hpp"
30 #include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
31 #include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
32 #include "rclcpp/qos.hpp"
33 #include "rclcpp/type_support_decl.hpp"
34 #include "tracetools/tracetools.h"
35 
36 namespace rclcpp
37 {
38 namespace experimental
39 {
40 
41 template<
42  typename MessageT,
43  typename SubscribedType,
44  typename SubscribedTypeAlloc = std::allocator<SubscribedType>,
45  typename SubscribedTypeDeleter = std::default_delete<SubscribedType>,
46  typename ROSMessageType = SubscribedType,
47  typename Alloc = std::allocator<void>
48 >
51  SubscribedType,
52  SubscribedTypeAlloc,
53  SubscribedTypeDeleter,
54  ROSMessageType
55  >
56 {
58  SubscribedType,
59  SubscribedTypeAlloc,
60  SubscribedTypeDeleter,
61  ROSMessageType
62  >;
63 
64 public:
65  RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess)
66 
67  using MessageAllocTraits =
68  typename SubscriptionIntraProcessBufferT::SubscribedTypeAllocatorTraits;
69  using MessageAlloc = typename SubscriptionIntraProcessBufferT::SubscribedTypeAllocator;
70  using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr;
71  using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr;
72  using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;
73 
76  std::shared_ptr<Alloc> allocator,
77  rclcpp::Context::SharedPtr context,
78  const std::string & topic_name,
79  const rclcpp::QoS & qos_profile,
81  : SubscriptionIntraProcessBuffer<SubscribedType, SubscribedTypeAlloc,
82  SubscribedTypeDeleter, ROSMessageType>(
83  std::make_shared<SubscribedTypeAlloc>(*allocator),
84  context,
85  topic_name,
86  qos_profile,
87  buffer_type),
88  any_callback_(callback)
89  {
90  TRACEPOINT(
91  rclcpp_subscription_callback_added,
92  static_cast<const void *>(this),
93  static_cast<const void *>(&any_callback_));
94  // The callback object gets copied, so if registration is done too early/before this point
95  // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
96  // in subsequent tracepoints.
97 #ifndef TRACETOOLS_DISABLED
98  any_callback_.register_callback_for_tracing();
99 #endif
100  }
101 
102  virtual ~SubscriptionIntraProcess() = default;
103 
104  std::shared_ptr<void>
105  take_data() override
106  {
107  ConstMessageSharedPtr shared_msg;
108  MessageUniquePtr unique_msg;
109 
110  if (any_callback_.use_take_shared_method()) {
111  shared_msg = this->buffer_->consume_shared();
112  if (!shared_msg) {
113  return nullptr;
114  }
115  } else {
116  unique_msg = this->buffer_->consume_unique();
117  if (!unique_msg) {
118  return nullptr;
119  }
120  }
121 
122  if (this->buffer_->has_data()) {
123  // If there is data still to be processed, indicate to the
124  // executor or waitset by triggering the guard condition.
125  this->trigger_guard_condition();
126  }
127 
128  return std::static_pointer_cast<void>(
129  std::make_shared<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
130  std::pair<ConstMessageSharedPtr, MessageUniquePtr>(
131  shared_msg, std::move(unique_msg)))
132  );
133  }
134 
135  void execute(std::shared_ptr<void> & data) override
136  {
137  execute_impl<SubscribedType>(data);
138  }
139 
140 protected:
141  template<typename T>
142  typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
143  execute_impl(std::shared_ptr<void> & data)
144  {
145  (void)data;
146  throw std::runtime_error("Subscription intra-process can't handle serialized messages");
147  }
148 
149  template<class T>
150  typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
151  execute_impl(std::shared_ptr<void> & data)
152  {
153  if (!data) {
154  return;
155  }
156 
157  rmw_message_info_t msg_info;
158  msg_info.publisher_gid = {0, {0}};
159  msg_info.from_intra_process = true;
160 
161  auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
162  data);
163 
164  if (any_callback_.use_take_shared_method()) {
165  ConstMessageSharedPtr shared_msg = shared_ptr->first;
166  any_callback_.dispatch_intra_process(shared_msg, msg_info);
167  } else {
168  MessageUniquePtr unique_msg = std::move(shared_ptr->second);
169  any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info);
170  }
171  shared_ptr.reset();
172  }
173 
174  AnySubscriptionCallback<MessageT, Alloc> any_callback_;
175 };
176 
177 } // namespace experimental
178 } // namespace rclcpp
179 
180 #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
Encapsulation of Quality of Service settings.
Definition: qos.hpp:111
void execute(std::shared_ptr< void > &data) override
Execute data that is passed in.
std::shared_ptr< void > take_data() override
Take the data so that it can be consumed with execute.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.