ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
subscription.hpp
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <chrono>
22 #include <functional>
23 #include <iostream>
24 #include <memory>
25 #include <sstream>
26 #include <string>
27 #include <utility>
28 
29 #include "rcl/error_handling.h"
30 #include "rcl/subscription.h"
31 
32 #include "rclcpp/any_subscription_callback.hpp"
33 #include "rclcpp/detail/resolve_use_intra_process.hpp"
34 #include "rclcpp/detail/resolve_intra_process_buffer_type.hpp"
35 #include "rclcpp/exceptions.hpp"
36 #include "rclcpp/expand_topic_or_service_name.hpp"
37 #include "rclcpp/experimental/intra_process_manager.hpp"
38 #include "rclcpp/experimental/subscription_intra_process.hpp"
39 #include "rclcpp/logging.hpp"
40 #include "rclcpp/macros.hpp"
41 #include "rclcpp/message_info.hpp"
42 #include "rclcpp/message_memory_strategy.hpp"
43 #include "rclcpp/node_interfaces/node_base_interface.hpp"
44 #include "rclcpp/subscription_base.hpp"
45 #include "rclcpp/subscription_options.hpp"
46 #include "rclcpp/subscription_traits.hpp"
47 #include "rclcpp/type_support_decl.hpp"
48 #include "rclcpp/visibility_control.hpp"
49 #include "rclcpp/waitable.hpp"
50 #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
51 #include "tracetools/tracetools.h"
52 
53 namespace rclcpp
54 {
55 
56 namespace node_interfaces
57 {
58 class NodeTopicsInterface;
59 } // namespace node_interfaces
60 
62 template<
63  typename MessageT,
64  typename AllocatorT = std::allocator<void>,
67  typename SubscribedT = typename rclcpp::TypeAdapter<MessageT>::custom_type,
70  typename ROSMessageT = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
71  typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
72  ROSMessageT,
73  AllocatorT
74  >>
76 {
78 
79 public:
80  // Redeclare these here to use outside of the class.
81  using SubscribedType = SubscribedT;
82  using ROSMessageType = ROSMessageT;
83  using MessageMemoryStrategyType = MessageMemoryStrategyT;
84 
85  using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, AllocatorT>;
86  using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type;
87  using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
88 
89  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>;
90  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
91  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
92 
93  using MessageAllocatorTraits [[deprecated("use ROSMessageTypeAllocatorTraits")]] =
94  ROSMessageTypeAllocatorTraits;
95  using MessageAllocator [[deprecated("use ROSMessageTypeAllocator")]] =
96  ROSMessageTypeAllocator;
97  using MessageDeleter [[deprecated("use ROSMessageTypeDeleter")]] =
98  ROSMessageTypeDeleter;
99 
100  using ConstMessageSharedPtr [[deprecated]] = std::shared_ptr<const ROSMessageType>;
101  using MessageUniquePtr
102  [[deprecated("use std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> instead")]] =
103  std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
104 
105 private:
106  using SubscriptionTopicStatisticsSharedPtr =
107  std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>>;
108 
109 public:
110  RCLCPP_SMART_PTR_DEFINITIONS(Subscription)
111 
112 
131  rclcpp::node_interfaces::NodeBaseInterface * node_base,
132  const rosidl_message_type_support_t & type_support_handle,
133  const std::string & topic_name,
134  const rclcpp::QoS & qos,
135  AnySubscriptionCallback<MessageT, AllocatorT> callback,
136  const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
137  typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
138  SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)
140  node_base,
141  type_support_handle,
142  topic_name,
143  options.template to_rcl_subscription_options<ROSMessageType>(qos),
144  callback.is_serialized_message_callback()),
145  any_callback_(callback),
146  options_(options),
147  message_memory_strategy_(message_memory_strategy)
148  {
149  if (options_.event_callbacks.deadline_callback) {
150  this->add_event_handler(
151  options_.event_callbacks.deadline_callback,
152  RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED);
153  }
154  if (options_.event_callbacks.liveliness_callback) {
155  this->add_event_handler(
156  options_.event_callbacks.liveliness_callback,
157  RCL_SUBSCRIPTION_LIVELINESS_CHANGED);
158  }
159  if (options_.event_callbacks.incompatible_qos_callback) {
160  this->add_event_handler(
161  options_.event_callbacks.incompatible_qos_callback,
162  RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
163  } else if (options_.use_default_callbacks) {
164  // Register default callback when not specified
165  try {
166  this->add_event_handler(
167  [this](QOSRequestedIncompatibleQoSInfo & info) {
168  this->default_incompatible_qos_callback(info);
169  },
170  RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
171  } catch (UnsupportedEventTypeException & /*exc*/) {
172  // pass
173  }
174  }
175  if (options_.event_callbacks.message_lost_callback) {
176  this->add_event_handler(
177  options_.event_callbacks.message_lost_callback,
178  RCL_SUBSCRIPTION_MESSAGE_LOST);
179  }
180 
181  // Setup intra process publishing if requested.
182  if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
183  using rclcpp::detail::resolve_intra_process_buffer_type;
184 
185  // Check if the QoS is compatible with intra-process.
186  auto qos_profile = get_actual_qos();
187  if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
188  throw std::invalid_argument(
189  "intraprocess communication on topic '" + topic_name +
190  "' allowed only with keep last history qos policy");
191  }
192  if (qos_profile.depth() == 0) {
193  throw std::invalid_argument(
194  "intraprocess communication on topic '" + topic_name +
195  "' is not allowed with 0 depth qos policy");
196  }
197  if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
198  throw std::invalid_argument(
199  "intraprocess communication allowed only with volatile durability");
200  }
201 
202  using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
203  MessageT,
204  SubscribedType,
205  SubscribedTypeAllocator,
206  SubscribedTypeDeleter,
207  ROSMessageT,
208  AllocatorT>;
209 
210  // First create a SubscriptionIntraProcess which will be given to the intra-process manager.
211  auto context = node_base->get_context();
212  subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
213  callback,
214  options_.get_allocator(),
215  context,
216  this->get_topic_name(), // important to get like this, as it has the fully-qualified name
217  qos_profile,
218  resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
219  TRACEPOINT(
220  rclcpp_subscription_init,
221  static_cast<const void *>(get_subscription_handle().get()),
222  static_cast<const void *>(subscription_intra_process_.get()));
223 
224  // Add it to the intra process manager.
226  auto ipm = context->get_sub_context<IntraProcessManager>();
227  uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process_);
228  this->setup_intra_process(intra_process_subscription_id, ipm);
229  }
230 
231  if (subscription_topic_statistics != nullptr) {
232  this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
233  }
234 
235  TRACEPOINT(
236  rclcpp_subscription_init,
237  static_cast<const void *>(get_subscription_handle().get()),
238  static_cast<const void *>(this));
239  TRACEPOINT(
240  rclcpp_subscription_callback_added,
241  static_cast<const void *>(this),
242  static_cast<const void *>(&any_callback_));
243  // The callback object gets copied, so if registration is done too early/before this point
244  // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
245  // in subsequent tracepoints.
246 #ifndef TRACETOOLS_DISABLED
247  any_callback_.register_callback_for_tracing();
248 #endif
249  }
250 
252  void
255  const rclcpp::QoS & qos,
257  {
258  (void)node_base;
259  (void)qos;
260  (void)options;
261  }
262 
264 
281  bool
282  take(ROSMessageType & message_out, rclcpp::MessageInfo & message_info_out)
283  {
284  return this->take_type_erased(static_cast<void *>(&message_out), message_info_out);
285  }
286 
288 
294  template<typename TakeT>
295  std::enable_if_t<
296  !rosidl_generator_traits::is_message<TakeT>::value &&
297  std::is_same_v<TakeT, SubscribedType>,
298  bool
299  >
300  take(TakeT & message_out, rclcpp::MessageInfo & message_info_out)
301  {
302  ROSMessageType local_message;
303  bool taken = this->take_type_erased(static_cast<void *>(&local_message), message_info_out);
304  if (taken) {
305  rclcpp::TypeAdapter<MessageT>::convert_to_custom(local_message, message_out);
306  }
307  return taken;
308  }
309 
310  std::shared_ptr<void>
311  create_message() override
312  {
313  /* The default message memory strategy provides a dynamically allocated message on each call to
314  * create_message, though alternative memory strategies that re-use a preallocated message may be
315  * used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
316  */
317  return message_memory_strategy_->borrow_message();
318  }
319 
320  std::shared_ptr<rclcpp::SerializedMessage>
322  {
323  return message_memory_strategy_->borrow_serialized_message();
324  }
325 
326  void
328  std::shared_ptr<void> & message,
329  const rclcpp::MessageInfo & message_info) override
330  {
331  if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) {
332  // In this case, the message will be delivered via intra process and
333  // we should ignore this copy of the message.
334  return;
335  }
336  auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
337 
338  std::chrono::time_point<std::chrono::system_clock> now;
339  if (subscription_topic_statistics_) {
340  // get current time before executing callback to
341  // exclude callback duration from topic statistics result.
342  now = std::chrono::system_clock::now();
343  }
344 
345  any_callback_.dispatch(typed_message, message_info);
346 
347  if (subscription_topic_statistics_) {
348  const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
349  const auto time = rclcpp::Time(nanos.time_since_epoch().count());
350  subscription_topic_statistics_->handle_message(*typed_message, time);
351  }
352  }
353 
354  void
355  handle_serialized_message(
356  const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
357  const rclcpp::MessageInfo & message_info) override
358  {
359  // TODO(wjwwood): enable topic statistics for serialized messages
360  any_callback_.dispatch(serialized_message, message_info);
361  }
362 
363  void
364  handle_loaned_message(
365  void * loaned_message,
366  const rclcpp::MessageInfo & message_info) override
367  {
368  if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) {
369  // In this case, the message will be delivered via intra process and
370  // we should ignore this copy of the message.
371  return;
372  }
373 
374  auto typed_message = static_cast<ROSMessageType *>(loaned_message);
375  // message is loaned, so we have to make sure that the deleter does not deallocate the message
376  auto sptr = std::shared_ptr<ROSMessageType>(
377  typed_message, [](ROSMessageType * msg) {(void) msg;});
378 
379  std::chrono::time_point<std::chrono::system_clock> now;
380  if (subscription_topic_statistics_) {
381  // get current time before executing callback to
382  // exclude callback duration from topic statistics result.
383  now = std::chrono::system_clock::now();
384  }
385 
386  any_callback_.dispatch(sptr, message_info);
387 
388  if (subscription_topic_statistics_) {
389  const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
390  const auto time = rclcpp::Time(nanos.time_since_epoch().count());
391  subscription_topic_statistics_->handle_message(*typed_message, time);
392  }
393  }
394 
396 
399  void
400  return_message(std::shared_ptr<void> & message) override
401  {
402  auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
403  message_memory_strategy_->return_message(typed_message);
404  }
405 
407 
410  void
411  return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) override
412  {
413  message_memory_strategy_->return_serialized_message(message);
414  }
415 
416  bool
417  use_take_shared_method() const
418  {
419  return any_callback_.use_take_shared_method();
420  }
421 
422 private:
423  RCLCPP_DISABLE_COPY(Subscription)
424 
425  AnySubscriptionCallback<MessageT, AllocatorT> any_callback_;
427 
432  typename message_memory_strategy::MessageMemoryStrategy<ROSMessageType, AllocatorT>::SharedPtr
433  message_memory_strategy_;
434 
436  SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr};
437 };
438 
439 } // namespace rclcpp
440 
441 #endif // RCLCPP__SUBSCRIPTION_HPP_
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:111
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implemenation detail.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
Subscription implementation, templated on the type of message this subscription receives.
std::enable_if_t< !rosidl_generator_traits::is_message< TakeT >::value &&std::is_same_v< TakeT, SubscribedType >, bool > take(TakeT &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
Subscription(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rclcpp::QoS &qos, AnySubscriptionCallback< MessageT, AllocatorT > callback, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options, typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics=nullptr)
Default constructor.
void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message) override
Return the borrowed serialized message.
void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rclcpp::QoS &qos, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options)
Called after construction to continue setup that requires shared_from_this().
std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message() override
Borrow a new serialized message.
void return_message(std::shared_ptr< void > &message) override
Return the borrowed message.
void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info) override
Check if we need to handle the message, and execute the callback if we do.
std::shared_ptr< void > create_message() override
Borrow a new message.
bool take(ROSMessageType &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
This class performs intra process communication between nodes.
RCLCPP_PUBLIC uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
Default allocation strategy for messages received by subscriptions.
Pure virtual interface class for the NodeBase part of the Node API.
Pure virtual interface class for the NodeTopics part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Structure containing optional configuration for Subscriptions.
Template structure used to adapt custom, user-defined types to ROS types.