ROS 2 rclcpp + rcl - rolling  rolling-a919a6e5
ROS 2 C++ Client Library with ROS Client Library
subscription.c
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifdef __cplusplus
16 extern "C"
17 {
18 #endif
19 
20 #include "rcl/subscription.h"
21 
22 #include <stdio.h>
23 
24 #include "rcl/error_handling.h"
25 #include "rcl/node.h"
26 #include "rcl/node_type_cache.h"
27 #include "rcutils/env.h"
28 #include "rcutils/logging_macros.h"
29 #include "rcutils/strdup.h"
30 #include "rcutils/types/string_array.h"
31 #include "rmw/error_handling.h"
32 #include "rmw/dynamic_message_type_support.h"
33 #include "rmw/subscription_content_filter_options.h"
34 #include "rmw/validate_full_topic_name.h"
35 #include "rosidl_dynamic_typesupport/identifier.h"
36 #include "tracetools/tracetools.h"
37 
38 #include "./common.h"
39 #include "./subscription_impl.h"
40 
41 
44 {
45  // All members are initialized to 0 or NULL by C99 6.7.8/10.
46  static rcl_subscription_t null_subscription;
47  return null_subscription;
48 }
49 
52  rcl_subscription_t * subscription,
53  const rcl_node_t * node,
54  const rosidl_message_type_support_t * type_support,
55  const char * topic_name,
56  const rcl_subscription_options_t * options
57 )
58 {
59  rcl_ret_t fail_ret = RCL_RET_ERROR;
60 
61  // Check options and allocator first, so the allocator can be used in errors.
62  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
63  rcl_allocator_t * allocator = (rcl_allocator_t *)&options->allocator;
64  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
65  RCL_CHECK_ARGUMENT_FOR_NULL(subscription, RCL_RET_INVALID_ARGUMENT);
66  if (!rcl_node_is_valid(node)) {
67  return RCL_RET_NODE_INVALID; // error already set
68  }
69  RCL_CHECK_ARGUMENT_FOR_NULL(type_support, RCL_RET_INVALID_ARGUMENT);
70  RCL_CHECK_ARGUMENT_FOR_NULL(topic_name, RCL_RET_INVALID_ARGUMENT);
71  RCUTILS_LOG_DEBUG_NAMED(
72  ROS_PACKAGE_NAME, "Initializing subscription for topic name '%s'", topic_name);
73  if (subscription->impl) {
74  RCL_SET_ERROR_MSG("subscription already initialized, or memory was uninitialized");
75  return RCL_RET_ALREADY_INIT;
76  }
77 
78  // Expand and remap the given topic name.
79  char * remapped_topic_name = NULL;
81  node,
82  topic_name,
83  *allocator,
84  false,
85  false,
86  &remapped_topic_name);
87  if (ret != RCL_RET_OK) {
90  } else if (ret != RCL_RET_BAD_ALLOC) {
91  ret = RCL_RET_ERROR;
92  }
93  goto cleanup;
94  }
95  RCUTILS_LOG_DEBUG_NAMED(
96  ROS_PACKAGE_NAME, "Expanded and remapped topic name '%s'", remapped_topic_name);
97 
98  // Allocate memory for the implementation struct.
99  subscription->impl = (rcl_subscription_impl_t *)allocator->zero_allocate(
100  1, sizeof(rcl_subscription_impl_t), allocator->state);
101  RCL_CHECK_FOR_NULL_WITH_MSG(
102  subscription->impl, "allocating memory failed", ret = RCL_RET_BAD_ALLOC; goto cleanup);
103 
104  // options
105  subscription->impl->options = *options;
106 
107  // Fill out the implemenation struct.
108  // TODO(wjwwood): pass allocator once supported in rmw api.
109  subscription->impl->rmw_handle = rmw_create_subscription(
111  type_support,
112  remapped_topic_name,
113  &(options->qos),
114  &(options->rmw_subscription_options));
115  if (!subscription->impl->rmw_handle) {
116  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
117  goto fail;
118  }
119  // get actual qos, and store it
120  rmw_ret_t rmw_ret = rmw_subscription_get_actual_qos(
121  subscription->impl->rmw_handle,
122  &subscription->impl->actual_qos);
123  if (RMW_RET_OK != rmw_ret) {
124  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
125  goto fail;
126  }
127  subscription->impl->actual_qos.avoid_ros_namespace_conventions =
128  options->qos.avoid_ros_namespace_conventions;
129 
130  if (RCL_RET_OK != rcl_node_type_cache_register_type(
131  node, type_support->get_type_hash_func(type_support),
132  type_support->get_type_description_func(type_support),
133  type_support->get_type_description_sources_func(type_support)))
134  {
135  rcutils_reset_error();
136  RCL_SET_ERROR_MSG("Failed to register type for subscription");
137  goto fail;
138  }
139  subscription->impl->type_hash = *type_support->get_type_hash_func(type_support);
140 
141  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription initialized");
142  ret = RCL_RET_OK;
143  TRACETOOLS_TRACEPOINT(
145  (const void *)subscription,
146  (const void *)node,
147  (const void *)subscription->impl->rmw_handle,
148  remapped_topic_name,
149  options->qos.depth);
150 
151  goto cleanup;
152 fail:
153  if (subscription->impl) {
154  if (subscription->impl->rmw_handle) {
155  rmw_ret_t rmw_fail_ret = rmw_destroy_subscription(
156  rcl_node_get_rmw_handle(node), subscription->impl->rmw_handle);
157  if (RMW_RET_OK != rmw_fail_ret) {
158  RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
159  RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
160  }
161  }
162 
163  ret = rcl_subscription_options_fini(&subscription->impl->options);
164  if (RCL_RET_OK != ret) {
165  RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
166  RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
167  }
168 
169  allocator->deallocate(subscription->impl, allocator->state);
170  subscription->impl = NULL;
171  }
172  ret = fail_ret;
173  // Fall through to cleanup
174 cleanup:
175  allocator->deallocate(remapped_topic_name, allocator->state);
176  return ret;
177 }
178 
179 rcl_ret_t
181 {
182  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
183  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_NODE_INVALID);
184  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);
185  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_ERROR);
186 
187  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Finalizing subscription");
188  rcl_ret_t result = RCL_RET_OK;
189  RCL_CHECK_ARGUMENT_FOR_NULL(subscription, RCL_RET_SUBSCRIPTION_INVALID);
191  return RCL_RET_NODE_INVALID; // error already set
192  }
193  if (subscription->impl) {
194  rcl_allocator_t allocator = subscription->impl->options.allocator;
195  rmw_node_t * rmw_node = rcl_node_get_rmw_handle(node);
196  if (!rmw_node) {
198  }
199  rmw_ret_t ret =
200  rmw_destroy_subscription(rmw_node, subscription->impl->rmw_handle);
201  if (ret != RMW_RET_OK) {
202  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
203  result = RCL_RET_ERROR;
204  }
205  rcl_ret_t rcl_ret = rcl_subscription_options_fini(&subscription->impl->options);
206  if (RCL_RET_OK != rcl_ret) {
207  RCUTILS_SAFE_FWRITE_TO_STDERR(rcl_get_error_string().str);
208  RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
209  result = RCL_RET_ERROR;
210  }
211 
212  if (
213  ROSIDL_TYPE_HASH_VERSION_UNSET != subscription->impl->type_hash.version &&
214  RCL_RET_OK != rcl_node_type_cache_unregister_type(node, &subscription->impl->type_hash))
215  {
216  RCUTILS_SAFE_FWRITE_TO_STDERR(rcl_get_error_string().str);
217  RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
218  result = RCL_RET_ERROR;
219  }
220 
221  allocator.deallocate(subscription->impl, allocator.state);
222  subscription->impl = NULL;
223  }
224  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription finalized");
225  return result;
226 }
227 
230 {
231  // !!! MAKE SURE THAT CHANGES TO THESE DEFAULTS ARE REFLECTED IN THE HEADER DOC STRING
232  rcl_subscription_options_t default_options;
233  // Must set these after declaration because they are not a compile time constants.
234  default_options.qos = rmw_qos_profile_default;
235  default_options.allocator = rcl_get_default_allocator();
236  default_options.rmw_subscription_options = rmw_get_default_subscription_options();
237 
238  // Load disable flag to LoanedMessage via environmental variable.
239  // TODO(clalancette): This is kind of a copy of rcl_get_disable_loaned_message(), but we need
240  // more information than that function provides.
241  default_options.disable_loaned_message = true;
242 
243  const char * env_val = NULL;
244  const char * env_error_str = rcutils_get_env(RCL_DISABLE_LOANED_MESSAGES_ENV_VAR, &env_val);
245  if (NULL != env_error_str) {
246  RCUTILS_SAFE_FWRITE_TO_STDERR("Failed to get disable_loaned_message: ");
247  RCUTILS_SAFE_FWRITE_TO_STDERR_WITH_FORMAT_STRING(
248  "Error getting env var: '" RCUTILS_STRINGIFY(RCL_DISABLE_LOANED_MESSAGES_ENV_VAR) "': %s\n",
249  env_error_str);
250  } else {
251  default_options.disable_loaned_message = !(strcmp(env_val, "0") == 0);
252  }
253 
254  return default_options;
255 }
256 
257 rcl_ret_t
259 {
260  RCL_CHECK_ARGUMENT_FOR_NULL(option, RCL_RET_INVALID_ARGUMENT);
261  // fini rmw_subscription_options_t
262  const rcl_allocator_t * allocator = &option->allocator;
263  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
264 
265  if (option->rmw_subscription_options.content_filter_options) {
266  rmw_ret_t ret = rmw_subscription_content_filter_options_fini(
267  option->rmw_subscription_options.content_filter_options, allocator);
268  if (RCUTILS_RET_OK != ret) {
269  RCUTILS_SAFE_FWRITE_TO_STDERR("Failed to fini content filter options.\n");
270  return rcl_convert_rmw_ret_to_rcl_ret(ret);
271  }
272  allocator->deallocate(
273  option->rmw_subscription_options.content_filter_options, allocator->state);
274  option->rmw_subscription_options.content_filter_options = NULL;
275  }
276  return RCL_RET_OK;
277 }
278 
279 rcl_ret_t
281  const char * filter_expression,
282  size_t expression_parameters_argc,
283  const char * expression_parameter_argv[],
284  rcl_subscription_options_t * options)
285 {
286  RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT);
287  if (expression_parameters_argc > 100) {
288  RCL_SET_ERROR_MSG("The maximum of expression parameters argument number is 100");
290  }
291  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
292  const rcl_allocator_t * allocator = &options->allocator;
293  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
294 
295  rcl_ret_t ret;
296  rmw_ret_t rmw_ret;
297  rmw_subscription_content_filter_options_t * original_content_filter_options =
298  options->rmw_subscription_options.content_filter_options;
299  rmw_subscription_content_filter_options_t content_filter_options_backup =
300  rmw_get_zero_initialized_content_filter_options();
301 
302  if (original_content_filter_options) {
303  // make a backup, restore the data if failure happened
304  rmw_ret = rmw_subscription_content_filter_options_copy(
305  original_content_filter_options,
306  allocator,
307  &content_filter_options_backup
308  );
309  if (rmw_ret != RMW_RET_OK) {
310  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
311  }
312  } else {
313  options->rmw_subscription_options.content_filter_options =
314  allocator->allocate(
315  sizeof(rmw_subscription_content_filter_options_t), allocator->state);
316  if (!options->rmw_subscription_options.content_filter_options) {
317  RCL_SET_ERROR_MSG("failed to allocate memory");
318  return RCL_RET_BAD_ALLOC;
319  }
320  *options->rmw_subscription_options.content_filter_options =
321  rmw_get_zero_initialized_content_filter_options();
322  }
323 
324  rmw_ret = rmw_subscription_content_filter_options_set(
325  filter_expression,
326  expression_parameters_argc,
327  expression_parameter_argv,
328  allocator,
329  options->rmw_subscription_options.content_filter_options
330  );
331 
332  if (rmw_ret != RMW_RET_OK) {
333  ret = rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
334  goto failed;
335  }
336 
337  rmw_ret = rmw_subscription_content_filter_options_fini(
338  &content_filter_options_backup,
339  allocator
340  );
341  if (rmw_ret != RMW_RET_OK) {
342  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
343  }
344 
345  return RMW_RET_OK;
346 
347 failed:
348 
349  if (original_content_filter_options == NULL) {
350  if (options->rmw_subscription_options.content_filter_options) {
351  rmw_ret = rmw_subscription_content_filter_options_fini(
352  options->rmw_subscription_options.content_filter_options,
353  allocator
354  );
355 
356  if (rmw_ret != RMW_RET_OK) {
357  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
358  }
359 
360  allocator->deallocate(
361  options->rmw_subscription_options.content_filter_options, allocator->state);
362  options->rmw_subscription_options.content_filter_options = NULL;
363  }
364  } else {
365  rmw_ret = rmw_subscription_content_filter_options_copy(
366  &content_filter_options_backup,
367  allocator,
368  options->rmw_subscription_options.content_filter_options
369  );
370  if (rmw_ret != RMW_RET_OK) {
371  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
372  }
373 
374  rmw_ret = rmw_subscription_content_filter_options_fini(
375  &content_filter_options_backup,
376  allocator
377  );
378  if (rmw_ret != RMW_RET_OK) {
379  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
380  }
381  }
382 
383  return ret;
384 }
385 
388 {
390  .rmw_subscription_content_filter_options =
391  rmw_get_zero_initialized_content_filter_options()
392  }; // NOLINT(readability/braces): false positive
393 }
394 
395 rcl_ret_t
397  const rcl_subscription_t * subscription,
398  const char * filter_expression,
399  size_t expression_parameters_argc,
400  const char * expression_parameter_argv[],
402 {
403  if (!rcl_subscription_is_valid(subscription)) {
405  }
406  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
407  const rcl_allocator_t * allocator = &subscription->impl->options.allocator;
408  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
409  if (expression_parameters_argc > 100) {
410  RCL_SET_ERROR_MSG("The maximum of expression parameters argument number is 100");
412  }
413 
414  rmw_ret_t rmw_ret = rmw_subscription_content_filter_options_init(
415  filter_expression,
416  expression_parameters_argc,
417  expression_parameter_argv,
418  allocator,
419  &options->rmw_subscription_content_filter_options
420  );
421 
422  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
423 }
424 
425 rcl_ret_t
427  const rcl_subscription_t * subscription,
428  const char * filter_expression,
429  size_t expression_parameters_argc,
430  const char * expression_parameter_argv[],
432 {
433  if (!rcl_subscription_is_valid(subscription)) {
435  }
436  if (expression_parameters_argc > 100) {
437  RCL_SET_ERROR_MSG("The maximum of expression parameters argument number is 100");
439  }
440  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
441  const rcl_allocator_t * allocator = &subscription->impl->options.allocator;
442  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
443 
444  rmw_ret_t ret = rmw_subscription_content_filter_options_set(
445  filter_expression,
446  expression_parameters_argc,
447  expression_parameter_argv,
448  allocator,
449  &options->rmw_subscription_content_filter_options
450  );
451  return rcl_convert_rmw_ret_to_rcl_ret(ret);
452 }
453 
454 rcl_ret_t
456  const rcl_subscription_t * subscription,
458 {
459  if (!rcl_subscription_is_valid(subscription)) {
461  }
462  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
463  const rcl_allocator_t * allocator = &subscription->impl->options.allocator;
464  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
465 
466  rmw_ret_t ret = rmw_subscription_content_filter_options_fini(
467  &options->rmw_subscription_content_filter_options,
468  allocator
469  );
470 
471  return rcl_convert_rmw_ret_to_rcl_ret(ret);
472 }
473 
474 bool
476 {
477  if (!rcl_subscription_is_valid(subscription)) {
478  return false;
479  }
480  return subscription->impl->rmw_handle->is_cft_enabled;
481 }
482 
483 rcl_ret_t
485  const rcl_subscription_t * subscription,
487 )
488 {
489  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
490  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);
491 
492  if (!rcl_subscription_is_valid(subscription)) {
494  }
495 
496  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
497  rmw_ret_t ret = rmw_subscription_set_content_filter(
498  subscription->impl->rmw_handle,
499  &options->rmw_subscription_content_filter_options);
500 
501  if (ret != RMW_RET_OK) {
502  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
503  return rcl_convert_rmw_ret_to_rcl_ret(ret);
504  }
505 
506  // copy options into subscription_options
507  const rmw_subscription_content_filter_options_t * content_filter_options =
508  &options->rmw_subscription_content_filter_options;
510  content_filter_options->filter_expression,
511  content_filter_options->expression_parameters.size,
512  (const char **)content_filter_options->expression_parameters.data,
513  &subscription->impl->options
514  );
515 }
516 
517 rcl_ret_t
519  const rcl_subscription_t * subscription,
521 )
522 {
523  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
524  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);
525 
526  if (!rcl_subscription_is_valid(subscription)) {
528  }
529  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
530  rcl_allocator_t * allocator = &subscription->impl->options.allocator;
531  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
532 
533  rmw_ret_t rmw_ret = rmw_subscription_get_content_filter(
534  subscription->impl->rmw_handle,
535  allocator,
536  &options->rmw_subscription_content_filter_options);
537 
538  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
539 }
540 
541 rcl_ret_t
543  const rcl_subscription_t * subscription,
544  void * ros_message,
545  rmw_message_info_t * message_info,
546  rmw_subscription_allocation_t * allocation
547 )
548 {
549  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking message");
550  if (!rcl_subscription_is_valid(subscription)) {
551  return RCL_RET_SUBSCRIPTION_INVALID; // error message already set
552  }
553  RCL_CHECK_ARGUMENT_FOR_NULL(ros_message, RCL_RET_INVALID_ARGUMENT);
554 
555  // If message_info is NULL, use a place holder which can be discarded.
556  rmw_message_info_t dummy_message_info;
557  rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
558  *message_info_local = rmw_get_zero_initialized_message_info();
559  // Call rmw_take_with_info.
560  bool taken = false;
561  rmw_ret_t ret = rmw_take_with_info(
562  subscription->impl->rmw_handle, ros_message, &taken, message_info_local, allocation);
563  if (ret != RMW_RET_OK) {
564  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
565  return rcl_convert_rmw_ret_to_rcl_ret(ret);
566  }
567  RCUTILS_LOG_DEBUG_NAMED(
568  ROS_PACKAGE_NAME, "Subscription take succeeded: %s", taken ? "true" : "false");
569  TRACETOOLS_TRACEPOINT(rcl_take, (const void *)ros_message);
570  if (!taken) {
572  }
573  return RCL_RET_OK;
574 }
575 
576 rcl_ret_t
578  const rcl_subscription_t * subscription,
579  size_t count,
580  rmw_message_sequence_t * message_sequence,
581  rmw_message_info_sequence_t * message_info_sequence,
582  rmw_subscription_allocation_t * allocation
583 )
584 {
585  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking %zu messages", count);
586  if (!rcl_subscription_is_valid(subscription)) {
587  return RCL_RET_SUBSCRIPTION_INVALID; // error message already set
588  }
589  RCL_CHECK_ARGUMENT_FOR_NULL(message_sequence, RCL_RET_INVALID_ARGUMENT);
590  RCL_CHECK_ARGUMENT_FOR_NULL(message_info_sequence, RCL_RET_INVALID_ARGUMENT);
591 
592  if (message_sequence->capacity < count) {
593  RCL_SET_ERROR_MSG("Insufficient message sequence capacity for requested count");
595  }
596 
597  if (message_info_sequence->capacity < count) {
598  RCL_SET_ERROR_MSG("Insufficient message info sequence capacity for requested count");
600  }
601 
602  // Set the sizes to zero to indicate that there are no valid messages
603  message_sequence->size = 0u;
604  message_info_sequence->size = 0u;
605 
606  size_t taken = 0u;
607  rmw_ret_t ret = rmw_take_sequence(
608  subscription->impl->rmw_handle, count, message_sequence, message_info_sequence, &taken,
609  allocation);
610  if (ret != RMW_RET_OK) {
611  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
612  return rcl_convert_rmw_ret_to_rcl_ret(ret);
613  }
614  RCUTILS_LOG_DEBUG_NAMED(
615  ROS_PACKAGE_NAME, "Subscription took %zu messages", taken);
616  if (0u == taken) {
618  }
619  return RCL_RET_OK;
620 }
621 
622 rcl_ret_t
624  const rcl_subscription_t * subscription,
625  rcl_serialized_message_t * serialized_message,
626  rmw_message_info_t * message_info,
627  rmw_subscription_allocation_t * allocation
628 )
629 {
630  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking serialized message");
631  if (!rcl_subscription_is_valid(subscription)) {
632  return RCL_RET_SUBSCRIPTION_INVALID; // error already set
633  }
634  RCL_CHECK_ARGUMENT_FOR_NULL(serialized_message, RCL_RET_INVALID_ARGUMENT);
635  // If message_info is NULL, use a place holder which can be discarded.
636  rmw_message_info_t dummy_message_info;
637  rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
638  *message_info_local = rmw_get_zero_initialized_message_info();
639  // Call rmw_take_with_info.
640  bool taken = false;
641  rmw_ret_t ret = rmw_take_serialized_message_with_info(
642  subscription->impl->rmw_handle, serialized_message, &taken, message_info_local, allocation);
643  if (ret != RMW_RET_OK) {
644  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
645  return rcl_convert_rmw_ret_to_rcl_ret(ret);
646  }
647  RCUTILS_LOG_DEBUG_NAMED(
648  ROS_PACKAGE_NAME, "Subscription serialized take succeeded: %s", taken ? "true" : "false");
649  TRACETOOLS_TRACEPOINT(rcl_take, (const void *)serialized_message);
650  if (!taken) {
652  }
653  return RCL_RET_OK;
654 }
655 
656 rcl_ret_t
658  const rcl_subscription_t * subscription,
659  rosidl_dynamic_typesupport_dynamic_data_t * dynamic_message,
660  rmw_message_info_t * message_info,
661  rmw_subscription_allocation_t * allocation)
662 {
663  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking dynamic message");
664  if (!rcl_subscription_is_valid(subscription)) {
665  return RCL_RET_SUBSCRIPTION_INVALID; // error already set
666  }
667  RCL_CHECK_ARGUMENT_FOR_NULL(dynamic_message, RCL_RET_INVALID_ARGUMENT);
668  // If message_info is NULL, use a place holder which can be discarded.
669  rmw_message_info_t dummy_message_info;
670  rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
671  *message_info_local = rmw_get_zero_initialized_message_info();
672  // Call take with info
673  bool taken = false;
674  rmw_ret_t ret = rmw_take_dynamic_message_with_info(
675  subscription->impl->rmw_handle, dynamic_message, &taken, message_info_local, allocation);
676  if (ret != RMW_RET_OK) {
677  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
678  return rcl_convert_rmw_ret_to_rcl_ret(ret);
679  }
680  RCUTILS_LOG_DEBUG_NAMED(
681  ROS_PACKAGE_NAME, "Subscription dynamic take succeeded: %s", taken ? "true" : "false");
682  if (!taken) {
684  }
685  return RCL_RET_OK;
686 }
687 
688 rcl_ret_t
690  const rcl_subscription_t * subscription,
691  void ** loaned_message,
692  rmw_message_info_t * message_info,
693  rmw_subscription_allocation_t * allocation)
694 {
695  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking loaned message");
696  if (!rcl_subscription_is_valid(subscription)) {
697  return RCL_RET_SUBSCRIPTION_INVALID; // error already set
698  }
699  RCL_CHECK_ARGUMENT_FOR_NULL(loaned_message, RCL_RET_INVALID_ARGUMENT);
700  if (*loaned_message) {
701  RCL_SET_ERROR_MSG("loaned message is already initialized");
703  }
704  // If message_info is NULL, use a place holder which can be discarded.
705  rmw_message_info_t dummy_message_info;
706  rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
707  *message_info_local = rmw_get_zero_initialized_message_info();
708  // Call rmw_take_with_info.
709  bool taken = false;
710  rmw_ret_t ret = rmw_take_loaned_message_with_info(
711  subscription->impl->rmw_handle, loaned_message, &taken, message_info_local, allocation);
712  if (ret != RMW_RET_OK) {
713  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
714  return rcl_convert_rmw_ret_to_rcl_ret(ret);
715  }
716  RCUTILS_LOG_DEBUG_NAMED(
717  ROS_PACKAGE_NAME, "Subscription loaned take succeeded: %s", taken ? "true" : "false");
718  if (!taken) {
720  }
721  return RCL_RET_OK;
722 }
723 
724 rcl_ret_t
726  const rcl_subscription_t * subscription,
727  void * loaned_message)
728 {
729  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription releasing loaned message");
730  if (!rcl_subscription_is_valid(subscription)) {
731  return RCL_RET_SUBSCRIPTION_INVALID; // error already set
732  }
733  RCL_CHECK_ARGUMENT_FOR_NULL(loaned_message, RCL_RET_INVALID_ARGUMENT);
734  return rcl_convert_rmw_ret_to_rcl_ret(
735  rmw_return_loaned_message_from_subscription(
736  subscription->impl->rmw_handle, loaned_message));
737 }
738 
739 const char *
741 {
742  if (!rcl_subscription_is_valid(subscription)) {
743  return NULL; // error already set
744  }
745  return subscription->impl->rmw_handle->topic_name;
746 }
747 
750 {
751  if (!rcl_subscription_is_valid(subscription)) {
752  return NULL; // error already set
753  }
754  return &subscription->impl->options;
755 }
756 
757 rmw_subscription_t *
759 {
760  if (!rcl_subscription_is_valid(subscription)) {
761  return NULL; // error already set
762  }
763  return subscription->impl->rmw_handle;
764 }
765 
766 bool
768 {
769  RCL_CHECK_FOR_NULL_WITH_MSG(subscription, "subscription pointer is invalid", return false);
770  RCL_CHECK_FOR_NULL_WITH_MSG(
771  subscription->impl, "subscription's implementation is invalid", return false);
772  RCL_CHECK_FOR_NULL_WITH_MSG(
773  subscription->impl->rmw_handle, "subscription's rmw handle is invalid", return false);
774  return true;
775 }
776 
777 rmw_ret_t
779  const rcl_subscription_t * subscription,
780  size_t * publisher_count)
781 {
782  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
783  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);
784 
785  if (!rcl_subscription_is_valid(subscription)) {
787  }
788  RCL_CHECK_ARGUMENT_FOR_NULL(publisher_count, RCL_RET_INVALID_ARGUMENT);
789  rmw_ret_t ret = rmw_subscription_count_matched_publishers(
790  subscription->impl->rmw_handle, publisher_count);
791 
792  if (ret != RMW_RET_OK) {
793  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
794  return rcl_convert_rmw_ret_to_rcl_ret(ret);
795  }
796  return RCL_RET_OK;
797 }
798 
799 const rmw_qos_profile_t *
801 {
802  if (!rcl_subscription_is_valid(subscription)) {
803  return NULL;
804  }
805  return &subscription->impl->actual_qos;
806 }
807 
808 bool
810 {
811  if (!rcl_subscription_is_valid(subscription)) {
812  return false; // error message already set
813  }
814 
815  if (subscription->impl->options.disable_loaned_message) {
816  return false;
817  }
818 
819  return subscription->impl->rmw_handle->can_loan_messages;
820 }
821 
822 rcl_ret_t
824  const rcl_subscription_t * subscription,
825  rcl_event_callback_t callback,
826  const void * user_data)
827 {
828  if (!rcl_subscription_is_valid(subscription)) {
829  // error state already set
831  }
832 
833  return rmw_subscription_set_on_new_message_callback(
834  subscription->impl->rmw_handle,
835  callback,
836  user_data);
837 }
838 
839 #ifdef __cplusplus
840 }
841 #endif
#define rcl_get_default_allocator
Return a properly initialized rcl_allocator_t with default values.
Definition: allocator.h:37
#define RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, msg, fail_statement)
Check that the given allocator is initialized, or fail with a message.
Definition: allocator.h:56
rcutils_allocator_t rcl_allocator_t
Encapsulation of an allocator.
Definition: allocator.h:31
RCL_PUBLIC bool rcl_node_is_valid(const rcl_node_t *node)
Return true if the node is valid, else false.
Definition: node.c:394
RCL_PUBLIC RCL_WARN_UNUSED rmw_node_t * rcl_node_get_rmw_handle(const rcl_node_t *node)
Return the rmw node handle.
Definition: node.c:458
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_node_resolve_name(const rcl_node_t *node, const char *input_name, rcl_allocator_t allocator, bool is_service, bool only_expand, char **output_name)
Expand a given name into a fully-qualified topic name and apply remapping rules.
RCL_PUBLIC bool rcl_node_is_valid_except_context(const rcl_node_t *node)
Return true if node is valid, except for the context being valid.
Definition: node.c:384
Structure which encapsulates a ROS Node.
Definition: node.h:45
Options available for a rcl subscription.
Definition: subscription.h:47
rcl_allocator_t allocator
Custom allocator for the subscription, used for incidental allocations.
Definition: subscription.h:52
rmw_qos_profile_t qos
Middleware quality of service settings for the subscription.
Definition: subscription.h:49
rmw_subscription_options_t rmw_subscription_options
rmw specific subscription options, e.g. the rmw implementation specific payload.
Definition: subscription.h:54
bool disable_loaned_message
Disable flag to LoanedMessage, initialized via environmental variable.
Definition: subscription.h:56
Structure which encapsulates a ROS Subscription.
Definition: subscription.h:40
rcl_subscription_impl_t * impl
Pointer to the subscription implementation.
Definition: subscription.h:42
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_options_t rcl_subscription_get_default_options(void)
Return the default subscription options in a rcl_subscription_options_t.
Definition: subscription.c:229
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_fini(const rcl_subscription_t *subscription, rcl_subscription_content_filter_options_t *options)
Reclaim rcl_subscription_content_filter_options_t structure.
Definition: subscription.c:455
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_options_fini(rcl_subscription_options_t *option)
Reclaim resources held inside rcl_subscription_options_t structure.
Definition: subscription.c:258
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_init(rcl_subscription_t *subscription, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_subscription_options_t *options)
Initialize a ROS subscription.
Definition: subscription.c:51
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_sequence(const rcl_subscription_t *subscription, size_t count, rmw_message_sequence_t *message_sequence, rmw_message_info_sequence_t *message_info_sequence, rmw_subscription_allocation_t *allocation)
Take a sequence of messages from a topic using a rcl subscription.
Definition: subscription.c:577
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_subscription_get_topic_name(const rcl_subscription_t *subscription)
Get the topic name for the subscription.
Definition: subscription.c:740
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_set_on_new_message_callback(const rcl_subscription_t *subscription, rcl_event_callback_t callback, const void *user_data)
Set the on new message callback function for the subscription.
Definition: subscription.c:823
RCL_PUBLIC bool rcl_subscription_can_loan_messages(const rcl_subscription_t *subscription)
Check if subscription instance can loan messages.
Definition: subscription.c:809
RCL_PUBLIC RCL_WARN_UNUSED rmw_subscription_t * rcl_subscription_get_rmw_handle(const rcl_subscription_t *subscription)
Return the rmw subscription handle.
Definition: subscription.c:758
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_fini(rcl_subscription_t *subscription, rcl_node_t *node)
Finalize a rcl_subscription_t.
Definition: subscription.c:180
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take(const rcl_subscription_t *subscription, void *ros_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a ROS message from a topic using a rcl subscription.
Definition: subscription.c:542
RCL_PUBLIC RCL_WARN_UNUSED rmw_ret_t rcl_subscription_get_publisher_count(const rcl_subscription_t *subscription, size_t *publisher_count)
Get the number of publishers matched to a subscription.
Definition: subscription.c:778
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_options_set_content_filter_options(const char *filter_expression, size_t expression_parameters_argc, const char *expression_parameter_argv[], rcl_subscription_options_t *options)
Set the content filter options for the given subscription options.
Definition: subscription.c:280
RCL_PUBLIC bool rcl_subscription_is_valid(const rcl_subscription_t *subscription)
Check that the subscription is valid.
Definition: subscription.c:767
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_serialized_message(const rcl_subscription_t *subscription, rcl_serialized_message_t *serialized_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a serialized raw message from a topic using a rcl subscription.
Definition: subscription.c:623
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_loaned_message(const rcl_subscription_t *subscription, void **loaned_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a loaned message from a topic using a rcl subscription.
Definition: subscription.c:689
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_set(const rcl_subscription_t *subscription, const char *filter_expression, size_t expression_parameters_argc, const char *expression_parameter_argv[], rcl_subscription_content_filter_options_t *options)
Set the content filter options for the given subscription options.
Definition: subscription.c:426
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_content_filter_options_t rcl_get_zero_initialized_subscription_content_filter_options(void)
Return the zero initialized subscription content filter options.
Definition: subscription.c:387
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_subscription_is_cft_enabled(const rcl_subscription_t *subscription)
Check if the content filtered topic feature is enabled in the subscription.
Definition: subscription.c:475
RCL_PUBLIC RCL_WARN_UNUSED const rmw_qos_profile_t * rcl_subscription_get_actual_qos(const rcl_subscription_t *subscription)
Get the actual qos settings of the subscription.
Definition: subscription.c:800
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_t rcl_get_zero_initialized_subscription(void)
Return a rcl_subscription_t struct with members set to NULL.
Definition: subscription.c:43
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_get_content_filter(const rcl_subscription_t *subscription, rcl_subscription_content_filter_options_t *options)
Retrieve the filter expression of the subscription.
Definition: subscription.c:518
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_set_content_filter(const rcl_subscription_t *subscription, const rcl_subscription_content_filter_options_t *options)
Set the filter expression and expression parameters for the subscription.
Definition: subscription.c:484
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_init(const rcl_subscription_t *subscription, const char *filter_expression, size_t expression_parameters_argc, const char *expression_parameter_argv[], rcl_subscription_content_filter_options_t *options)
Initialize the content filter options for the given subscription options.
Definition: subscription.c:396
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_return_loaned_message_from_subscription(const rcl_subscription_t *subscription, void *loaned_message)
Return a loaned message from a topic using a rcl subscription.
Definition: subscription.c:725
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_dynamic_message(const rcl_subscription_t *subscription, rosidl_dynamic_typesupport_dynamic_data_t *dynamic_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a dynamic type message from a topic using a rcl subscription.
Definition: subscription.c:657
RCL_PUBLIC RCL_WARN_UNUSED const rcl_subscription_options_t * rcl_subscription_get_options(const rcl_subscription_t *subscription)
Return the rcl subscription options.
Definition: subscription.c:749
#define RCL_RET_UNKNOWN_SUBSTITUTION
Topic name substitution is unknown.
Definition: types.h:51
#define RCL_RET_ALREADY_INIT
rcl_init() already called return code.
Definition: types.h:41
#define RCL_RET_SUBSCRIPTION_TAKE_FAILED
Failed to take a message from the subscription return code.
Definition: types.h:75
#define RCL_RET_OK
Success return code.
Definition: types.h:27
#define RCL_RET_BAD_ALLOC
Failed to allocate memory return code.
Definition: types.h:33
#define RCL_RET_INVALID_ARGUMENT
Invalid argument return code.
Definition: types.h:35
#define RCL_RET_ERROR
Unspecified error return code.
Definition: types.h:29
rmw_serialized_message_t rcl_serialized_message_t
typedef for rmw_serialized_message_t;
Definition: types.h:152
#define RCL_RET_NODE_INVALID
Invalid rcl_node_t given return code.
Definition: types.h:59
#define RCL_RET_TOPIC_NAME_INVALID
Topic name does not pass validation.
Definition: types.h:47
#define RCL_RET_SUBSCRIPTION_INVALID
Invalid rcl_subscription_t given return code.
Definition: types.h:73
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:24