24 #include "rcl/error_handling.h"
26 #include "rcutils/env.h"
27 #include "rcutils/logging_macros.h"
28 #include "rcutils/strdup.h"
29 #include "rcutils/types/string_array.h"
30 #include "rmw/error_handling.h"
31 #include "rmw/subscription_content_filter_options.h"
32 #include "rmw/validate_full_topic_name.h"
33 #include "tracetools/tracetools.h"
36 #include "./subscription_impl.h"
43 return null_subscription;
50 const rosidl_message_type_support_t * type_support,
51 const char * topic_name,
67 RCUTILS_LOG_DEBUG_NAMED(
68 ROS_PACKAGE_NAME,
"Initializing subscription for topic name '%s'", topic_name);
69 if (subscription->
impl) {
70 RCL_SET_ERROR_MSG(
"subscription already initialized, or memory was uninitialized");
75 char * remapped_topic_name = NULL;
82 &remapped_topic_name);
91 RCUTILS_LOG_DEBUG_NAMED(
92 ROS_PACKAGE_NAME,
"Expanded and remapped topic name '%s'", remapped_topic_name);
97 RCL_CHECK_FOR_NULL_WITH_MSG(
102 subscription->
impl->rmw_handle = rmw_create_subscription(
108 if (!subscription->
impl->rmw_handle) {
109 RCL_SET_ERROR_MSG(rmw_get_error_string().str);
113 rmw_ret_t rmw_ret = rmw_subscription_get_actual_qos(
114 subscription->
impl->rmw_handle,
115 &subscription->
impl->actual_qos);
116 if (RMW_RET_OK != rmw_ret) {
117 RCL_SET_ERROR_MSG(rmw_get_error_string().str);
120 subscription->
impl->actual_qos.avoid_ros_namespace_conventions =
121 options->
qos.avoid_ros_namespace_conventions;
123 subscription->
impl->options = *options;
124 RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME,
"Subscription initialized");
128 (
const void *)subscription,
130 (
const void *)subscription->
impl->rmw_handle,
135 if (subscription->
impl) {
136 if (subscription->
impl->rmw_handle) {
137 rmw_ret_t rmw_fail_ret = rmw_destroy_subscription(
139 if (RMW_RET_OK != rmw_fail_ret) {
140 RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
141 RCUTILS_SAFE_FWRITE_TO_STDERR(
"\n");
147 RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
148 RCUTILS_SAFE_FWRITE_TO_STDERR(
"\n");
151 allocator->deallocate(subscription->
impl, allocator->state);
152 subscription->
impl = NULL;
157 allocator->deallocate(remapped_topic_name, allocator->state);
169 RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME,
"Finalizing subscription");
175 if (subscription->
impl) {
182 rmw_destroy_subscription(rmw_node, subscription->
impl->rmw_handle);
183 if (ret != RMW_RET_OK) {
184 RCL_SET_ERROR_MSG(rmw_get_error_string().str);
189 RCUTILS_SAFE_FWRITE_TO_STDERR(rcl_get_error_string().str);
190 RCUTILS_SAFE_FWRITE_TO_STDERR(
"\n");
194 allocator.deallocate(subscription->
impl, allocator.state);
195 subscription->
impl = NULL;
197 RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME,
"Subscription finalized");
207 default_options.
qos = rmw_qos_profile_default;
210 return default_options;
222 rmw_ret_t ret = rmw_subscription_content_filter_options_fini(
224 if (RCUTILS_RET_OK != ret) {
225 RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to fini content filter options.\n");
226 return rcl_convert_rmw_ret_to_rcl_ret(ret);
228 allocator->deallocate(
237 const char * filter_expression,
238 size_t expression_parameters_argc,
239 const char * expression_parameter_argv[],
243 if (expression_parameters_argc > 100) {
244 RCL_SET_ERROR_MSG(
"The maximum of expression parameters argument number is 100");
253 rmw_subscription_content_filter_options_t * original_content_filter_options =
255 rmw_subscription_content_filter_options_t content_filter_options_backup =
256 rmw_get_zero_initialized_content_filter_options();
258 if (original_content_filter_options) {
260 rmw_ret = rmw_subscription_content_filter_options_copy(
261 original_content_filter_options,
263 &content_filter_options_backup
265 if (rmw_ret != RMW_RET_OK) {
266 return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
271 sizeof(rmw_subscription_content_filter_options_t), allocator->state);
273 RCL_SET_ERROR_MSG(
"failed to allocate memory");
277 rmw_get_zero_initialized_content_filter_options();
280 rmw_ret = rmw_subscription_content_filter_options_set(
282 expression_parameters_argc,
283 expression_parameter_argv,
288 if (rmw_ret != RMW_RET_OK) {
289 ret = rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
293 rmw_ret = rmw_subscription_content_filter_options_fini(
294 &content_filter_options_backup,
297 if (rmw_ret != RMW_RET_OK) {
298 return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
305 if (original_content_filter_options == NULL) {
307 rmw_ret = rmw_subscription_content_filter_options_fini(
312 if (rmw_ret != RMW_RET_OK) {
313 return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
316 allocator->deallocate(
321 rmw_ret = rmw_subscription_content_filter_options_copy(
322 &content_filter_options_backup,
326 if (rmw_ret != RMW_RET_OK) {
327 return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
330 rmw_ret = rmw_subscription_content_filter_options_fini(
331 &content_filter_options_backup,
334 if (rmw_ret != RMW_RET_OK) {
335 return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
346 .rmw_subscription_content_filter_options =
347 rmw_get_zero_initialized_content_filter_options()
354 const char * filter_expression,
355 size_t expression_parameters_argc,
356 const char * expression_parameter_argv[],
365 if (expression_parameters_argc > 100) {
366 RCL_SET_ERROR_MSG(
"The maximum of expression parameters argument number is 100");
370 rmw_ret_t rmw_ret = rmw_subscription_content_filter_options_init(
372 expression_parameters_argc,
373 expression_parameter_argv,
375 &options->rmw_subscription_content_filter_options
378 return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
384 const char * filter_expression,
385 size_t expression_parameters_argc,
386 const char * expression_parameter_argv[],
392 if (expression_parameters_argc > 100) {
393 RCL_SET_ERROR_MSG(
"The maximum of expression parameters argument number is 100");
400 rmw_ret_t ret = rmw_subscription_content_filter_options_set(
402 expression_parameters_argc,
403 expression_parameter_argv,
405 &options->rmw_subscription_content_filter_options
407 return rcl_convert_rmw_ret_to_rcl_ret(ret);
422 rmw_ret_t ret = rmw_subscription_content_filter_options_fini(
423 &options->rmw_subscription_content_filter_options,
427 return rcl_convert_rmw_ret_to_rcl_ret(ret);
436 return subscription->
impl->rmw_handle->is_cft_enabled;
453 rmw_ret_t ret = rmw_subscription_set_content_filter(
454 subscription->
impl->rmw_handle,
455 &options->rmw_subscription_content_filter_options);
457 if (ret != RMW_RET_OK) {
458 RCL_SET_ERROR_MSG(rmw_get_error_string().str);
459 return rcl_convert_rmw_ret_to_rcl_ret(ret);
463 const rmw_subscription_content_filter_options_t * content_filter_options =
464 &options->rmw_subscription_content_filter_options;
466 content_filter_options->filter_expression,
467 content_filter_options->expression_parameters.size,
468 (
const char **)content_filter_options->expression_parameters.data,
469 &subscription->
impl->options
489 rmw_ret_t rmw_ret = rmw_subscription_get_content_filter(
490 subscription->
impl->rmw_handle,
492 &options->rmw_subscription_content_filter_options);
494 return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
501 rmw_message_info_t * message_info,
502 rmw_subscription_allocation_t * allocation
505 RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME,
"Subscription taking message");
512 rmw_message_info_t dummy_message_info;
513 rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
514 *message_info_local = rmw_get_zero_initialized_message_info();
517 rmw_ret_t ret = rmw_take_with_info(
518 subscription->
impl->rmw_handle, ros_message, &taken, message_info_local, allocation);
519 if (ret != RMW_RET_OK) {
520 RCL_SET_ERROR_MSG(rmw_get_error_string().str);
521 return rcl_convert_rmw_ret_to_rcl_ret(ret);
523 RCUTILS_LOG_DEBUG_NAMED(
524 ROS_PACKAGE_NAME,
"Subscription take succeeded: %s", taken ?
"true" :
"false");
525 TRACEPOINT(
rcl_take, (
const void *)ros_message);
536 rmw_message_sequence_t * message_sequence,
537 rmw_message_info_sequence_t * message_info_sequence,
538 rmw_subscription_allocation_t * allocation
541 RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME,
"Subscription taking %zu messages", count);
548 if (message_sequence->capacity < count) {
549 RCL_SET_ERROR_MSG(
"Insufficient message sequence capacity for requested count");
553 if (message_info_sequence->capacity < count) {
554 RCL_SET_ERROR_MSG(
"Insufficient message info sequence capacity for requested count");
559 message_sequence->size = 0u;
560 message_info_sequence->size = 0u;
563 rmw_ret_t ret = rmw_take_sequence(
564 subscription->
impl->rmw_handle, count, message_sequence, message_info_sequence, &taken,
566 if (ret != RMW_RET_OK) {
567 RCL_SET_ERROR_MSG(rmw_get_error_string().str);
568 return rcl_convert_rmw_ret_to_rcl_ret(ret);
570 RCUTILS_LOG_DEBUG_NAMED(
571 ROS_PACKAGE_NAME,
"Subscription took %zu messages", taken);
582 rmw_message_info_t * message_info,
583 rmw_subscription_allocation_t * allocation
586 RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME,
"Subscription taking serialized message");
592 rmw_message_info_t dummy_message_info;
593 rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
594 *message_info_local = rmw_get_zero_initialized_message_info();
597 rmw_ret_t ret = rmw_take_serialized_message_with_info(
598 subscription->
impl->rmw_handle, serialized_message, &taken, message_info_local, allocation);
599 if (ret != RMW_RET_OK) {
600 RCL_SET_ERROR_MSG(rmw_get_error_string().str);
601 return rcl_convert_rmw_ret_to_rcl_ret(ret);
603 RCUTILS_LOG_DEBUG_NAMED(
604 ROS_PACKAGE_NAME,
"Subscription serialized take succeeded: %s", taken ?
"true" :
"false");
614 void ** loaned_message,
615 rmw_message_info_t * message_info,
616 rmw_subscription_allocation_t * allocation)
618 RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME,
"Subscription taking loaned message");
623 if (*loaned_message) {
624 RCL_SET_ERROR_MSG(
"loaned message is already initialized");
628 rmw_message_info_t dummy_message_info;
629 rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
630 *message_info_local = rmw_get_zero_initialized_message_info();
633 rmw_ret_t ret = rmw_take_loaned_message_with_info(
634 subscription->
impl->rmw_handle, loaned_message, &taken, message_info_local, allocation);
635 if (ret != RMW_RET_OK) {
636 RCL_SET_ERROR_MSG(rmw_get_error_string().str);
637 return rcl_convert_rmw_ret_to_rcl_ret(ret);
639 RCUTILS_LOG_DEBUG_NAMED(
640 ROS_PACKAGE_NAME,
"Subscription loaned take succeeded: %s", taken ?
"true" :
"false");
650 void * loaned_message)
652 RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME,
"Subscription releasing loaned message");
657 return rcl_convert_rmw_ret_to_rcl_ret(
658 rmw_return_loaned_message_from_subscription(
659 subscription->
impl->rmw_handle, loaned_message));
668 return subscription->
impl->rmw_handle->topic_name;
671 #define _subscription_get_options(subscription) & subscription->impl->options
679 return _subscription_get_options(subscription);
688 return subscription->
impl->rmw_handle;
694 RCL_CHECK_FOR_NULL_WITH_MSG(subscription,
"subscription pointer is invalid",
return false);
695 RCL_CHECK_FOR_NULL_WITH_MSG(
696 subscription->
impl,
"subscription's implementation is invalid",
return false);
697 RCL_CHECK_FOR_NULL_WITH_MSG(
698 subscription->
impl->rmw_handle,
"subscription's rmw handle is invalid",
return false);
705 size_t * publisher_count)
714 rmw_ret_t ret = rmw_subscription_count_matched_publishers(
715 subscription->
impl->rmw_handle, publisher_count);
717 if (ret != RMW_RET_OK) {
718 RCL_SET_ERROR_MSG(rmw_get_error_string().str);
719 return rcl_convert_rmw_ret_to_rcl_ret(ret);
724 const rmw_qos_profile_t *
730 return &subscription->
impl->actual_qos;
743 bool disable_loaned_message =
true;
745 const char * env_val = NULL;
746 const char * env_error_str = rcutils_get_env(RCL_DISABLE_LOANED_MESSAGES_ENV_VAR, &env_val);
747 if (NULL != env_error_str) {
748 RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to get disable_loaned_message: ");
749 RCUTILS_SAFE_FWRITE_TO_STDERR_WITH_FORMAT_STRING(
750 "Error getting env var: '" RCUTILS_STRINGIFY(RCL_DISABLE_LOANED_MESSAGES_ENV_VAR)
"': %s\n",
754 disable_loaned_message = !(strcmp(env_val,
"0") == 0);
757 if (disable_loaned_message) {
761 return subscription->
impl->rmw_handle->can_loan_messages;
767 rcl_event_callback_t callback,
768 const void * user_data)
775 return rmw_subscription_set_on_new_message_callback(
776 subscription->
impl->rmw_handle,
#define rcl_get_default_allocator
Return a properly initialized rcl_allocator_t with default values.
#define RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, msg, fail_statement)
Check that the given allocator is initialized, or fail with a message.
rcutils_allocator_t rcl_allocator_t
Encapsulation of an allocator.
RCL_PUBLIC bool rcl_node_is_valid(const rcl_node_t *node)
Return true if the node is valid, else false.
RCL_PUBLIC RCL_WARN_UNUSED rmw_node_t * rcl_node_get_rmw_handle(const rcl_node_t *node)
Return the rmw node handle.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_node_resolve_name(const rcl_node_t *node, const char *input_name, rcl_allocator_t allocator, bool is_service, bool only_expand, char **output_name)
Expand a given name into a fully-qualified topic name and apply remapping rules.
RCL_PUBLIC bool rcl_node_is_valid_except_context(const rcl_node_t *node)
Return true if node is valid, except for the context being valid.
Structure which encapsulates a ROS Node.
Options available for a rcl subscription.
rcl_allocator_t allocator
Custom allocator for the subscription, used for incidental allocations.
rmw_qos_profile_t qos
Middleware quality of service settings for the subscription.
rmw_subscription_options_t rmw_subscription_options
rmw specific subscription options, e.g. the rmw implementation specific payload.
Structure which encapsulates a ROS Subscription.
rcl_subscription_impl_t * impl
Pointer to the subscription implementation.
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_options_t rcl_subscription_get_default_options(void)
Return the default subscription options in a rcl_subscription_options_t.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_fini(const rcl_subscription_t *subscription, rcl_subscription_content_filter_options_t *options)
Reclaim rcl_subscription_content_filter_options_t structure.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_options_fini(rcl_subscription_options_t *option)
Reclaim resources held inside rcl_subscription_options_t structure.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_init(rcl_subscription_t *subscription, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_subscription_options_t *options)
Initialize a ROS subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_sequence(const rcl_subscription_t *subscription, size_t count, rmw_message_sequence_t *message_sequence, rmw_message_info_sequence_t *message_info_sequence, rmw_subscription_allocation_t *allocation)
Take a sequence of messages from a topic using a rcl subscription.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_subscription_get_topic_name(const rcl_subscription_t *subscription)
Get the topic name for the subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_set_on_new_message_callback(const rcl_subscription_t *subscription, rcl_event_callback_t callback, const void *user_data)
Set the on new message callback function for the subscription.
RCL_PUBLIC bool rcl_subscription_can_loan_messages(const rcl_subscription_t *subscription)
Check if subscription instance can loan messages.
RCL_PUBLIC RCL_WARN_UNUSED rmw_subscription_t * rcl_subscription_get_rmw_handle(const rcl_subscription_t *subscription)
Return the rmw subscription handle.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_fini(rcl_subscription_t *subscription, rcl_node_t *node)
Finalize a rcl_subscription_t.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take(const rcl_subscription_t *subscription, void *ros_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a ROS message from a topic using a rcl subscription.
RCL_PUBLIC RCL_WARN_UNUSED rmw_ret_t rcl_subscription_get_publisher_count(const rcl_subscription_t *subscription, size_t *publisher_count)
Get the number of publishers matched to a subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_options_set_content_filter_options(const char *filter_expression, size_t expression_parameters_argc, const char *expression_parameter_argv[], rcl_subscription_options_t *options)
Set the content filter options for the given subscription options.
RCL_PUBLIC bool rcl_subscription_is_valid(const rcl_subscription_t *subscription)
Check that the subscription is valid.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_serialized_message(const rcl_subscription_t *subscription, rcl_serialized_message_t *serialized_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a serialized raw message from a topic using a rcl subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_loaned_message(const rcl_subscription_t *subscription, void **loaned_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a loaned message from a topic using a rcl subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_set(const rcl_subscription_t *subscription, const char *filter_expression, size_t expression_parameters_argc, const char *expression_parameter_argv[], rcl_subscription_content_filter_options_t *options)
Set the content filter options for the given subscription options.
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_content_filter_options_t rcl_get_zero_initialized_subscription_content_filter_options(void)
Return the zero initialized subscription content filter options.
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_subscription_is_cft_enabled(const rcl_subscription_t *subscription)
Check if the content filtered topic feature is enabled in the subscription.
RCL_PUBLIC RCL_WARN_UNUSED const rmw_qos_profile_t * rcl_subscription_get_actual_qos(const rcl_subscription_t *subscription)
Get the actual qos settings of the subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_t rcl_get_zero_initialized_subscription(void)
Return a rcl_subscription_t struct with members set to NULL.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_get_content_filter(const rcl_subscription_t *subscription, rcl_subscription_content_filter_options_t *options)
Retrieve the filter expression of the subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_set_content_filter(const rcl_subscription_t *subscription, const rcl_subscription_content_filter_options_t *options)
Set the filter expression and expression parameters for the subscription.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_init(const rcl_subscription_t *subscription, const char *filter_expression, size_t expression_parameters_argc, const char *expression_parameter_argv[], rcl_subscription_content_filter_options_t *options)
Initialize the content filter options for the given subscription options.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_return_loaned_message_from_subscription(const rcl_subscription_t *subscription, void *loaned_message)
Return a loaned message from a topic using a rcl subscription.
RCL_PUBLIC RCL_WARN_UNUSED const rcl_subscription_options_t * rcl_subscription_get_options(const rcl_subscription_t *subscription)
Return the rcl subscription options.
#define RCL_RET_UNKNOWN_SUBSTITUTION
Topic name substitution is unknown.
#define RCL_RET_ALREADY_INIT
rcl_init() already called return code.
#define RCL_RET_SUBSCRIPTION_TAKE_FAILED
Failed to take a message from the subscription return code.
#define RCL_RET_OK
Success return code.
#define RCL_RET_BAD_ALLOC
Failed to allocate memory return code.
#define RCL_RET_INVALID_ARGUMENT
Invalid argument return code.
#define RCL_RET_ERROR
Unspecified error return code.
rmw_serialized_message_t rcl_serialized_message_t
typedef for rmw_serialized_message_t;
#define RCL_RET_NODE_INVALID
Invalid rcl_node_t given return code.
#define RCL_RET_TOPIC_NAME_INVALID
Topic name does not pass validation.
#define RCL_RET_SUBSCRIPTION_INVALID
Invalid rcl_subscription_t given return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.