15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
18 #include <rmw/error_handling.h>
29 #include "rcl/error_handling.h"
32 #include "rclcpp/any_subscription_callback.hpp"
33 #include "rclcpp/detail/resolve_use_intra_process.hpp"
34 #include "rclcpp/detail/resolve_intra_process_buffer_type.hpp"
35 #include "rclcpp/exceptions.hpp"
36 #include "rclcpp/expand_topic_or_service_name.hpp"
37 #include "rclcpp/experimental/intra_process_manager.hpp"
38 #include "rclcpp/experimental/subscription_intra_process.hpp"
39 #include "rclcpp/logging.hpp"
40 #include "rclcpp/macros.hpp"
41 #include "rclcpp/message_info.hpp"
42 #include "rclcpp/message_memory_strategy.hpp"
43 #include "rclcpp/node_interfaces/node_base_interface.hpp"
44 #include "rclcpp/subscription_base.hpp"
45 #include "rclcpp/subscription_options.hpp"
46 #include "rclcpp/subscription_traits.hpp"
47 #include "rclcpp/type_support_decl.hpp"
48 #include "rclcpp/visibility_control.hpp"
49 #include "rclcpp/waitable.hpp"
50 #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
51 #include "tracetools/tracetools.h"
56 namespace node_interfaces
58 class NodeTopicsInterface;
64 typename AllocatorT = std::allocator<void>,
67 typename SubscribedT =
typename rclcpp::TypeAdapter<MessageT>::custom_type,
70 typename ROSMessageT =
typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
81 using SubscribedType = SubscribedT;
82 using ROSMessageType = ROSMessageT;
83 using MessageMemoryStrategyType = MessageMemoryStrategyT;
85 using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, AllocatorT>;
86 using SubscribedTypeAllocator =
typename SubscribedTypeAllocatorTraits::allocator_type;
87 using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
89 using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>;
90 using ROSMessageTypeAllocator =
typename ROSMessageTypeAllocatorTraits::allocator_type;
91 using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
93 using MessageAllocatorTraits [[deprecated(
"use ROSMessageTypeAllocatorTraits")]] =
94 ROSMessageTypeAllocatorTraits;
95 using MessageAllocator [[deprecated(
"use ROSMessageTypeAllocator")]] =
96 ROSMessageTypeAllocator;
97 using MessageDeleter [[deprecated(
"use ROSMessageTypeDeleter")]] =
98 ROSMessageTypeDeleter;
100 using ConstMessageSharedPtr [[deprecated]] = std::shared_ptr<const ROSMessageType>;
101 using MessageUniquePtr
102 [[deprecated(
"use std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> instead")]] =
103 std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>;
106 using SubscriptionTopicStatisticsSharedPtr =
107 std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>>;
131 rclcpp::node_interfaces::NodeBaseInterface * node_base,
132 const rosidl_message_type_support_t & type_support_handle,
133 const std::
string & topic_name,
137 typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
138 SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics =
nullptr)
143 options.template to_rcl_subscription_options<ROSMessageType>(qos),
144 callback.is_serialized_message_callback()),
145 any_callback_(callback),
147 message_memory_strategy_(message_memory_strategy)
149 if (options_.event_callbacks.deadline_callback) {
150 this->add_event_handler(
151 options_.event_callbacks.deadline_callback,
152 RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED);
154 if (options_.event_callbacks.liveliness_callback) {
155 this->add_event_handler(
156 options_.event_callbacks.liveliness_callback,
157 RCL_SUBSCRIPTION_LIVELINESS_CHANGED);
159 if (options_.event_callbacks.incompatible_qos_callback) {
160 this->add_event_handler(
161 options_.event_callbacks.incompatible_qos_callback,
162 RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
163 }
else if (options_.use_default_callbacks) {
166 this->add_event_handler(
167 [
this](QOSRequestedIncompatibleQoSInfo & info) {
168 this->default_incompatible_qos_callback(info);
170 RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
175 if (options_.event_callbacks.message_lost_callback) {
176 this->add_event_handler(
177 options_.event_callbacks.message_lost_callback,
178 RCL_SUBSCRIPTION_MESSAGE_LOST);
182 if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
183 using rclcpp::detail::resolve_intra_process_buffer_type;
187 if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
188 throw std::invalid_argument(
189 "intraprocess communication on topic '" + topic_name +
190 "' allowed only with keep last history qos policy");
192 if (qos_profile.depth() == 0) {
193 throw std::invalid_argument(
194 "intraprocess communication on topic '" + topic_name +
195 "' is not allowed with 0 depth qos policy");
197 if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
198 throw std::invalid_argument(
199 "intraprocess communication allowed only with volatile durability");
205 SubscribedTypeAllocator,
206 SubscribedTypeDeleter,
211 auto context = node_base->get_context();
212 subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
214 options_.get_allocator(),
216 this->get_topic_name(),
218 resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
220 rclcpp_subscription_init,
221 static_cast<const void *
>(get_subscription_handle().get()),
222 static_cast<const void *
>(subscription_intra_process_.get()));
226 auto ipm = context->get_sub_context<IntraProcessManager>();
227 uint64_t intra_process_subscription_id = ipm->
add_subscription(subscription_intra_process_);
231 if (subscription_topic_statistics !=
nullptr) {
232 this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
236 rclcpp_subscription_init,
237 static_cast<const void *
>(get_subscription_handle().get()),
238 static_cast<const void *
>(
this));
240 rclcpp_subscription_callback_added,
241 static_cast<const void *
>(
this),
242 static_cast<const void *
>(&any_callback_));
246 #ifndef TRACETOOLS_DISABLED
247 any_callback_.register_callback_for_tracing();
284 return this->
take_type_erased(
static_cast<void *
>(&message_out), message_info_out);
294 template<
typename TakeT>
296 !rosidl_generator_traits::is_message<TakeT>::value &&
297 std::is_same_v<TakeT, SubscribedType>,
302 ROSMessageType local_message;
303 bool taken = this->
take_type_erased(
static_cast<void *
>(&local_message), message_info_out);
310 std::shared_ptr<void>
317 return message_memory_strategy_->borrow_message();
320 std::shared_ptr<rclcpp::SerializedMessage>
323 return message_memory_strategy_->borrow_serialized_message();
328 std::shared_ptr<void> & message,
336 auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
338 std::chrono::time_point<std::chrono::system_clock> now;
339 if (subscription_topic_statistics_) {
342 now = std::chrono::system_clock::now();
345 any_callback_.dispatch(typed_message, message_info);
347 if (subscription_topic_statistics_) {
348 const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
349 const auto time =
rclcpp::Time(nanos.time_since_epoch().count());
350 subscription_topic_statistics_->handle_message(*typed_message, time);
355 handle_serialized_message(
356 const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
360 any_callback_.dispatch(serialized_message, message_info);
364 handle_loaned_message(
365 void * loaned_message,
374 auto typed_message =
static_cast<ROSMessageType *
>(loaned_message);
376 auto sptr = std::shared_ptr<ROSMessageType>(
377 typed_message, [](ROSMessageType * msg) {(void) msg;});
379 std::chrono::time_point<std::chrono::system_clock> now;
380 if (subscription_topic_statistics_) {
383 now = std::chrono::system_clock::now();
386 any_callback_.dispatch(sptr, message_info);
388 if (subscription_topic_statistics_) {
389 const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
390 const auto time =
rclcpp::Time(nanos.time_since_epoch().count());
391 subscription_topic_statistics_->handle_message(*typed_message, time);
402 auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
403 message_memory_strategy_->return_message(typed_message);
413 message_memory_strategy_->return_serialized_message(message);
417 use_take_shared_method()
const
419 return any_callback_.use_take_shared_method();
425 AnySubscriptionCallback<MessageT, AllocatorT> any_callback_;
432 typename message_memory_strategy::MessageMemoryStrategy<ROSMessageType, AllocatorT>::SharedPtr
433 message_memory_strategy_;
436 SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{
nullptr};
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Encapsulation of Quality of Service settings.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implemenation detail.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
Subscription implementation, templated on the type of message this subscription receives.
std::enable_if_t< !rosidl_generator_traits::is_message< TakeT >::value &&std::is_same_v< TakeT, SubscribedType >, bool > take(TakeT &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
Subscription(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rclcpp::QoS &qos, AnySubscriptionCallback< MessageT, AllocatorT > callback, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options, typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics=nullptr)
Default constructor.
void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message) override
Return the borrowed serialized message.
void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rclcpp::QoS &qos, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options)
Called after construction to continue setup that requires shared_from_this().
std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message() override
Borrow a new serialized message.
void return_message(std::shared_ptr< void > &message) override
Return the borrowed message.
void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info) override
Check if we need to handle the message, and execute the callback if we do.
std::shared_ptr< void > create_message() override
Borrow a new message.
bool take(ROSMessageType &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
This class performs intra process communication between nodes.
RCLCPP_PUBLIC uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
Default allocation strategy for messages received by subscriptions.
Pure virtual interface class for the NodeBase part of the Node API.
Pure virtual interface class for the NodeTopics part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Structure containing optional configuration for Subscriptions.
Template structure used to adapt custom, user-defined types to ROS types.