ROS 2 rclcpp + rcl - kilted  kilted
ROS 2 C++ Client Library with ROS Client Library
subscription.hpp
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <chrono>
22 #include <functional>
23 #include <iostream>
24 #include <memory>
25 #include <sstream>
26 #include <string>
27 #include <utility>
28 
29 #include "rcl/error_handling.h"
30 #include "rcl/subscription.h"
31 
32 #include "rclcpp/any_subscription_callback.hpp"
33 #include "rclcpp/detail/resolve_use_intra_process.hpp"
34 #include "rclcpp/detail/resolve_intra_process_buffer_type.hpp"
35 #include "rclcpp/exceptions.hpp"
36 #include "rclcpp/expand_topic_or_service_name.hpp"
37 #include "rclcpp/experimental/intra_process_manager.hpp"
38 #include "rclcpp/experimental/subscription_intra_process.hpp"
39 #include "rclcpp/logging.hpp"
40 #include "rclcpp/macros.hpp"
41 #include "rclcpp/message_info.hpp"
42 #include "rclcpp/message_memory_strategy.hpp"
43 #include "rclcpp/node_interfaces/node_base_interface.hpp"
44 #include "rclcpp/subscription_base.hpp"
45 #include "rclcpp/subscription_options.hpp"
46 #include "rclcpp/subscription_traits.hpp"
47 #include "rclcpp/type_support_decl.hpp"
48 #include "rclcpp/visibility_control.hpp"
49 #include "rclcpp/waitable.hpp"
50 #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
51 #include "tracetools/tracetools.h"
52 
53 namespace rclcpp
54 {
55 
56 namespace node_interfaces
57 {
58 class NodeTopicsInterface;
59 } // namespace node_interfaces
60 
62 template<
63  typename MessageT,
64  typename AllocatorT = std::allocator<void>,
67  typename SubscribedT = typename rclcpp::TypeAdapter<MessageT>::custom_type,
70  typename ROSMessageT = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
71  typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
72  ROSMessageT,
73  AllocatorT
74  >>
76 {
78 
79 public:
80  // Redeclare these here to use outside of the class.
81  using SubscribedType = SubscribedT;
82  using ROSMessageType = ROSMessageT;
83  using MessageMemoryStrategyType = MessageMemoryStrategyT;
84 
85  using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, AllocatorT>;
86  using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type;
87  using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
88 
89  using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>;
90  using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
91  using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
92 
93 private:
94  using SubscriptionTopicStatisticsSharedPtr =
95  std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics>;
96 
97 public:
98  RCLCPP_SMART_PTR_DEFINITIONS(Subscription)
99 
100 
118  // *INDENT-OFF*
120  rclcpp::node_interfaces::NodeBaseInterface * node_base,
121  const rosidl_message_type_support_t & type_support_handle,
122  const std::string & topic_name,
123  const rclcpp::QoS & qos,
124  AnySubscriptionCallback<MessageT, AllocatorT> callback,
125  const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
126  typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
127  SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)
129  node_base,
130  type_support_handle,
131  topic_name,
132  options.to_rcl_subscription_options(qos),
133  // NOTE(methylDragon): Passing these args separately is necessary for event binding
134  options.event_callbacks,
135  options.use_default_callbacks,
136  callback.is_serialized_message_callback() ? DeliveredMessageKind::SERIALIZED_MESSAGE : DeliveredMessageKind::ROS_MESSAGE), // NOLINT
137  any_callback_(callback),
138  options_(options),
139  message_memory_strategy_(message_memory_strategy)
140  // *INDENT-ON*
141  {
142  // Setup intra process publishing if requested.
143  if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
144  using rclcpp::detail::resolve_intra_process_buffer_type;
145 
146  // Check if the QoS is compatible with intra-process.
147  auto qos_profile = get_actual_qos();
148  if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
149  throw std::invalid_argument(
150  "intraprocess communication on topic '" + topic_name +
151  "' allowed only with keep last history qos policy");
152  }
153  if (qos_profile.depth() == 0) {
154  throw std::invalid_argument(
155  "intraprocess communication on topic '" + topic_name +
156  "' is not allowed with 0 depth qos policy");
157  }
158 
159  using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
160  MessageT,
161  SubscribedType,
162  SubscribedTypeAllocator,
163  SubscribedTypeDeleter,
164  ROSMessageT,
165  AllocatorT>;
166 
167  // First create a SubscriptionIntraProcess which will be given to the intra-process manager.
168  auto context = node_base->get_context();
169  subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
170  callback,
171  options_.get_allocator(),
172  context,
173  this->get_topic_name(), // important to get like this, as it has the fully-qualified name
174  qos_profile,
175  resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
176  TRACETOOLS_TRACEPOINT(
177  rclcpp_subscription_init,
178  static_cast<const void *>(get_subscription_handle().get()),
179  static_cast<const void *>(subscription_intra_process_.get()));
180 
181  // Add it to the intra process manager.
183  auto ipm = context->get_sub_context<IntraProcessManager>();
184  uint64_t intra_process_subscription_id = ipm->template add_subscription<
185  ROSMessageType, ROSMessageTypeAllocator>(subscription_intra_process_);
186  this->setup_intra_process(intra_process_subscription_id, ipm);
187  }
188 
189  if (subscription_topic_statistics != nullptr) {
190  this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
191  }
192 
193  TRACETOOLS_TRACEPOINT(
194  rclcpp_subscription_init,
195  static_cast<const void *>(get_subscription_handle().get()),
196  static_cast<const void *>(this));
197  TRACETOOLS_TRACEPOINT(
198  rclcpp_subscription_callback_added,
199  static_cast<const void *>(this),
200  static_cast<const void *>(&any_callback_));
201  // The callback object gets copied, so if registration is done too early/before this point
202  // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
203  // in subsequent tracepoints.
204 #ifndef TRACETOOLS_DISABLED
205  any_callback_.register_callback_for_tracing();
206 #endif
207  }
208 
210  void
212  [[maybe_unused]] rclcpp::node_interfaces::NodeBaseInterface * node_base,
213  [[maybe_unused]] const rclcpp::QoS & qos,
214  [[maybe_unused]] const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options)
215  {
216  // This function is intentionally left empty.
217  }
218 
220 
237  bool
238  take(ROSMessageType & message_out, rclcpp::MessageInfo & message_info_out)
239  {
240  return this->take_type_erased(static_cast<void *>(&message_out), message_info_out);
241  }
242 
244 
250  template<typename TakeT>
251  std::enable_if_t<
252  !rosidl_generator_traits::is_message<TakeT>::value &&
253  std::is_same_v<TakeT, SubscribedType>,
254  bool
255  >
256  take(TakeT & message_out, rclcpp::MessageInfo & message_info_out)
257  {
258  ROSMessageType local_message;
259  bool taken = this->take_type_erased(static_cast<void *>(&local_message), message_info_out);
260  if (taken) {
261  rclcpp::TypeAdapter<MessageT>::convert_to_custom(local_message, message_out);
262  }
263  return taken;
264  }
265 
266  std::shared_ptr<void>
267  create_message() override
268  {
269  /* The default message memory strategy provides a dynamically allocated message on each call to
270  * create_message, though alternative memory strategies that re-use a preallocated message may be
271  * used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
272  */
273  return message_memory_strategy_->borrow_message();
274  }
275 
276  std::shared_ptr<rclcpp::SerializedMessage>
278  {
279  return message_memory_strategy_->borrow_serialized_message();
280  }
281 
282  void
284  std::shared_ptr<void> & message,
285  const rclcpp::MessageInfo & message_info) override
286  {
287  if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) {
288  // In this case, the message will be delivered via intra process and
289  // we should ignore this copy of the message.
290  return;
291  }
292  auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
293 
294  std::chrono::time_point<std::chrono::system_clock> now;
295  if (subscription_topic_statistics_) {
296  // get current time before executing callback to
297  // exclude callback duration from topic statistics result.
298  now = std::chrono::system_clock::now();
299  }
300 
301  any_callback_.dispatch(typed_message, message_info);
302 
303  if (subscription_topic_statistics_) {
304  const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
305  const auto time = rclcpp::Time(nanos.time_since_epoch().count());
306  subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time);
307  }
308  }
309 
310  void
311  handle_serialized_message(
312  const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
313  const rclcpp::MessageInfo & message_info) override
314  {
315  std::chrono::time_point<std::chrono::system_clock> now;
316  if (subscription_topic_statistics_) {
317  // get current time before executing callback to
318  // exclude callback duration from topic statistics result.
319  now = std::chrono::system_clock::now();
320  }
321 
322  any_callback_.dispatch(serialized_message, message_info);
323 
324  if (subscription_topic_statistics_) {
325  const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
326  const auto time = rclcpp::Time(nanos.time_since_epoch().count());
327  subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time);
328  }
329  }
330 
331  void
332  handle_loaned_message(
333  void * loaned_message,
334  const rclcpp::MessageInfo & message_info) override
335  {
336  if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) {
337  // In this case, the message will be delivered via intra process and
338  // we should ignore this copy of the message.
339  return;
340  }
341 
342  auto typed_message = static_cast<ROSMessageType *>(loaned_message);
343  // message is loaned, so we have to make sure that the deleter does not deallocate the message
344  auto sptr = std::shared_ptr<ROSMessageType>(
345  typed_message, [](ROSMessageType * msg) {(void) msg;});
346 
347  std::chrono::time_point<std::chrono::system_clock> now;
348  if (subscription_topic_statistics_) {
349  // get current time before executing callback to
350  // exclude callback duration from topic statistics result.
351  now = std::chrono::system_clock::now();
352  }
353 
354  any_callback_.dispatch(sptr, message_info);
355 
356  if (subscription_topic_statistics_) {
357  const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
358  const auto time = rclcpp::Time(nanos.time_since_epoch().count());
359  subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time);
360  }
361  }
362 
364 
367  void
368  return_message(std::shared_ptr<void> & message) override
369  {
370  auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
371  message_memory_strategy_->return_message(typed_message);
372  }
373 
375 
378  void
379  return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) override
380  {
381  message_memory_strategy_->return_serialized_message(message);
382  }
383 
384  bool
385  use_take_shared_method() const
386  {
387  return any_callback_.use_take_shared_method();
388  }
389 
390  // DYNAMIC TYPE ==================================================================================
391  // TODO(methylDragon): Reorder later
392  // TODO(methylDragon): Implement later...
393  rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr
394  get_shared_dynamic_message_type() override
395  {
397  "get_shared_dynamic_message_type is not implemented for Subscription");
398  }
399 
400  rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
401  get_shared_dynamic_message() override
402  {
404  "get_shared_dynamic_message is not implemented for Subscription");
405  }
406 
407  rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr
408  get_shared_dynamic_serialization_support() override
409  {
411  "get_shared_dynamic_serialization_support is not implemented for Subscription");
412  }
413 
414  rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
416  {
418  "create_dynamic_message is not implemented for Subscription");
419  }
420 
421  void
422  return_dynamic_message(
423  [[maybe_unused]] rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message) override
424  {
426  "return_dynamic_message is not implemented for Subscription");
427  }
428 
429  void
430  handle_dynamic_message(
431  [[maybe_unused]] const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message,
432  [[maybe_unused]] const rclcpp::MessageInfo & message_info) override
433  {
435  "handle_dynamic_message is not implemented for Subscription");
436  }
437 
438 private:
439  RCLCPP_DISABLE_COPY(Subscription)
440 
441  AnySubscriptionCallback<MessageT, AllocatorT> any_callback_;
443 
448  typename message_memory_strategy::MessageMemoryStrategy<ROSMessageType, AllocatorT>::SharedPtr
449  message_memory_strategy_;
450 
452  SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr};
453 };
454 
455 } // namespace rclcpp
456 
457 #endif // RCLCPP__SUBSCRIPTION_HPP_
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:116
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implemenation detail.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
Subscription implementation, templated on the type of message this subscription receives.
std::enable_if_t< !rosidl_generator_traits::is_message< TakeT >::value &&std::is_same_v< TakeT, SubscribedType >, bool > take(TakeT &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
Subscription(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rclcpp::QoS &qos, AnySubscriptionCallback< MessageT, AllocatorT > callback, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options, typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics=nullptr)
Default constructor.
void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message) override
Return the borrowed serialized message.
std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message() override
Borrow a new serialized message.
void return_message(std::shared_ptr< void > &message) override
Return the borrowed message.
rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr create_dynamic_message() override
Borrow a new serialized message (this clones!)
void post_init_setup([[maybe_unused]] rclcpp::node_interfaces::NodeBaseInterface *node_base, [[maybe_unused]] const rclcpp::QoS &qos, [[maybe_unused]] const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options)
Called after construction to continue setup that requires shared_from_this().
void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info) override
Check if we need to handle the message, and execute the callback if we do.
std::shared_ptr< void > create_message() override
Borrow a new message.
bool take(ROSMessageType &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
This class performs intra process communication between nodes.
Default allocation strategy for messages received by subscriptions.
Pure virtual interface class for the NodeBase part of the Node API.
Pure virtual interface class for the NodeTopics part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
DeliveredMessageKind
The kind of message that the subscription delivers in its callback, used by the executor.
Structure containing optional configuration for Subscriptions.
Template structure used to adapt custom, user-defined types to ROS types.