15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
18 #include <rmw/error_handling.h>
29 #include "rcl/error_handling.h"
32 #include "rclcpp/any_subscription_callback.hpp"
33 #include "rclcpp/detail/resolve_use_intra_process.hpp"
34 #include "rclcpp/detail/resolve_intra_process_buffer_type.hpp"
35 #include "rclcpp/exceptions.hpp"
36 #include "rclcpp/expand_topic_or_service_name.hpp"
37 #include "rclcpp/experimental/intra_process_manager.hpp"
38 #include "rclcpp/experimental/subscription_intra_process.hpp"
39 #include "rclcpp/logging.hpp"
40 #include "rclcpp/macros.hpp"
41 #include "rclcpp/message_info.hpp"
42 #include "rclcpp/message_memory_strategy.hpp"
43 #include "rclcpp/node_interfaces/node_base_interface.hpp"
44 #include "rclcpp/subscription_base.hpp"
45 #include "rclcpp/subscription_options.hpp"
46 #include "rclcpp/subscription_traits.hpp"
47 #include "rclcpp/type_support_decl.hpp"
48 #include "rclcpp/visibility_control.hpp"
49 #include "rclcpp/waitable.hpp"
50 #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
51 #include "tracetools/tracetools.h"
56 namespace node_interfaces
58 class NodeTopicsInterface;
64 typename AllocatorT = std::allocator<void>,
67 typename SubscribedT =
typename rclcpp::TypeAdapter<MessageT>::custom_type,
70 typename ROSMessageT =
typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
81 using SubscribedType = SubscribedT;
82 using ROSMessageType = ROSMessageT;
83 using MessageMemoryStrategyType = MessageMemoryStrategyT;
85 using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, AllocatorT>;
86 using SubscribedTypeAllocator =
typename SubscribedTypeAllocatorTraits::allocator_type;
87 using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
89 using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>;
90 using ROSMessageTypeAllocator =
typename ROSMessageTypeAllocatorTraits::allocator_type;
91 using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
93 using MessageAllocatorTraits [[deprecated(
"use ROSMessageTypeAllocatorTraits")]] =
94 ROSMessageTypeAllocatorTraits;
95 using MessageAllocator [[deprecated(
"use ROSMessageTypeAllocator")]] =
96 ROSMessageTypeAllocator;
97 using MessageDeleter [[deprecated(
"use ROSMessageTypeDeleter")]] =
98 ROSMessageTypeDeleter;
100 using ConstMessageSharedPtr [[deprecated]] = std::shared_ptr<const ROSMessageType>;
101 using MessageUniquePtr
102 [[deprecated(
"use std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> instead")]] =
103 std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
106 using SubscriptionTopicStatisticsSharedPtr =
107 std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics>;
132 rclcpp::node_interfaces::NodeBaseInterface * node_base,
133 const rosidl_message_type_support_t & type_support_handle,
134 const std::
string & topic_name,
138 typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
139 SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics =
nullptr)
144 options.to_rcl_subscription_options(qos),
146 options.event_callbacks,
147 options.use_default_callbacks,
149 any_callback_(callback),
151 message_memory_strategy_(message_memory_strategy)
155 if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
156 using rclcpp::detail::resolve_intra_process_buffer_type;
160 if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
161 throw std::invalid_argument(
162 "intraprocess communication on topic '" + topic_name +
163 "' allowed only with keep last history qos policy");
165 if (qos_profile.depth() == 0) {
166 throw std::invalid_argument(
167 "intraprocess communication on topic '" + topic_name +
168 "' is not allowed with 0 depth qos policy");
174 SubscribedTypeAllocator,
175 SubscribedTypeDeleter,
180 auto context = node_base->get_context();
181 subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
183 options_.get_allocator(),
185 this->get_topic_name(),
187 resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
188 TRACETOOLS_TRACEPOINT(
189 rclcpp_subscription_init,
190 static_cast<const void *
>(get_subscription_handle().get()),
191 static_cast<const void *
>(subscription_intra_process_.get()));
195 auto ipm = context->get_sub_context<IntraProcessManager>();
196 uint64_t intra_process_subscription_id = ipm->template add_subscription<
197 ROSMessageType, ROSMessageTypeAllocator>(subscription_intra_process_);
201 if (subscription_topic_statistics !=
nullptr) {
202 this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
205 TRACETOOLS_TRACEPOINT(
206 rclcpp_subscription_init,
207 static_cast<const void *
>(get_subscription_handle().get()),
208 static_cast<const void *
>(
this));
209 TRACETOOLS_TRACEPOINT(
210 rclcpp_subscription_callback_added,
211 static_cast<const void *
>(
this),
212 static_cast<const void *
>(&any_callback_));
216 #ifndef TRACETOOLS_DISABLED
217 any_callback_.register_callback_for_tracing();
254 return this->
take_type_erased(
static_cast<void *
>(&message_out), message_info_out);
264 template<
typename TakeT>
266 !rosidl_generator_traits::is_message<TakeT>::value &&
267 std::is_same_v<TakeT, SubscribedType>,
272 ROSMessageType local_message;
273 bool taken = this->
take_type_erased(
static_cast<void *
>(&local_message), message_info_out);
280 std::shared_ptr<void>
287 return message_memory_strategy_->borrow_message();
290 std::shared_ptr<rclcpp::SerializedMessage>
293 return message_memory_strategy_->borrow_serialized_message();
298 std::shared_ptr<void> & message,
306 auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
308 std::chrono::time_point<std::chrono::system_clock> now;
309 if (subscription_topic_statistics_) {
312 now = std::chrono::system_clock::now();
315 any_callback_.dispatch(typed_message, message_info);
317 if (subscription_topic_statistics_) {
318 const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
319 const auto time =
rclcpp::Time(nanos.time_since_epoch().count());
325 handle_serialized_message(
326 const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
329 std::chrono::time_point<std::chrono::system_clock> now;
330 if (subscription_topic_statistics_) {
333 now = std::chrono::system_clock::now();
336 any_callback_.dispatch(serialized_message, message_info);
338 if (subscription_topic_statistics_) {
339 const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
340 const auto time =
rclcpp::Time(nanos.time_since_epoch().count());
346 handle_loaned_message(
347 void * loaned_message,
356 auto typed_message =
static_cast<ROSMessageType *
>(loaned_message);
358 auto sptr = std::shared_ptr<ROSMessageType>(
359 typed_message, [](ROSMessageType * msg) {(void) msg;});
361 std::chrono::time_point<std::chrono::system_clock> now;
362 if (subscription_topic_statistics_) {
365 now = std::chrono::system_clock::now();
368 any_callback_.dispatch(sptr, message_info);
370 if (subscription_topic_statistics_) {
371 const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
372 const auto time =
rclcpp::Time(nanos.time_since_epoch().count());
384 auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
385 message_memory_strategy_->return_message(typed_message);
395 message_memory_strategy_->return_serialized_message(message);
399 use_take_shared_method()
const
401 return any_callback_.use_take_shared_method();
407 rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr
408 get_shared_dynamic_message_type()
override
411 "get_shared_dynamic_message_type is not implemented for Subscription");
414 rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
415 get_shared_dynamic_message()
override
418 "get_shared_dynamic_message is not implemented for Subscription");
421 rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr
422 get_shared_dynamic_serialization_support()
override
425 "get_shared_dynamic_serialization_support is not implemented for Subscription");
428 rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
432 "create_dynamic_message is not implemented for Subscription");
436 return_dynamic_message(
437 rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message)
override
441 "return_dynamic_message is not implemented for Subscription");
445 handle_dynamic_message(
446 const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message,
452 "handle_dynamic_message is not implemented for Subscription");
458 AnySubscriptionCallback<MessageT, AllocatorT> any_callback_;
465 typename message_memory_strategy::MessageMemoryStrategy<ROSMessageType, AllocatorT>::SharedPtr
466 message_memory_strategy_;
469 SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{
nullptr};
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Encapsulation of Quality of Service settings.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implemenation detail.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
Subscription implementation, templated on the type of message this subscription receives.
std::enable_if_t< !rosidl_generator_traits::is_message< TakeT >::value &&std::is_same_v< TakeT, SubscribedType >, bool > take(TakeT &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
Subscription(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rclcpp::QoS &qos, AnySubscriptionCallback< MessageT, AllocatorT > callback, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options, typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics=nullptr)
Default constructor.
void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message) override
Return the borrowed serialized message.
void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rclcpp::QoS &qos, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options)
Called after construction to continue setup that requires shared_from_this().
std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message() override
Borrow a new serialized message.
void return_message(std::shared_ptr< void > &message) override
Return the borrowed message.
rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr create_dynamic_message() override
Borrow a new serialized message (this clones!)
void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info) override
Check if we need to handle the message, and execute the callback if we do.
std::shared_ptr< void > create_message() override
Borrow a new message.
bool take(ROSMessageType &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
This class performs intra process communication between nodes.
Default allocation strategy for messages received by subscriptions.
Pure virtual interface class for the NodeBase part of the Node API.
Pure virtual interface class for the NodeTopics part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
DeliveredMessageKind
The kind of message that the subscription delivers in its callback, used by the executor.
Structure containing optional configuration for Subscriptions.
Template structure used to adapt custom, user-defined types to ROS types.