15 #include "rclcpp/subscription_base.hpp"
20 #include <unordered_map>
23 #include "rcpputils/scope_exit.hpp"
25 #include "rclcpp/dynamic_typesupport/dynamic_message.hpp"
26 #include "rclcpp/exceptions.hpp"
27 #include "rclcpp/expand_topic_or_service_name.hpp"
28 #include "rclcpp/experimental/intra_process_manager.hpp"
29 #include "rclcpp/logging.hpp"
30 #include "rclcpp/node_interfaces/node_base_interface.hpp"
31 #include "rclcpp/event_handler.hpp"
33 #include "rmw/error_handling.h"
36 #include "rosidl_dynamic_typesupport/types.h"
40 SubscriptionBase::SubscriptionBase(
42 const rosidl_message_type_support_t & type_support_handle,
43 const std::string & topic_name,
46 bool use_default_callbacks,
48 : node_base_(node_base),
49 node_handle_(node_base_->get_shared_rcl_node_handle()),
51 use_intra_process_(false),
52 intra_process_subscription_id_(0),
53 event_callbacks_(event_callbacks),
54 type_support_(type_support_handle),
55 delivered_message_kind_(delivered_message_kind)
57 auto custom_deletor = [node_handle = this->node_handle_](
rcl_subscription_t * rcl_subs)
62 "Error in destruction of rcl subscription handle: %s",
63 rcl_get_error_string().str);
69 subscription_handle_ = std::shared_ptr<rcl_subscription_t>(
74 subscription_handle_.get(),
78 &subscription_options);
81 auto rcl_node_handle = node_handle_.get();
89 rclcpp::exceptions::throw_from_rcl_error(ret,
"could not create subscription");
97 if (!use_intra_process_) {
100 auto ipm = weak_ipm_.lock();
105 "Intra process manager died before than a subscription.");
108 ipm->remove_subscription(intra_process_subscription_id_);
116 if (event_callbacks.deadline_callback) {
117 this->add_event_handler(
118 event_callbacks.deadline_callback,
119 RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED);
124 "Failed to add event handler for deadline; not supported");
128 if (event_callbacks.liveliness_callback) {
129 this->add_event_handler(
130 event_callbacks.liveliness_callback,
131 RCL_SUBSCRIPTION_LIVELINESS_CHANGED);
136 "Failed to add event handler for liveliness; not supported");
139 QOSRequestedIncompatibleQoSCallbackType incompatible_qos_cb;
140 if (event_callbacks.incompatible_qos_callback) {
141 incompatible_qos_cb = event_callbacks.incompatible_qos_callback;
142 }
else if (use_default_callbacks) {
144 incompatible_qos_cb = [
this](QOSRequestedIncompatibleQoSInfo & info) {
145 this->default_incompatible_qos_callback(info);
150 if (incompatible_qos_cb) {
151 this->add_event_handler(incompatible_qos_cb, RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
156 "Failed to add event handler for incompatible qos; not supported");
159 IncompatibleTypeCallbackType incompatible_type_cb;
160 if (event_callbacks.incompatible_type_callback) {
161 incompatible_type_cb = event_callbacks.incompatible_type_callback;
162 }
else if (use_default_callbacks) {
164 incompatible_type_cb = [
this](IncompatibleTypeInfo & info) {
165 this->default_incompatible_type_callback(info);
169 if (incompatible_type_cb) {
170 this->add_event_handler(incompatible_type_cb, RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE);
175 "Failed to add event handler for incompatible type; not supported");
179 if (event_callbacks.message_lost_callback) {
180 this->add_event_handler(
181 event_callbacks.message_lost_callback,
182 RCL_SUBSCRIPTION_MESSAGE_LOST);
187 "Failed to add event handler for message lost; not supported");
191 if (event_callbacks.matched_callback) {
192 this->add_event_handler(
193 event_callbacks.matched_callback,
194 RCL_SUBSCRIPTION_MATCHED);
199 "Failed to add event handler for matched; not supported");
209 std::shared_ptr<rcl_subscription_t>
210 SubscriptionBase::get_subscription_handle()
212 return subscription_handle_;
215 std::shared_ptr<const rcl_subscription_t>
216 SubscriptionBase::get_subscription_handle()
const
218 return subscription_handle_;
222 std::unordered_map<rcl_subscription_event_type_t, std::shared_ptr<rclcpp::EventHandlerBase>> &
225 return event_handlers_;
233 auto msg = std::string(
"failed to get qos settings: ") + rcl_get_error_string().str;
235 throw std::runtime_error(msg);
245 this->get_subscription_handle().get(),
250 TRACETOOLS_TRACEPOINT(rclcpp_take,
static_cast<const void *
>(message_out));
254 rclcpp::exceptions::throw_from_rcl_error(ret);
272 this->get_subscription_handle().get(),
276 TRACETOOLS_TRACEPOINT(
282 rclcpp::exceptions::throw_from_rcl_error(ret);
287 const rosidl_message_type_support_t &
288 SubscriptionBase::get_message_type_support_handle()
const
290 return type_support_;
296 return delivered_message_kind_ == rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE;
302 return delivered_message_kind_;
308 size_t inter_process_publisher_count = 0;
311 subscription_handle_.get(),
312 &inter_process_publisher_count);
315 rclcpp::exceptions::throw_from_rcl_error(status,
"failed to get get publisher count");
317 return inter_process_publisher_count;
322 uint64_t intra_process_subscription_id,
323 IntraProcessManagerWeakPtr weak_ipm)
325 intra_process_subscription_id_ = intra_process_subscription_id;
326 weak_ipm_ = weak_ipm;
327 use_intra_process_ =
true;
342 "Loaned messages are only safe with const ref subscription callbacks. "
343 "If you are using any other kind of subscriptions, "
344 "set the ROS_DISABLE_LOANED_MESSAGES environment variable to 1 (the default).");
349 rclcpp::Waitable::SharedPtr
353 if (!use_intra_process_) {
357 auto ipm = weak_ipm_.lock();
359 throw std::runtime_error(
360 "SubscriptionBase::get_intra_process_waitable() called "
361 "after destruction of intra process manager");
365 return ipm->get_subscription_intra_process(intra_process_subscription_id_);
369 SubscriptionBase::default_incompatible_qos_callback(
370 rclcpp::QOSRequestedIncompatibleQoSInfo & event)
const
372 std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
375 "New publisher discovered on topic '%s', offering incompatible QoS. "
376 "No messages will be sent to it. "
377 "Last incompatible policy: %s",
379 policy_name.c_str());
383 SubscriptionBase::default_incompatible_type_callback(
384 [[maybe_unused]] rclcpp::IncompatibleTypeInfo & event)
const
388 "Incompatible type on topic '%s', no messages will be sent to it.",
get_topic_name());
392 SubscriptionBase::matches_any_intra_process_publishers(
const rmw_gid_t * sender_gid)
const
394 if (!use_intra_process_) {
397 auto ipm = weak_ipm_.lock();
399 throw std::runtime_error(
400 "intra process publisher check called "
401 "after destruction of intra process manager");
403 return ipm->matches_any_publishers(sender_gid);
408 void * pointer_to_subscription_part,
411 if (
nullptr == pointer_to_subscription_part) {
412 throw std::invalid_argument(
"pointer_to_subscription_part is unexpectedly nullptr");
414 if (
this == pointer_to_subscription_part) {
415 return subscription_in_use_by_wait_set_.exchange(in_use_state);
418 return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state);
420 for (
const auto & key_event_pair : event_handlers_) {
421 auto qos_event = key_event_pair.second;
422 if (qos_event.get() == pointer_to_subscription_part) {
423 return qos_events_in_use_by_wait_set_[qos_event.get()].exchange(in_use_state);
426 throw std::runtime_error(
"given pointer_to_subscription_part does not match any part");
429 std::vector<rclcpp::NetworkFlowEndpoint>
432 rcutils_allocator_t allocator = rcutils_get_default_allocator();
433 rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
434 rcl_get_zero_initialized_network_flow_endpoint_array();
435 rcl_ret_t ret = rcl_subscription_get_network_flow_endpoints(
436 subscription_handle_.get(), &allocator, &network_flow_endpoint_array);
438 auto error_msg = std::string(
"Error obtaining network flows of subscription: ") +
439 rcl_get_error_string().str;
442 rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array))
444 error_msg += std::string(
". Also error cleaning up network flow array: ") +
445 rcl_get_error_string().str;
448 rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
451 std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
452 for (
size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
453 network_flow_endpoint_vector.push_back(
455 network_flow_endpoint_array.
456 network_flow_endpoint[i]));
459 ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array);
461 rclcpp::exceptions::throw_from_rcl_error(ret,
"error cleaning up network flow array");
464 return network_flow_endpoint_vector;
469 rcl_event_callback_t callback,
470 const void * user_data)
473 subscription_handle_.get(),
478 using rclcpp::exceptions::throw_from_rcl_error;
479 throw_from_rcl_error(ret,
"failed to set the on new message callback for subscription");
491 const std::string & filter_expression,
492 const std::vector<std::string> & expression_parameters)
499 subscription_handle_.get(),
505 rclcpp::exceptions::throw_from_rcl_error(
506 ret,
"failed to init subscription content_filtered_topic option");
508 RCPPUTILS_SCOPE_EXIT(
511 subscription_handle_.get(), &options);
515 "Failed to fini subscription content_filtered_topic option: %s",
516 rcl_get_error_string().str);
522 subscription_handle_.get(),
526 rclcpp::exceptions::throw_from_rcl_error(ret,
"failed to set cft expression parameters");
538 subscription_handle_.get(),
542 rclcpp::exceptions::throw_from_rcl_error(ret,
"failed to get cft expression parameters");
545 RCPPUTILS_SCOPE_EXIT(
548 subscription_handle_.get(), &options);
552 "Failed to fini subscription content_filtered_topic option: %s",
553 rcl_get_error_string().str);
558 rmw_subscription_content_filter_options_t & content_filter_options =
559 options.rmw_subscription_content_filter_options;
562 for (
size_t i = 0; i < content_filter_options.expression_parameters.size; ++i) {
564 content_filter_options.expression_parameters.data[i]);
573 SubscriptionBase::take_dynamic_message(
577 throw std::runtime_error(
"Unimplemented");
RCLCPP_PUBLIC Logger get_child(const std::string &suffix)
Return a logger that is a descendant of this logger.
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Encapsulation of Quality of Service settings.
Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks.
rcl_serialized_message_t & get_rcl_serialized_message()
Get the underlying rcl_serialized_message_t handle.
RCLCPP_PUBLIC size_t get_publisher_count() const
Get matching publisher count.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if subscription instance can loan messages.
void set_on_new_message_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new message is received.
RCLCPP_PUBLIC rclcpp::Waitable::SharedPtr get_intra_process_waitable() const
Return the waitable for intra-process.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implementation detail.
RCLCPP_PUBLIC DeliveredMessageKind get_delivered_message_kind() const
Return the delivered message kind.
RCLCPP_PUBLIC bool take_serialized(rclcpp::SerializedMessage &message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message, in its serialized form, from the subscription.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
RCLCPP_PUBLIC bool exchange_in_use_by_wait_set_state(void *pointer_to_subscription_part, bool in_use_state)
Exchange state of whether or not a part of the subscription is used by a wait set.
RCLCPP_PUBLIC void set_content_filter(const std::string &filter_expression, const std::vector< std::string > &expression_parameters={})
Set the filter expression and expression parameters for the subscription.
virtual RCLCPP_PUBLIC ~SubscriptionBase()
Destructor.
RCLCPP_PUBLIC bool is_cft_enabled() const
Check if content filtered topic feature of the subscription instance is enabled.
RCLCPP_PUBLIC void bind_event_callbacks(const SubscriptionEventCallbacks &event_callbacks, bool use_default_callbacks)
Add event handlers for passed in event_callbacks.
RCLCPP_PUBLIC const std::unordered_map< rcl_subscription_event_type_t, std::shared_ptr< rclcpp::EventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this subscription.
RCLCPP_PUBLIC rclcpp::ContentFilterOptions get_content_filter() const
Get the filter expression and expression parameters for the subscription.
RCLCPP_PUBLIC bool is_serialized() const
Return if the subscription is serialized.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this subscription is subscribed on.
Pure virtual interface class for the NodeBase part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
RCLCPP_PUBLIC Logger get_node_logger(const rcl_node_t *node)
Return a named logger using an rcl_node_t.
DeliveredMessageKind
The kind of message that the subscription delivers in its callback, used by the executor.
RCLCPP_PUBLIC std::vector< const char * > get_c_vector_string(const std::vector< std::string > &strings_in)
Return a std::vector of C strings from the given std::vector<std::string>.
RCLCPP_PUBLIC const char * get_c_string(const char *string_in)
Return the given string.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_name(const rcl_node_t *node)
Return the name of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_namespace(const rcl_node_t *node)
Return the namespace of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_logger_name(const rcl_node_t *node)
Return the logger name of the node.
Options available for a rcl subscription.
Structure which encapsulates a ROS Subscription.
Options to configure content filtered topic in the subscription.
std::string filter_expression
Filter expression is similar to the WHERE part of an SQL clause.
std::vector< std::string > expression_parameters
static QoSInitialization from_rmw(const rmw_qos_profile_t &rmw_qos)
Create a QoSInitialization from an existing rmw_qos_profile_t, using its history and depth.
Contains callbacks for non-message events that a Subscription can receive from the middleware.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_fini(const rcl_subscription_t *subscription, rcl_subscription_content_filter_options_t *options)
Reclaim rcl_subscription_content_filter_options_t structure.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_init(rcl_subscription_t *subscription, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_subscription_options_t *options)
Initialize a ROS subscription.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_subscription_get_topic_name(const rcl_subscription_t *subscription)
Get the topic name for the subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_set_on_new_message_callback(const rcl_subscription_t *subscription, rcl_event_callback_t callback, const void *user_data)
Set the on new message callback function for the subscription.
RCL_PUBLIC bool rcl_subscription_can_loan_messages(const rcl_subscription_t *subscription)
Check if subscription instance can loan messages.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_fini(rcl_subscription_t *subscription, rcl_node_t *node)
Finalize a rcl_subscription_t.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take(const rcl_subscription_t *subscription, void *ros_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a ROS message from a topic using a rcl subscription.
RCL_PUBLIC RCL_WARN_UNUSED rmw_ret_t rcl_subscription_get_publisher_count(const rcl_subscription_t *subscription, size_t *publisher_count)
Get the number of publishers matched to a subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_serialized_message(const rcl_subscription_t *subscription, rcl_serialized_message_t *serialized_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a serialized raw message from a topic using a rcl subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_content_filter_options_t rcl_get_zero_initialized_subscription_content_filter_options(void)
Return the zero initialized subscription content filter options.
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_subscription_is_cft_enabled(const rcl_subscription_t *subscription)
Check if the content filtered topic feature is enabled in the subscription.
RCL_PUBLIC RCL_WARN_UNUSED const rmw_qos_profile_t * rcl_subscription_get_actual_qos(const rcl_subscription_t *subscription)
Get the actual qos settings of the subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_t rcl_get_zero_initialized_subscription(void)
Return a rcl_subscription_t struct with members set to NULL.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_get_content_filter(const rcl_subscription_t *subscription, rcl_subscription_content_filter_options_t *options)
Retrieve the filter expression of the subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_set_content_filter(const rcl_subscription_t *subscription, const rcl_subscription_content_filter_options_t *options)
Set the filter expression and expression parameters for the subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_init(const rcl_subscription_t *subscription, const char *filter_expression, size_t expression_parameters_argc, const char *expression_parameter_argv[], rcl_subscription_content_filter_options_t *options)
Initialize the content filter options for the given subscription options.
#define RCL_RET_SUBSCRIPTION_TAKE_FAILED
Failed to take a message from the subscription return code.
#define RCL_RET_OK
Success return code.
#define RCL_RET_TOPIC_NAME_INVALID
Topic name does not pass validation.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.