15 #ifndef RCLCPP__SUBSCRIPTION_BASE_HPP_
16 #define RCLCPP__SUBSCRIPTION_BASE_HPP_
22 #include <unordered_map>
26 #include "rcl/event_callback.h"
29 #include "rmw/impl/cpp/demangle.hpp"
32 #include "rclcpp/any_subscription_callback.hpp"
33 #include "rclcpp/detail/cpp_callback_trampoline.hpp"
34 #include "rclcpp/experimental/intra_process_manager.hpp"
35 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
36 #include "rclcpp/macros.hpp"
37 #include "rclcpp/message_info.hpp"
38 #include "rclcpp/network_flow_endpoint.hpp"
39 #include "rclcpp/qos.hpp"
40 #include "rclcpp/qos_event.hpp"
41 #include "rclcpp/serialized_message.hpp"
42 #include "rclcpp/subscription_content_filter_options.hpp"
43 #include "rclcpp/type_support_decl.hpp"
44 #include "rclcpp/visibility_control.hpp"
49 namespace node_interfaces
51 class NodeBaseInterface;
54 namespace experimental
60 class IntraProcessManager;
84 const rosidl_message_type_support_t & type_support_handle,
85 const std::string & topic_name,
99 std::shared_ptr<rcl_subscription_t>
100 get_subscription_handle();
103 std::shared_ptr<const rcl_subscription_t>
104 get_subscription_handle()
const;
110 std::unordered_map<rcl_subscription_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
172 std::shared_ptr<void>
179 std::shared_ptr<rclcpp::SerializedMessage>
195 handle_serialized_message(
196 const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
202 handle_loaned_message(
void * loaned_message,
const rclcpp::MessageInfo & message_info) = 0;
219 const rosidl_message_type_support_t &
220 get_message_type_support_handle()
const;
247 using IntraProcessManagerWeakPtr =
248 std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
254 uint64_t intra_process_subscription_id,
255 IntraProcessManagerWeakPtr weak_ipm);
263 rclcpp::Waitable::SharedPtr
289 std::vector<rclcpp::NetworkFlowEndpoint>
320 throw std::invalid_argument(
321 "The callback passed to set_on_new_message_callback "
326 [callback,
this](
size_t number_of_messages) {
328 callback(number_of_messages);
329 }
catch (
const std::exception & exception) {
332 "rclcpp::SubscriptionBase@" <<
this <<
333 " caught " << rmw::impl::cpp::demangle(exception) <<
334 " exception in user-provided callback for the 'on new message' callback: " <<
339 "rclcpp::SubscriptionBase@" <<
this <<
340 " caught unhandled exception in user-provided callback " <<
341 "for the 'on new message' callback");
345 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
351 rclcpp::detail::cpp_callback_trampoline<const void *, size_t>,
352 static_cast<const void *
>(&new_callback));
355 on_new_message_callback_ = new_callback;
359 rclcpp::detail::cpp_callback_trampoline<const void *, size_t>,
360 static_cast<const void *
>(&on_new_message_callback_));
367 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
369 if (on_new_message_callback_) {
371 on_new_message_callback_ =
nullptr;
396 if (!use_intra_process_) {
399 "Calling set_on_new_intra_process_message_callback for subscription with IPC disabled");
404 throw std::invalid_argument(
405 "The callback passed to set_on_new_intra_process_message_callback "
412 std::function<void(
size_t,
int)> new_callback = std::bind(callback, std::placeholders::_1);
413 subscription_intra_process_->set_on_ready_callback(new_callback);
420 if (!use_intra_process_) {
423 "Calling clear_on_new_intra_process_message_callback for subscription with IPC disabled");
427 subscription_intra_process_->clear_on_ready_callback();
458 std::function<
void(
size_t)> callback,
461 if (event_handlers_.count(event_type) == 0) {
464 "Calling set_on_new_qos_event_callback for non registered subscription event_type");
469 throw std::invalid_argument(
470 "The callback passed to set_on_new_qos_event_callback "
477 std::function<void(
size_t,
int)> new_callback = std::bind(callback, std::placeholders::_1);
478 event_handlers_[event_type]->set_on_ready_callback(new_callback);
485 if (event_handlers_.count(event_type) == 0) {
488 "Calling clear_on_new_qos_event_callback for non registered event_type");
492 event_handlers_[event_type]->clear_on_ready_callback();
516 const std::string & filter_expression,
517 const std::vector<std::string> & expression_parameters = {});
530 template<
typename EventCallbackT>
533 const EventCallbackT & callback,
536 auto handler = std::make_shared<QOSEventHandler<EventCallbackT,
537 std::shared_ptr<rcl_subscription_t>>>(
540 get_subscription_handle(),
542 qos_events_in_use_by_wait_set_.insert(std::make_pair(handler.get(),
false));
543 event_handlers_.insert(std::make_pair(event_type, handler));
547 void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & info)
const;
551 matches_any_intra_process_publishers(
const rmw_gid_t * sender_gid)
const;
559 std::shared_ptr<rcl_node_t> node_handle_;
560 std::shared_ptr<rcl_subscription_t> subscription_handle_;
561 std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
565 std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
567 bool use_intra_process_;
568 IntraProcessManagerWeakPtr weak_ipm_;
569 uint64_t intra_process_subscription_id_;
570 std::shared_ptr<rclcpp::experimental::SubscriptionIntraProcessBase> subscription_intra_process_;
575 rosidl_message_type_support_t type_support_;
578 std::atomic<bool> subscription_in_use_by_wait_set_{
false};
579 std::atomic<bool> intra_process_subscription_waitable_in_use_by_wait_set_{
false};
581 std::atomic<bool>> qos_events_in_use_by_wait_set_;
583 std::recursive_mutex callback_mutex_;
584 std::function<void(
size_t)> on_new_message_callback_{
nullptr};
Additional meta data about messages taken from subscriptions.
Encapsulation of Quality of Service settings.
Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks.
virtual RCLCPP_PUBLIC std::shared_ptr< void > create_message()=0
Borrow a new message.
RCLCPP_PUBLIC size_t get_publisher_count() const
Get matching publisher count.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
void set_on_new_intra_process_message_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new intra-process message is received.
virtual RCLCPP_PUBLIC void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info)=0
Check if we need to handle the message, and execute the callback if we do.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if subscription instance can loan messages.
void set_on_new_message_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new message is received.
RCLCPP_PUBLIC rclcpp::Waitable::SharedPtr get_intra_process_waitable() const
Return the waitable for intra-process.
RCLCPP_PUBLIC const std::unordered_map< rcl_subscription_event_type_t, std::shared_ptr< rclcpp::QOSEventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this subscription.
void set_on_new_qos_event_callback(std::function< void(size_t)> callback, rcl_subscription_event_type_t event_type)
Set a callback to be called when each new qos event instance occurs.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implemenation detail.
virtual RCLCPP_PUBLIC void return_message(std::shared_ptr< void > &message)=0
Return the message borrowed in create_message.
void clear_on_new_message_callback()
Unset the callback registered for new messages, if any.
virtual RCLCPP_PUBLIC std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message()=0
Borrow a new serialized message.
RCLCPP_PUBLIC bool take_serialized(rclcpp::SerializedMessage &message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message, in its serialized form, from the subscription.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
RCLCPP_PUBLIC bool exchange_in_use_by_wait_set_state(void *pointer_to_subscription_part, bool in_use_state)
Exchange state of whether or not a part of the subscription is used by a wait set.
void clear_on_new_qos_event_callback(rcl_subscription_event_type_t event_type)
Unset the callback registered for new qos events, if any.
RCLCPP_PUBLIC void set_content_filter(const std::string &filter_expression, const std::vector< std::string > &expression_parameters={})
Set the filter expression and expression parameters for the subscription.
virtual RCLCPP_PUBLIC ~SubscriptionBase()
Destructor.
virtual RCLCPP_PUBLIC void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message)=0
Return the message borrowed in create_serialized_message.
RCLCPP_PUBLIC bool is_cft_enabled() const
Check if content filtered topic feature of the subscription instance is enabled.
void clear_on_new_intra_process_message_callback()
Unset the callback registered for new intra-process messages, if any.
RCLCPP_PUBLIC rclcpp::ContentFilterOptions get_content_filter() const
Get the filter expression and expression parameters for the subscription.
RCLCPP_PUBLIC bool is_serialized() const
Return if the subscription is serialized.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this subscription is subscribed on.
RCLCPP_PUBLIC SubscriptionBase(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rcl_subscription_options_t &subscription_options, bool is_serialized=false)
Constructor.
Pure virtual interface class for the NodeBase part of the Node API.
enum rcl_subscription_event_type_e rcl_subscription_event_type_t
Enumeration of all of the subscription events that may fire.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_event_init(rcl_event_t *event, const rcl_subscription_t *subscription, const rcl_subscription_event_type_t event_type)
Initialize an rcl_event_t with a subscription.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Options available for a rcl subscription.
Options to configure content filtered topic in the subscription.