ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
subscription.hpp
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <chrono>
22 #include <functional>
23 #include <iostream>
24 #include <memory>
25 #include <sstream>
26 #include <string>
27 #include <utility>
28 
29 #include "rcl/error_handling.h"
30 #include "rcl/subscription.h"
31 
32 #include "rclcpp/any_subscription_callback.hpp"
33 #include "rclcpp/detail/resolve_use_intra_process.hpp"
34 #include "rclcpp/detail/resolve_intra_process_buffer_type.hpp"
35 #include "rclcpp/exceptions.hpp"
36 #include "rclcpp/expand_topic_or_service_name.hpp"
37 #include "rclcpp/experimental/intra_process_manager.hpp"
38 #include "rclcpp/experimental/subscription_intra_process.hpp"
39 #include "rclcpp/logging.hpp"
40 #include "rclcpp/macros.hpp"
41 #include "rclcpp/message_info.hpp"
42 #include "rclcpp/message_memory_strategy.hpp"
43 #include "rclcpp/node_interfaces/node_base_interface.hpp"
44 #include "rclcpp/subscription_base.hpp"
45 #include "rclcpp/subscription_options.hpp"
46 #include "rclcpp/subscription_traits.hpp"
47 #include "rclcpp/type_support_decl.hpp"
48 #include "rclcpp/visibility_control.hpp"
49 #include "rclcpp/waitable.hpp"
50 #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
51 #include "tracetools/tracetools.h"
52 
53 namespace rclcpp
54 {
55 
56 namespace node_interfaces
57 {
58 class NodeTopicsInterface;
59 } // namespace node_interfaces
60 
62 template<
63  typename MessageT,
64  typename AllocatorT = std::allocator<void>,
67  typename SubscribedT = typename rclcpp::TypeAdapter<MessageT>::custom_type,
70  typename ROSMessageT = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
71  typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
72  ROSMessageT,
73  AllocatorT
74  >>
76 {
78 
79 public:
80  // Redeclare these here to use outside of the class.
81  using SubscribedType = SubscribedT;
82  using ROSMessageType = ROSMessageT;
83  using MessageMemoryStrategyType = MessageMemoryStrategyT;
84 
85  using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, AllocatorT>;
86  using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type;
87  using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
88 
89  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>;
90  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
91  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
92 
93  using MessageAllocatorTraits [[deprecated("use ROSMessageTypeAllocatorTraits")]] =
94  ROSMessageTypeAllocatorTraits;
95  using MessageAllocator [[deprecated("use ROSMessageTypeAllocator")]] =
96  ROSMessageTypeAllocator;
97  using MessageDeleter [[deprecated("use ROSMessageTypeDeleter")]] =
98  ROSMessageTypeDeleter;
99 
100  using ConstMessageSharedPtr [[deprecated]] = std::shared_ptr<const ROSMessageType>;
101  using MessageUniquePtr
102  [[deprecated("use std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> instead")]] =
103  std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
104 
105 private:
106  using SubscriptionTopicStatisticsSharedPtr =
107  std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics>;
108 
109 public:
110  RCLCPP_SMART_PTR_DEFINITIONS(Subscription)
111 
112 
130  // *INDENT-OFF*
132  rclcpp::node_interfaces::NodeBaseInterface * node_base,
133  const rosidl_message_type_support_t & type_support_handle,
134  const std::string & topic_name,
135  const rclcpp::QoS & qos,
136  AnySubscriptionCallback<MessageT, AllocatorT> callback,
137  const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
138  typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
139  SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)
141  node_base,
142  type_support_handle,
143  topic_name,
144  options.to_rcl_subscription_options(qos),
145  // NOTE(methylDragon): Passing these args separately is necessary for event binding
146  options.event_callbacks,
147  options.use_default_callbacks,
148  callback.is_serialized_message_callback() ? DeliveredMessageKind::SERIALIZED_MESSAGE : DeliveredMessageKind::ROS_MESSAGE), // NOLINT
149  any_callback_(callback),
150  options_(options),
151  message_memory_strategy_(message_memory_strategy)
152  // *INDENT-ON*
153  {
154  // Setup intra process publishing if requested.
155  if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
156  using rclcpp::detail::resolve_intra_process_buffer_type;
157 
158  // Check if the QoS is compatible with intra-process.
159  auto qos_profile = get_actual_qos();
160  if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
161  throw std::invalid_argument(
162  "intraprocess communication on topic '" + topic_name +
163  "' allowed only with keep last history qos policy");
164  }
165  if (qos_profile.depth() == 0) {
166  throw std::invalid_argument(
167  "intraprocess communication on topic '" + topic_name +
168  "' is not allowed with 0 depth qos policy");
169  }
170 
171  using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
172  MessageT,
173  SubscribedType,
174  SubscribedTypeAllocator,
175  SubscribedTypeDeleter,
176  ROSMessageT,
177  AllocatorT>;
178 
179  // First create a SubscriptionIntraProcess which will be given to the intra-process manager.
180  auto context = node_base->get_context();
181  subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
182  callback,
183  options_.get_allocator(),
184  context,
185  this->get_topic_name(), // important to get like this, as it has the fully-qualified name
186  qos_profile,
187  resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
188  TRACETOOLS_TRACEPOINT(
189  rclcpp_subscription_init,
190  static_cast<const void *>(get_subscription_handle().get()),
191  static_cast<const void *>(subscription_intra_process_.get()));
192 
193  // Add it to the intra process manager.
195  auto ipm = context->get_sub_context<IntraProcessManager>();
196  uint64_t intra_process_subscription_id = ipm->template add_subscription<
197  ROSMessageType, ROSMessageTypeAllocator>(subscription_intra_process_);
198  this->setup_intra_process(intra_process_subscription_id, ipm);
199  }
200 
201  if (subscription_topic_statistics != nullptr) {
202  this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
203  }
204 
205  TRACETOOLS_TRACEPOINT(
206  rclcpp_subscription_init,
207  static_cast<const void *>(get_subscription_handle().get()),
208  static_cast<const void *>(this));
209  TRACETOOLS_TRACEPOINT(
210  rclcpp_subscription_callback_added,
211  static_cast<const void *>(this),
212  static_cast<const void *>(&any_callback_));
213  // The callback object gets copied, so if registration is done too early/before this point
214  // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
215  // in subsequent tracepoints.
216 #ifndef TRACETOOLS_DISABLED
217  any_callback_.register_callback_for_tracing();
218 #endif
219  }
220 
222  void
225  const rclcpp::QoS & qos,
227  {
228  (void)node_base;
229  (void)qos;
230  (void)options;
231  }
232 
234 
251  bool
252  take(ROSMessageType & message_out, rclcpp::MessageInfo & message_info_out)
253  {
254  return this->take_type_erased(static_cast<void *>(&message_out), message_info_out);
255  }
256 
258 
264  template<typename TakeT>
265  std::enable_if_t<
266  !rosidl_generator_traits::is_message<TakeT>::value &&
267  std::is_same_v<TakeT, SubscribedType>,
268  bool
269  >
270  take(TakeT & message_out, rclcpp::MessageInfo & message_info_out)
271  {
272  ROSMessageType local_message;
273  bool taken = this->take_type_erased(static_cast<void *>(&local_message), message_info_out);
274  if (taken) {
275  rclcpp::TypeAdapter<MessageT>::convert_to_custom(local_message, message_out);
276  }
277  return taken;
278  }
279 
280  std::shared_ptr<void>
281  create_message() override
282  {
283  /* The default message memory strategy provides a dynamically allocated message on each call to
284  * create_message, though alternative memory strategies that re-use a preallocated message may be
285  * used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
286  */
287  return message_memory_strategy_->borrow_message();
288  }
289 
290  std::shared_ptr<rclcpp::SerializedMessage>
292  {
293  return message_memory_strategy_->borrow_serialized_message();
294  }
295 
296  void
298  std::shared_ptr<void> & message,
299  const rclcpp::MessageInfo & message_info) override
300  {
301  if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) {
302  // In this case, the message will be delivered via intra process and
303  // we should ignore this copy of the message.
304  return;
305  }
306  auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
307 
308  std::chrono::time_point<std::chrono::system_clock> now;
309  if (subscription_topic_statistics_) {
310  // get current time before executing callback to
311  // exclude callback duration from topic statistics result.
312  now = std::chrono::system_clock::now();
313  }
314 
315  any_callback_.dispatch(typed_message, message_info);
316 
317  if (subscription_topic_statistics_) {
318  const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
319  const auto time = rclcpp::Time(nanos.time_since_epoch().count());
320  subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time);
321  }
322  }
323 
324  void
325  handle_serialized_message(
326  const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
327  const rclcpp::MessageInfo & message_info) override
328  {
329  std::chrono::time_point<std::chrono::system_clock> now;
330  if (subscription_topic_statistics_) {
331  // get current time before executing callback to
332  // exclude callback duration from topic statistics result.
333  now = std::chrono::system_clock::now();
334  }
335 
336  any_callback_.dispatch(serialized_message, message_info);
337 
338  if (subscription_topic_statistics_) {
339  const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
340  const auto time = rclcpp::Time(nanos.time_since_epoch().count());
341  subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time);
342  }
343  }
344 
345  void
346  handle_loaned_message(
347  void * loaned_message,
348  const rclcpp::MessageInfo & message_info) override
349  {
350  if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) {
351  // In this case, the message will be delivered via intra process and
352  // we should ignore this copy of the message.
353  return;
354  }
355 
356  auto typed_message = static_cast<ROSMessageType *>(loaned_message);
357  // message is loaned, so we have to make sure that the deleter does not deallocate the message
358  auto sptr = std::shared_ptr<ROSMessageType>(
359  typed_message, [](ROSMessageType * msg) {(void) msg;});
360 
361  std::chrono::time_point<std::chrono::system_clock> now;
362  if (subscription_topic_statistics_) {
363  // get current time before executing callback to
364  // exclude callback duration from topic statistics result.
365  now = std::chrono::system_clock::now();
366  }
367 
368  any_callback_.dispatch(sptr, message_info);
369 
370  if (subscription_topic_statistics_) {
371  const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
372  const auto time = rclcpp::Time(nanos.time_since_epoch().count());
373  subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time);
374  }
375  }
376 
378 
381  void
382  return_message(std::shared_ptr<void> & message) override
383  {
384  auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
385  message_memory_strategy_->return_message(typed_message);
386  }
387 
389 
392  void
393  return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) override
394  {
395  message_memory_strategy_->return_serialized_message(message);
396  }
397 
398  bool
399  use_take_shared_method() const
400  {
401  return any_callback_.use_take_shared_method();
402  }
403 
404  // DYNAMIC TYPE ==================================================================================
405  // TODO(methylDragon): Reorder later
406  // TODO(methylDragon): Implement later...
407  rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr
408  get_shared_dynamic_message_type() override
409  {
411  "get_shared_dynamic_message_type is not implemented for Subscription");
412  }
413 
414  rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
415  get_shared_dynamic_message() override
416  {
418  "get_shared_dynamic_message is not implemented for Subscription");
419  }
420 
421  rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr
422  get_shared_dynamic_serialization_support() override
423  {
425  "get_shared_dynamic_serialization_support is not implemented for Subscription");
426  }
427 
428  rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
430  {
432  "create_dynamic_message is not implemented for Subscription");
433  }
434 
435  void
436  return_dynamic_message(
437  rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message) override
438  {
439  (void) message;
441  "return_dynamic_message is not implemented for Subscription");
442  }
443 
444  void
445  handle_dynamic_message(
446  const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message,
447  const rclcpp::MessageInfo & message_info) override
448  {
449  (void) message;
450  (void) message_info;
452  "handle_dynamic_message is not implemented for Subscription");
453  }
454 
455 private:
456  RCLCPP_DISABLE_COPY(Subscription)
457 
458  AnySubscriptionCallback<MessageT, AllocatorT> any_callback_;
460 
465  typename message_memory_strategy::MessageMemoryStrategy<ROSMessageType, AllocatorT>::SharedPtr
466  message_memory_strategy_;
467 
469  SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr};
470 };
471 
472 } // namespace rclcpp
473 
474 #endif // RCLCPP__SUBSCRIPTION_HPP_
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:116
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implemenation detail.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
Subscription implementation, templated on the type of message this subscription receives.
std::enable_if_t< !rosidl_generator_traits::is_message< TakeT >::value &&std::is_same_v< TakeT, SubscribedType >, bool > take(TakeT &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
Subscription(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rclcpp::QoS &qos, AnySubscriptionCallback< MessageT, AllocatorT > callback, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options, typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics=nullptr)
Default constructor.
void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message) override
Return the borrowed serialized message.
void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rclcpp::QoS &qos, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options)
Called after construction to continue setup that requires shared_from_this().
std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message() override
Borrow a new serialized message.
void return_message(std::shared_ptr< void > &message) override
Return the borrowed message.
rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr create_dynamic_message() override
Borrow a new serialized message (this clones!)
void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info) override
Check if we need to handle the message, and execute the callback if we do.
std::shared_ptr< void > create_message() override
Borrow a new message.
bool take(ROSMessageType &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
This class performs intra process communication between nodes.
Default allocation strategy for messages received by subscriptions.
Pure virtual interface class for the NodeBase part of the Node API.
Pure virtual interface class for the NodeTopics part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
DeliveredMessageKind
The kind of message that the subscription delivers in its callback, used by the executor.
Structure containing optional configuration for Subscriptions.
Template structure used to adapt custom, user-defined types to ROS types.