15 #ifndef RCLCPP__SUBSCRIPTION_BASE_HPP_
16 #define RCLCPP__SUBSCRIPTION_BASE_HPP_
22 #include <unordered_map>
26 #include "rcl/event_callback.h"
29 #include "rmw/impl/cpp/demangle.hpp"
32 #include "rclcpp/any_subscription_callback.hpp"
33 #include "rclcpp/detail/cpp_callback_trampoline.hpp"
34 #include "rclcpp/dynamic_typesupport/dynamic_message.hpp"
35 #include "rclcpp/dynamic_typesupport/dynamic_message_type.hpp"
36 #include "rclcpp/dynamic_typesupport/dynamic_serialization_support.hpp"
37 #include "rclcpp/experimental/intra_process_manager.hpp"
38 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
39 #include "rclcpp/macros.hpp"
40 #include "rclcpp/message_info.hpp"
41 #include "rclcpp/network_flow_endpoint.hpp"
42 #include "rclcpp/qos.hpp"
43 #include "rclcpp/event_handler.hpp"
44 #include "rclcpp/serialized_message.hpp"
45 #include "rclcpp/subscription_content_filter_options.hpp"
46 #include "rclcpp/type_support_decl.hpp"
47 #include "rclcpp/visibility_control.hpp"
52 namespace node_interfaces
54 class NodeBaseInterface;
57 namespace experimental
63 class IntraProcessManager;
83 SERIALIZED_MESSAGE = 2,
109 const rosidl_message_type_support_t & type_support_handle,
110 const std::string & topic_name,
113 bool use_default_callbacks,
132 std::shared_ptr<rcl_subscription_t>
133 get_subscription_handle();
136 std::shared_ptr<const rcl_subscription_t>
137 get_subscription_handle()
const;
143 std::unordered_map<rcl_subscription_event_type_t, std::shared_ptr<rclcpp::EventHandlerBase>> &
205 std::shared_ptr<void>
212 std::shared_ptr<rclcpp::SerializedMessage>
228 handle_serialized_message(
229 const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
235 handle_loaned_message(
void * loaned_message,
const rclcpp::MessageInfo & message_info) = 0;
252 const rosidl_message_type_support_t &
253 get_message_type_support_handle()
const;
288 using IntraProcessManagerWeakPtr =
289 std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
295 uint64_t intra_process_subscription_id,
296 IntraProcessManagerWeakPtr weak_ipm);
304 rclcpp::Waitable::SharedPtr
330 std::vector<rclcpp::NetworkFlowEndpoint>
361 throw std::invalid_argument(
362 "The callback passed to set_on_new_message_callback "
367 [callback,
this](
size_t number_of_messages) {
369 callback(number_of_messages);
370 }
catch (
const std::exception & exception) {
373 "rclcpp::SubscriptionBase@" <<
this <<
374 " caught " << rmw::impl::cpp::demangle(exception) <<
375 " exception in user-provided callback for the 'on new message' callback: " <<
380 "rclcpp::SubscriptionBase@" <<
this <<
381 " caught unhandled exception in user-provided callback " <<
382 "for the 'on new message' callback");
386 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
392 rclcpp::detail::cpp_callback_trampoline<decltype(new_callback),
const void *,
size_t>,
393 static_cast<const void *
>(&new_callback));
396 on_new_message_callback_ = new_callback;
400 rclcpp::detail::cpp_callback_trampoline<
401 decltype(on_new_message_callback_),
const void *,
size_t>,
402 static_cast<const void *
>(&on_new_message_callback_));
409 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
411 if (on_new_message_callback_) {
413 on_new_message_callback_ =
nullptr;
438 if (!use_intra_process_) {
441 "Calling set_on_new_intra_process_message_callback for subscription with IPC disabled");
446 throw std::invalid_argument(
447 "The callback passed to set_on_new_intra_process_message_callback "
454 std::function<void(
size_t,
int)> new_callback = std::bind(callback, std::placeholders::_1);
455 subscription_intra_process_->set_on_ready_callback(new_callback);
462 if (!use_intra_process_) {
465 "Calling clear_on_new_intra_process_message_callback for subscription with IPC disabled");
469 subscription_intra_process_->clear_on_ready_callback();
500 std::function<
void(
size_t)> callback,
503 if (event_handlers_.count(event_type) == 0) {
506 "Calling set_on_new_qos_event_callback for non registered subscription event_type");
511 throw std::invalid_argument(
512 "The callback passed to set_on_new_qos_event_callback "
519 std::function<void(
size_t,
int)> new_callback = std::bind(callback, std::placeholders::_1);
520 event_handlers_[event_type]->set_on_ready_callback(new_callback);
527 if (event_handlers_.count(event_type) == 0) {
530 "Calling clear_on_new_qos_event_callback for non registered event_type");
534 event_handlers_[event_type]->clear_on_ready_callback();
558 const std::string & filter_expression,
559 const std::vector<std::string> & expression_parameters = {});
575 rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr
576 get_shared_dynamic_message_type() = 0;
580 rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
581 get_shared_dynamic_message() = 0;
585 rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr
586 get_shared_dynamic_serialization_support() = 0;
592 rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
598 return_dynamic_message(rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message) = 0;
603 handle_dynamic_message(
604 const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message,
609 take_dynamic_message(
615 template<
typename EventCallbackT>
618 const EventCallbackT & callback,
621 auto handler = std::make_shared<
EventHandler<EventCallbackT,
622 std::shared_ptr<rcl_subscription_t>>>(
625 get_subscription_handle(),
627 qos_events_in_use_by_wait_set_.insert(std::make_pair(handler.get(),
false));
628 event_handlers_.insert(std::make_pair(event_type, handler));
632 void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & info)
const;
635 void default_incompatible_type_callback(IncompatibleTypeInfo & info)
const;
639 matches_any_intra_process_publishers(
const rmw_gid_t * sender_gid)
const;
647 std::shared_ptr<rcl_node_t> node_handle_;
649 std::recursive_mutex callback_mutex_;
654 std::function<void(
size_t)> on_new_message_callback_{
nullptr};
656 std::shared_ptr<rcl_subscription_t> subscription_handle_;
657 std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
661 std::shared_ptr<rclcpp::EventHandlerBase>> event_handlers_;
663 bool use_intra_process_;
664 IntraProcessManagerWeakPtr weak_ipm_;
665 uint64_t intra_process_subscription_id_;
666 std::shared_ptr<rclcpp::experimental::SubscriptionIntraProcessBase> subscription_intra_process_;
668 const SubscriptionEventCallbacks event_callbacks_;
673 rosidl_message_type_support_t type_support_;
676 std::atomic<bool> subscription_in_use_by_wait_set_{
false};
677 std::atomic<bool> intra_process_subscription_waitable_in_use_by_wait_set_{
false};
679 std::atomic<bool>> qos_events_in_use_by_wait_set_;
Additional meta data about messages taken from subscriptions.
Encapsulation of Quality of Service settings.
Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks.
virtual RCLCPP_PUBLIC std::shared_ptr< void > create_message()=0
Borrow a new message.
RCLCPP_PUBLIC size_t get_publisher_count() const
Get matching publisher count.
virtual RCLCPP_PUBLIC rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr create_dynamic_message()=0
Borrow a new dynamic message (this clones!)
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
void set_on_new_intra_process_message_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new intra-process message is received.
virtual RCLCPP_PUBLIC void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info)=0
Check if we need to handle the message, and execute the callback if we do.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if the subscription instance can loan messages.
void set_on_new_message_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new message is received.
RCLCPP_PUBLIC rclcpp::Waitable::SharedPtr get_intra_process_waitable() const
Return the waitable for intra-process.
void set_on_new_qos_event_callback(std::function< void(size_t)> callback, rcl_subscription_event_type_t event_type)
Set a callback to be called when each new QoS event instance occurs.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implementation detail.
RCLCPP_PUBLIC DeliveredMessageKind get_delivered_message_kind() const
Return the delivered message kind.
virtual RCLCPP_PUBLIC void return_message(std::shared_ptr< void > &message)=0
Return the message borrowed in create_message.
void clear_on_new_message_callback()
Unset the callback registered for new messages, if any.
virtual RCLCPP_PUBLIC std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message()=0
Borrow a new serialized message.
RCLCPP_PUBLIC bool take_serialized(rclcpp::SerializedMessage &message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message, in its serialized form, from the subscription.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
RCLCPP_PUBLIC bool exchange_in_use_by_wait_set_state(void *pointer_to_subscription_part, bool in_use_state)
Exchange state of whether or not a part of the subscription is used by a wait set.
void clear_on_new_qos_event_callback(rcl_subscription_event_type_t event_type)
Unset the callback registered for new QoS events, if any.
RCLCPP_PUBLIC void set_content_filter(const std::string &filter_expression, const std::vector< std::string > &expression_parameters={})
Set the filter expression and expression parameters for the subscription.
virtual RCLCPP_PUBLIC ~SubscriptionBase()
Destructor.
RCLCPP_PUBLIC SubscriptionBase(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rcl_subscription_options_t &subscription_options, const SubscriptionEventCallbacks &event_callbacks, bool use_default_callbacks, DeliveredMessageKind delivered_message_kind=DeliveredMessageKind::ROS_MESSAGE)
Constructor.
virtual RCLCPP_PUBLIC void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message)=0
Return the message borrowed in create_serialized_message.
RCLCPP_PUBLIC bool is_cft_enabled() const
Check if the content filtered topic feature of the subscription instance is enabled.
void clear_on_new_intra_process_message_callback()
Unset the callback registered for new intra-process messages, if any.
RCLCPP_PUBLIC void bind_event_callbacks(const SubscriptionEventCallbacks &event_callbacks, bool use_default_callbacks)
Add event handlers for passed in event_callbacks.
RCLCPP_PUBLIC const std::unordered_map< rcl_subscription_event_type_t, std::shared_ptr< rclcpp::EventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this subscription.
RCLCPP_PUBLIC rclcpp::ContentFilterOptions get_content_filter() const
Get the filter expression and expression parameters for the subscription.
RCLCPP_PUBLIC bool is_serialized() const
Return whether the subscription is serialized.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this subscription is subscribed on.
Pure virtual interface class for the NodeBase part of the Node API.
enum rcl_subscription_event_type_e rcl_subscription_event_type_t
Enumeration of all of the subscription events that may fire.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_event_init(rcl_event_t *event, const rcl_subscription_t *subscription, const rcl_subscription_event_type_t event_type)
Initialize an rcl_event_t with a subscription.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
DeliveredMessageKind
The kind of message that the subscription delivers in its callback, used by the executor.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Options available for a rcl subscription.
Options to configure content filtered topic in the subscription.
Contains callbacks for non-message events that a Subscription can receive from the middleware.