15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
18 #include <rmw/error_handling.h>
29 #include "rcl/error_handling.h"
32 #include "rclcpp/any_subscription_callback.hpp"
33 #include "rclcpp/detail/resolve_use_intra_process.hpp"
34 #include "rclcpp/detail/resolve_intra_process_buffer_type.hpp"
35 #include "rclcpp/exceptions.hpp"
36 #include "rclcpp/expand_topic_or_service_name.hpp"
37 #include "rclcpp/experimental/intra_process_manager.hpp"
38 #include "rclcpp/experimental/subscription_intra_process.hpp"
39 #include "rclcpp/logging.hpp"
40 #include "rclcpp/macros.hpp"
41 #include "rclcpp/message_info.hpp"
42 #include "rclcpp/message_memory_strategy.hpp"
43 #include "rclcpp/node_interfaces/node_base_interface.hpp"
44 #include "rclcpp/subscription_base.hpp"
45 #include "rclcpp/subscription_options.hpp"
46 #include "rclcpp/subscription_traits.hpp"
47 #include "rclcpp/type_support_decl.hpp"
48 #include "rclcpp/visibility_control.hpp"
49 #include "rclcpp/waitable.hpp"
50 #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
51 #include "tracetools/tracetools.h"
56 namespace node_interfaces
58 class NodeTopicsInterface;
64 typename AllocatorT = std::allocator<void>,
67 typename SubscribedT =
typename rclcpp::TypeAdapter<MessageT>::custom_type,
70 typename ROSMessageT =
typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
81 using SubscribedType = SubscribedT;
82 using ROSMessageType = ROSMessageT;
83 using MessageMemoryStrategyType = MessageMemoryStrategyT;
85 using SubscribedTypeAllocatorTraits = allocator::AllocRebind<SubscribedType, AllocatorT>;
86 using SubscribedTypeAllocator =
typename SubscribedTypeAllocatorTraits::allocator_type;
87 using SubscribedTypeDeleter = allocator::Deleter<SubscribedTypeAllocator, SubscribedType>;
89 using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, AllocatorT>;
90 using ROSMessageTypeAllocator =
typename ROSMessageTypeAllocatorTraits::allocator_type;
91 using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
94 using SubscriptionTopicStatisticsSharedPtr =
95 std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics>;
120 rclcpp::node_interfaces::NodeBaseInterface * node_base,
121 const rosidl_message_type_support_t & type_support_handle,
122 const std::
string & topic_name,
126 typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
127 SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics =
nullptr)
132 options.to_rcl_subscription_options(qos),
134 options.event_callbacks,
135 options.use_default_callbacks,
137 any_callback_(callback),
139 message_memory_strategy_(message_memory_strategy)
143 if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
144 using rclcpp::detail::resolve_intra_process_buffer_type;
148 if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
149 throw std::invalid_argument(
150 "intraprocess communication on topic '" + topic_name +
151 "' allowed only with keep last history qos policy");
153 if (qos_profile.depth() == 0) {
154 throw std::invalid_argument(
155 "intraprocess communication on topic '" + topic_name +
156 "' is not allowed with 0 depth qos policy");
162 SubscribedTypeAllocator,
163 SubscribedTypeDeleter,
168 auto context = node_base->get_context();
169 subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
171 options_.get_allocator(),
173 this->get_topic_name(),
175 resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
176 TRACETOOLS_TRACEPOINT(
177 rclcpp_subscription_init,
178 static_cast<const void *
>(get_subscription_handle().get()),
179 static_cast<const void *
>(subscription_intra_process_.get()));
183 auto ipm = context->get_sub_context<IntraProcessManager>();
184 uint64_t intra_process_subscription_id = ipm->template add_subscription<
185 ROSMessageType, ROSMessageTypeAllocator>(subscription_intra_process_);
189 if (subscription_topic_statistics !=
nullptr) {
190 this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
193 TRACETOOLS_TRACEPOINT(
194 rclcpp_subscription_init,
195 static_cast<const void *
>(get_subscription_handle().get()),
196 static_cast<const void *
>(
this));
197 TRACETOOLS_TRACEPOINT(
198 rclcpp_subscription_callback_added,
199 static_cast<const void *
>(
this),
200 static_cast<const void *
>(&any_callback_));
204 #ifndef TRACETOOLS_DISABLED
205 any_callback_.register_callback_for_tracing();
240 return this->
take_type_erased(
static_cast<void *
>(&message_out), message_info_out);
250 template<
typename TakeT>
252 !rosidl_generator_traits::is_message<TakeT>::value &&
253 std::is_same_v<TakeT, SubscribedType>,
258 ROSMessageType local_message;
259 bool taken = this->
take_type_erased(
static_cast<void *
>(&local_message), message_info_out);
266 std::shared_ptr<void>
273 return message_memory_strategy_->borrow_message();
276 std::shared_ptr<rclcpp::SerializedMessage>
279 return message_memory_strategy_->borrow_serialized_message();
284 std::shared_ptr<void> & message,
292 auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
294 std::chrono::time_point<std::chrono::system_clock> now;
295 if (subscription_topic_statistics_) {
298 now = std::chrono::system_clock::now();
301 any_callback_.dispatch(typed_message, message_info);
303 if (subscription_topic_statistics_) {
304 const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
305 const auto time =
rclcpp::Time(nanos.time_since_epoch().count());
311 handle_serialized_message(
312 const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
315 std::chrono::time_point<std::chrono::system_clock> now;
316 if (subscription_topic_statistics_) {
319 now = std::chrono::system_clock::now();
322 any_callback_.dispatch(serialized_message, message_info);
324 if (subscription_topic_statistics_) {
325 const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
326 const auto time =
rclcpp::Time(nanos.time_since_epoch().count());
332 handle_loaned_message(
333 void * loaned_message,
342 auto typed_message =
static_cast<ROSMessageType *
>(loaned_message);
344 auto sptr = std::shared_ptr<ROSMessageType>(
345 typed_message, [](ROSMessageType * msg) {(void) msg;});
347 std::chrono::time_point<std::chrono::system_clock> now;
348 if (subscription_topic_statistics_) {
351 now = std::chrono::system_clock::now();
354 any_callback_.dispatch(sptr, message_info);
356 if (subscription_topic_statistics_) {
357 const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
358 const auto time =
rclcpp::Time(nanos.time_since_epoch().count());
370 auto typed_message = std::static_pointer_cast<ROSMessageType>(message);
371 message_memory_strategy_->return_message(typed_message);
381 message_memory_strategy_->return_serialized_message(message);
385 use_take_shared_method()
const
387 return any_callback_.use_take_shared_method();
393 rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr
394 get_shared_dynamic_message_type()
override
397 "get_shared_dynamic_message_type is not implemented for Subscription");
400 rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
401 get_shared_dynamic_message()
override
404 "get_shared_dynamic_message is not implemented for Subscription");
407 rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr
408 get_shared_dynamic_serialization_support()
override
411 "get_shared_dynamic_serialization_support is not implemented for Subscription");
414 rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
418 "create_dynamic_message is not implemented for Subscription");
422 return_dynamic_message(
423 [[maybe_unused]] rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message)
override
426 "return_dynamic_message is not implemented for Subscription");
430 handle_dynamic_message(
431 [[maybe_unused]]
const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message,
435 "handle_dynamic_message is not implemented for Subscription");
441 AnySubscriptionCallback<MessageT, AllocatorT> any_callback_;
448 typename message_memory_strategy::MessageMemoryStrategy<ROSMessageType, AllocatorT>::SharedPtr
449 message_memory_strategy_;
452 SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{
nullptr};
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Encapsulation of Quality of Service settings.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implemenation detail.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
Subscription implementation, templated on the type of message this subscription receives.
std::enable_if_t< !rosidl_generator_traits::is_message< TakeT >::value &&std::is_same_v< TakeT, SubscribedType >, bool > take(TakeT &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
Subscription(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rclcpp::QoS &qos, AnySubscriptionCallback< MessageT, AllocatorT > callback, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options, typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics=nullptr)
Default constructor.
void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message) override
Return the borrowed serialized message.
std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message() override
Borrow a new serialized message.
void return_message(std::shared_ptr< void > &message) override
Return the borrowed message.
rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr create_dynamic_message() override
Borrow a new serialized message (this clones!)
void post_init_setup([[maybe_unused]] rclcpp::node_interfaces::NodeBaseInterface *node_base, [[maybe_unused]] const rclcpp::QoS &qos, [[maybe_unused]] const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options)
Called after construction to continue setup that requires shared_from_this().
void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info) override
Check if we need to handle the message, and execute the callback if we do.
std::shared_ptr< void > create_message() override
Borrow a new message.
bool take(ROSMessageType &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
This class performs intra process communication between nodes.
Default allocation strategy for messages received by subscriptions.
Pure virtual interface class for the NodeBase part of the Node API.
Pure virtual interface class for the NodeTopics part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
DeliveredMessageKind
The kind of message that the subscription delivers in its callback, used by the executor.
Structure containing optional configuration for Subscriptions.
Template structure used to adapt custom, user-defined types to ROS types.