ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
subscription.c
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifdef __cplusplus
16 extern "C"
17 {
18 #endif
19 
20 #include "rcl/subscription.h"
21 
22 #include <stdio.h>
23 
24 #include "rcl/error_handling.h"
25 #include "rcl/node.h"
26 #include "rcl/node_type_cache.h"
27 #include "rcutils/env.h"
28 #include "rcutils/logging_macros.h"
29 #include "rcutils/strdup.h"
30 #include "rcutils/types/string_array.h"
31 #include "rmw/error_handling.h"
32 #include "rmw/dynamic_message_type_support.h"
33 #include "rmw/subscription_content_filter_options.h"
34 #include "rmw/validate_full_topic_name.h"
35 #include "rosidl_dynamic_typesupport/identifier.h"
36 #include "tracetools/tracetools.h"
37 
38 #include "./common.h"
39 #include "./subscription_impl.h"
40 
41 
44 {
45  static rcl_subscription_t null_subscription = {0};
46  return null_subscription;
47 }
48 
51  rcl_subscription_t * subscription,
52  const rcl_node_t * node,
53  const rosidl_message_type_support_t * type_support,
54  const char * topic_name,
55  const rcl_subscription_options_t * options
56 )
57 {
58  rcl_ret_t fail_ret = RCL_RET_ERROR;
59 
60  // Check options and allocator first, so the allocator can be used in errors.
61  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
62  rcl_allocator_t * allocator = (rcl_allocator_t *)&options->allocator;
63  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
64  RCL_CHECK_ARGUMENT_FOR_NULL(subscription, RCL_RET_INVALID_ARGUMENT);
65  if (!rcl_node_is_valid(node)) {
66  return RCL_RET_NODE_INVALID; // error already set
67  }
68  RCL_CHECK_ARGUMENT_FOR_NULL(type_support, RCL_RET_INVALID_ARGUMENT);
69  RCL_CHECK_ARGUMENT_FOR_NULL(topic_name, RCL_RET_INVALID_ARGUMENT);
70  RCUTILS_LOG_DEBUG_NAMED(
71  ROS_PACKAGE_NAME, "Initializing subscription for topic name '%s'", topic_name);
72  if (subscription->impl) {
73  RCL_SET_ERROR_MSG("subscription already initialized, or memory was uninitialized");
74  return RCL_RET_ALREADY_INIT;
75  }
76 
77  // Expand and remap the given topic name.
78  char * remapped_topic_name = NULL;
80  node,
81  topic_name,
82  *allocator,
83  false,
84  false,
85  &remapped_topic_name);
86  if (ret != RCL_RET_OK) {
89  } else if (ret != RCL_RET_BAD_ALLOC) {
90  ret = RCL_RET_ERROR;
91  }
92  goto cleanup;
93  }
94  RCUTILS_LOG_DEBUG_NAMED(
95  ROS_PACKAGE_NAME, "Expanded and remapped topic name '%s'", remapped_topic_name);
96 
97  // Allocate memory for the implementation struct.
98  subscription->impl = (rcl_subscription_impl_t *)allocator->zero_allocate(
99  1, sizeof(rcl_subscription_impl_t), allocator->state);
100  RCL_CHECK_FOR_NULL_WITH_MSG(
101  subscription->impl, "allocating memory failed", ret = RCL_RET_BAD_ALLOC; goto cleanup);
102  // Fill out the implemenation struct.
103  // rmw_handle
104  // TODO(wjwwood): pass allocator once supported in rmw api.
105  subscription->impl->rmw_handle = rmw_create_subscription(
107  type_support,
108  remapped_topic_name,
109  &(options->qos),
110  &(options->rmw_subscription_options));
111  if (!subscription->impl->rmw_handle) {
112  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
113  goto fail;
114  }
115  // get actual qos, and store it
116  rmw_ret_t rmw_ret = rmw_subscription_get_actual_qos(
117  subscription->impl->rmw_handle,
118  &subscription->impl->actual_qos);
119  if (RMW_RET_OK != rmw_ret) {
120  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
121  goto fail;
122  }
123  subscription->impl->actual_qos.avoid_ros_namespace_conventions =
124  options->qos.avoid_ros_namespace_conventions;
125  // options
126  subscription->impl->options = *options;
127 
128  if (RCL_RET_OK != rcl_node_type_cache_register_type(
129  node, type_support->get_type_hash_func(type_support),
130  type_support->get_type_description_func(type_support),
131  type_support->get_type_description_sources_func(type_support)))
132  {
133  rcutils_reset_error();
134  RCL_SET_ERROR_MSG("Failed to register type for subscription");
135  goto fail;
136  }
137  subscription->impl->type_hash = *type_support->get_type_hash_func(type_support);
138 
139  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription initialized");
140  ret = RCL_RET_OK;
141  TRACETOOLS_TRACEPOINT(
143  (const void *)subscription,
144  (const void *)node,
145  (const void *)subscription->impl->rmw_handle,
146  remapped_topic_name,
147  options->qos.depth);
148 
149  goto cleanup;
150 fail:
151  if (subscription->impl) {
152  if (subscription->impl->rmw_handle) {
153  rmw_ret_t rmw_fail_ret = rmw_destroy_subscription(
154  rcl_node_get_rmw_handle(node), subscription->impl->rmw_handle);
155  if (RMW_RET_OK != rmw_fail_ret) {
156  RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
157  RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
158  }
159  }
160 
161  ret = rcl_subscription_options_fini(&subscription->impl->options);
162  if (RCL_RET_OK != ret) {
163  RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
164  RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
165  }
166 
167  allocator->deallocate(subscription->impl, allocator->state);
168  subscription->impl = NULL;
169  }
170  ret = fail_ret;
171  // Fall through to cleanup
172 cleanup:
173  allocator->deallocate(remapped_topic_name, allocator->state);
174  return ret;
175 }
176 
177 rcl_ret_t
179 {
180  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
181  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_NODE_INVALID);
182  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);
183  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_ERROR);
184 
185  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Finalizing subscription");
186  rcl_ret_t result = RCL_RET_OK;
187  RCL_CHECK_ARGUMENT_FOR_NULL(subscription, RCL_RET_SUBSCRIPTION_INVALID);
189  return RCL_RET_NODE_INVALID; // error already set
190  }
191  if (subscription->impl) {
192  rcl_allocator_t allocator = subscription->impl->options.allocator;
193  rmw_node_t * rmw_node = rcl_node_get_rmw_handle(node);
194  if (!rmw_node) {
196  }
197  rmw_ret_t ret =
198  rmw_destroy_subscription(rmw_node, subscription->impl->rmw_handle);
199  if (ret != RMW_RET_OK) {
200  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
201  result = RCL_RET_ERROR;
202  }
203  rcl_ret_t rcl_ret = rcl_subscription_options_fini(&subscription->impl->options);
204  if (RCL_RET_OK != rcl_ret) {
205  RCUTILS_SAFE_FWRITE_TO_STDERR(rcl_get_error_string().str);
206  RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
207  result = RCL_RET_ERROR;
208  }
209 
210  if (
211  ROSIDL_TYPE_HASH_VERSION_UNSET != subscription->impl->type_hash.version &&
212  RCL_RET_OK != rcl_node_type_cache_unregister_type(node, &subscription->impl->type_hash))
213  {
214  RCUTILS_SAFE_FWRITE_TO_STDERR(rcl_get_error_string().str);
215  RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
216  result = RCL_RET_ERROR;
217  }
218 
219  allocator.deallocate(subscription->impl, allocator.state);
220  subscription->impl = NULL;
221  }
222  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription finalized");
223  return result;
224 }
225 
228 {
229  // !!! MAKE SURE THAT CHANGES TO THESE DEFAULTS ARE REFLECTED IN THE HEADER DOC STRING
230  static rcl_subscription_options_t default_options;
231  // Must set these after declaration because they are not a compile time constants.
232  default_options.qos = rmw_qos_profile_default;
233  default_options.allocator = rcl_get_default_allocator();
234  default_options.rmw_subscription_options = rmw_get_default_subscription_options();
235 
236  // Load disable flag to LoanedMessage via environmental variable.
237  // TODO(clalancette): This is kind of a copy of rcl_get_disable_loaned_message(), but we need
238  // more information than that function provides.
239  default_options.disable_loaned_message = true;
240 
241  const char * env_val = NULL;
242  const char * env_error_str = rcutils_get_env(RCL_DISABLE_LOANED_MESSAGES_ENV_VAR, &env_val);
243  if (NULL != env_error_str) {
244  RCUTILS_SAFE_FWRITE_TO_STDERR("Failed to get disable_loaned_message: ");
245  RCUTILS_SAFE_FWRITE_TO_STDERR_WITH_FORMAT_STRING(
246  "Error getting env var: '" RCUTILS_STRINGIFY(RCL_DISABLE_LOANED_MESSAGES_ENV_VAR) "': %s\n",
247  env_error_str);
248  } else {
249  default_options.disable_loaned_message = !(strcmp(env_val, "0") == 0);
250  }
251 
252  return default_options;
253 }
254 
255 rcl_ret_t
257 {
258  RCL_CHECK_ARGUMENT_FOR_NULL(option, RCL_RET_INVALID_ARGUMENT);
259  // fini rmw_subscription_options_t
260  const rcl_allocator_t * allocator = &option->allocator;
261  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
262 
263  if (option->rmw_subscription_options.content_filter_options) {
264  rmw_ret_t ret = rmw_subscription_content_filter_options_fini(
265  option->rmw_subscription_options.content_filter_options, allocator);
266  if (RCUTILS_RET_OK != ret) {
267  RCUTILS_SAFE_FWRITE_TO_STDERR("Failed to fini content filter options.\n");
268  return rcl_convert_rmw_ret_to_rcl_ret(ret);
269  }
270  allocator->deallocate(
271  option->rmw_subscription_options.content_filter_options, allocator->state);
272  option->rmw_subscription_options.content_filter_options = NULL;
273  }
274  return RCL_RET_OK;
275 }
276 
277 rcl_ret_t
279  const char * filter_expression,
280  size_t expression_parameters_argc,
281  const char * expression_parameter_argv[],
282  rcl_subscription_options_t * options)
283 {
284  RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT);
285  if (expression_parameters_argc > 100) {
286  RCL_SET_ERROR_MSG("The maximum of expression parameters argument number is 100");
288  }
289  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
290  const rcl_allocator_t * allocator = &options->allocator;
291  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
292 
293  rcl_ret_t ret;
294  rmw_ret_t rmw_ret;
295  rmw_subscription_content_filter_options_t * original_content_filter_options =
296  options->rmw_subscription_options.content_filter_options;
297  rmw_subscription_content_filter_options_t content_filter_options_backup =
298  rmw_get_zero_initialized_content_filter_options();
299 
300  if (original_content_filter_options) {
301  // make a backup, restore the data if failure happened
302  rmw_ret = rmw_subscription_content_filter_options_copy(
303  original_content_filter_options,
304  allocator,
305  &content_filter_options_backup
306  );
307  if (rmw_ret != RMW_RET_OK) {
308  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
309  }
310  } else {
311  options->rmw_subscription_options.content_filter_options =
312  allocator->allocate(
313  sizeof(rmw_subscription_content_filter_options_t), allocator->state);
314  if (!options->rmw_subscription_options.content_filter_options) {
315  RCL_SET_ERROR_MSG("failed to allocate memory");
316  return RCL_RET_BAD_ALLOC;
317  }
318  *options->rmw_subscription_options.content_filter_options =
319  rmw_get_zero_initialized_content_filter_options();
320  }
321 
322  rmw_ret = rmw_subscription_content_filter_options_set(
323  filter_expression,
324  expression_parameters_argc,
325  expression_parameter_argv,
326  allocator,
327  options->rmw_subscription_options.content_filter_options
328  );
329 
330  if (rmw_ret != RMW_RET_OK) {
331  ret = rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
332  goto failed;
333  }
334 
335  rmw_ret = rmw_subscription_content_filter_options_fini(
336  &content_filter_options_backup,
337  allocator
338  );
339  if (rmw_ret != RMW_RET_OK) {
340  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
341  }
342 
343  return RMW_RET_OK;
344 
345 failed:
346 
347  if (original_content_filter_options == NULL) {
348  if (options->rmw_subscription_options.content_filter_options) {
349  rmw_ret = rmw_subscription_content_filter_options_fini(
350  options->rmw_subscription_options.content_filter_options,
351  allocator
352  );
353 
354  if (rmw_ret != RMW_RET_OK) {
355  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
356  }
357 
358  allocator->deallocate(
359  options->rmw_subscription_options.content_filter_options, allocator->state);
360  options->rmw_subscription_options.content_filter_options = NULL;
361  }
362  } else {
363  rmw_ret = rmw_subscription_content_filter_options_copy(
364  &content_filter_options_backup,
365  allocator,
366  options->rmw_subscription_options.content_filter_options
367  );
368  if (rmw_ret != RMW_RET_OK) {
369  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
370  }
371 
372  rmw_ret = rmw_subscription_content_filter_options_fini(
373  &content_filter_options_backup,
374  allocator
375  );
376  if (rmw_ret != RMW_RET_OK) {
377  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
378  }
379  }
380 
381  return ret;
382 }
383 
386 {
388  .rmw_subscription_content_filter_options =
389  rmw_get_zero_initialized_content_filter_options()
390  }; // NOLINT(readability/braces): false positive
391 }
392 
393 rcl_ret_t
395  const rcl_subscription_t * subscription,
396  const char * filter_expression,
397  size_t expression_parameters_argc,
398  const char * expression_parameter_argv[],
400 {
401  if (!rcl_subscription_is_valid(subscription)) {
403  }
404  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
405  const rcl_allocator_t * allocator = &subscription->impl->options.allocator;
406  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
407  if (expression_parameters_argc > 100) {
408  RCL_SET_ERROR_MSG("The maximum of expression parameters argument number is 100");
410  }
411 
412  rmw_ret_t rmw_ret = rmw_subscription_content_filter_options_init(
413  filter_expression,
414  expression_parameters_argc,
415  expression_parameter_argv,
416  allocator,
417  &options->rmw_subscription_content_filter_options
418  );
419 
420  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
421 }
422 
423 rcl_ret_t
425  const rcl_subscription_t * subscription,
426  const char * filter_expression,
427  size_t expression_parameters_argc,
428  const char * expression_parameter_argv[],
430 {
431  if (!rcl_subscription_is_valid(subscription)) {
433  }
434  if (expression_parameters_argc > 100) {
435  RCL_SET_ERROR_MSG("The maximum of expression parameters argument number is 100");
437  }
438  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
439  const rcl_allocator_t * allocator = &subscription->impl->options.allocator;
440  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
441 
442  rmw_ret_t ret = rmw_subscription_content_filter_options_set(
443  filter_expression,
444  expression_parameters_argc,
445  expression_parameter_argv,
446  allocator,
447  &options->rmw_subscription_content_filter_options
448  );
449  return rcl_convert_rmw_ret_to_rcl_ret(ret);
450 }
451 
452 rcl_ret_t
454  const rcl_subscription_t * subscription,
456 {
457  if (!rcl_subscription_is_valid(subscription)) {
459  }
460  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
461  const rcl_allocator_t * allocator = &subscription->impl->options.allocator;
462  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
463 
464  rmw_ret_t ret = rmw_subscription_content_filter_options_fini(
465  &options->rmw_subscription_content_filter_options,
466  allocator
467  );
468 
469  return rcl_convert_rmw_ret_to_rcl_ret(ret);
470 }
471 
472 bool
474 {
475  if (!rcl_subscription_is_valid(subscription)) {
476  return false;
477  }
478  return subscription->impl->rmw_handle->is_cft_enabled;
479 }
480 
481 rcl_ret_t
483  const rcl_subscription_t * subscription,
485 )
486 {
487  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
488  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);
489 
490  if (!rcl_subscription_is_valid(subscription)) {
492  }
493 
494  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
495  rmw_ret_t ret = rmw_subscription_set_content_filter(
496  subscription->impl->rmw_handle,
497  &options->rmw_subscription_content_filter_options);
498 
499  if (ret != RMW_RET_OK) {
500  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
501  return rcl_convert_rmw_ret_to_rcl_ret(ret);
502  }
503 
504  // copy options into subscription_options
505  const rmw_subscription_content_filter_options_t * content_filter_options =
506  &options->rmw_subscription_content_filter_options;
508  content_filter_options->filter_expression,
509  content_filter_options->expression_parameters.size,
510  (const char **)content_filter_options->expression_parameters.data,
511  &subscription->impl->options
512  );
513 }
514 
515 rcl_ret_t
517  const rcl_subscription_t * subscription,
519 )
520 {
521  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
522  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);
523 
524  if (!rcl_subscription_is_valid(subscription)) {
526  }
527  RCL_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);
528  rcl_allocator_t * allocator = &subscription->impl->options.allocator;
529  RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
530 
531  rmw_ret_t rmw_ret = rmw_subscription_get_content_filter(
532  subscription->impl->rmw_handle,
533  allocator,
534  &options->rmw_subscription_content_filter_options);
535 
536  return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
537 }
538 
539 rcl_ret_t
541  const rcl_subscription_t * subscription,
542  void * ros_message,
543  rmw_message_info_t * message_info,
544  rmw_subscription_allocation_t * allocation
545 )
546 {
547  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking message");
548  if (!rcl_subscription_is_valid(subscription)) {
549  return RCL_RET_SUBSCRIPTION_INVALID; // error message already set
550  }
551  RCL_CHECK_ARGUMENT_FOR_NULL(ros_message, RCL_RET_INVALID_ARGUMENT);
552 
553  // If message_info is NULL, use a place holder which can be discarded.
554  rmw_message_info_t dummy_message_info;
555  rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
556  *message_info_local = rmw_get_zero_initialized_message_info();
557  // Call rmw_take_with_info.
558  bool taken = false;
559  rmw_ret_t ret = rmw_take_with_info(
560  subscription->impl->rmw_handle, ros_message, &taken, message_info_local, allocation);
561  if (ret != RMW_RET_OK) {
562  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
563  return rcl_convert_rmw_ret_to_rcl_ret(ret);
564  }
565  RCUTILS_LOG_DEBUG_NAMED(
566  ROS_PACKAGE_NAME, "Subscription take succeeded: %s", taken ? "true" : "false");
567  TRACETOOLS_TRACEPOINT(rcl_take, (const void *)ros_message);
568  if (!taken) {
570  }
571  return RCL_RET_OK;
572 }
573 
574 rcl_ret_t
576  const rcl_subscription_t * subscription,
577  size_t count,
578  rmw_message_sequence_t * message_sequence,
579  rmw_message_info_sequence_t * message_info_sequence,
580  rmw_subscription_allocation_t * allocation
581 )
582 {
583  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking %zu messages", count);
584  if (!rcl_subscription_is_valid(subscription)) {
585  return RCL_RET_SUBSCRIPTION_INVALID; // error message already set
586  }
587  RCL_CHECK_ARGUMENT_FOR_NULL(message_sequence, RCL_RET_INVALID_ARGUMENT);
588  RCL_CHECK_ARGUMENT_FOR_NULL(message_info_sequence, RCL_RET_INVALID_ARGUMENT);
589 
590  if (message_sequence->capacity < count) {
591  RCL_SET_ERROR_MSG("Insufficient message sequence capacity for requested count");
593  }
594 
595  if (message_info_sequence->capacity < count) {
596  RCL_SET_ERROR_MSG("Insufficient message info sequence capacity for requested count");
598  }
599 
600  // Set the sizes to zero to indicate that there are no valid messages
601  message_sequence->size = 0u;
602  message_info_sequence->size = 0u;
603 
604  size_t taken = 0u;
605  rmw_ret_t ret = rmw_take_sequence(
606  subscription->impl->rmw_handle, count, message_sequence, message_info_sequence, &taken,
607  allocation);
608  if (ret != RMW_RET_OK) {
609  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
610  return rcl_convert_rmw_ret_to_rcl_ret(ret);
611  }
612  RCUTILS_LOG_DEBUG_NAMED(
613  ROS_PACKAGE_NAME, "Subscription took %zu messages", taken);
614  if (0u == taken) {
616  }
617  return RCL_RET_OK;
618 }
619 
620 rcl_ret_t
622  const rcl_subscription_t * subscription,
623  rcl_serialized_message_t * serialized_message,
624  rmw_message_info_t * message_info,
625  rmw_subscription_allocation_t * allocation
626 )
627 {
628  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking serialized message");
629  if (!rcl_subscription_is_valid(subscription)) {
630  return RCL_RET_SUBSCRIPTION_INVALID; // error already set
631  }
632  RCL_CHECK_ARGUMENT_FOR_NULL(serialized_message, RCL_RET_INVALID_ARGUMENT);
633  // If message_info is NULL, use a place holder which can be discarded.
634  rmw_message_info_t dummy_message_info;
635  rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
636  *message_info_local = rmw_get_zero_initialized_message_info();
637  // Call rmw_take_with_info.
638  bool taken = false;
639  rmw_ret_t ret = rmw_take_serialized_message_with_info(
640  subscription->impl->rmw_handle, serialized_message, &taken, message_info_local, allocation);
641  if (ret != RMW_RET_OK) {
642  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
643  return rcl_convert_rmw_ret_to_rcl_ret(ret);
644  }
645  RCUTILS_LOG_DEBUG_NAMED(
646  ROS_PACKAGE_NAME, "Subscription serialized take succeeded: %s", taken ? "true" : "false");
647  TRACETOOLS_TRACEPOINT(rcl_take, (const void *)serialized_message);
648  if (!taken) {
650  }
651  return RCL_RET_OK;
652 }
653 
654 rcl_ret_t
656  const rcl_subscription_t * subscription,
657  rosidl_dynamic_typesupport_dynamic_data_t * dynamic_message,
658  rmw_message_info_t * message_info,
659  rmw_subscription_allocation_t * allocation)
660 {
661  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking dynamic message");
662  if (!rcl_subscription_is_valid(subscription)) {
663  return RCL_RET_SUBSCRIPTION_INVALID; // error already set
664  }
665  RCL_CHECK_ARGUMENT_FOR_NULL(dynamic_message, RCL_RET_INVALID_ARGUMENT);
666  // If message_info is NULL, use a place holder which can be discarded.
667  rmw_message_info_t dummy_message_info;
668  rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
669  *message_info_local = rmw_get_zero_initialized_message_info();
670  // Call take with info
671  bool taken = false;
672  rmw_ret_t ret = rmw_take_dynamic_message_with_info(
673  subscription->impl->rmw_handle, dynamic_message, &taken, message_info_local, allocation);
674  if (ret != RMW_RET_OK) {
675  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
676  return rcl_convert_rmw_ret_to_rcl_ret(ret);
677  }
678  RCUTILS_LOG_DEBUG_NAMED(
679  ROS_PACKAGE_NAME, "Subscription dynamic take succeeded: %s", taken ? "true" : "false");
680  if (!taken) {
682  }
683  return RCL_RET_OK;
684 }
685 
686 rcl_ret_t
688  const rcl_subscription_t * subscription,
689  void ** loaned_message,
690  rmw_message_info_t * message_info,
691  rmw_subscription_allocation_t * allocation)
692 {
693  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking loaned message");
694  if (!rcl_subscription_is_valid(subscription)) {
695  return RCL_RET_SUBSCRIPTION_INVALID; // error already set
696  }
697  RCL_CHECK_ARGUMENT_FOR_NULL(loaned_message, RCL_RET_INVALID_ARGUMENT);
698  if (*loaned_message) {
699  RCL_SET_ERROR_MSG("loaned message is already initialized");
701  }
702  // If message_info is NULL, use a place holder which can be discarded.
703  rmw_message_info_t dummy_message_info;
704  rmw_message_info_t * message_info_local = message_info ? message_info : &dummy_message_info;
705  *message_info_local = rmw_get_zero_initialized_message_info();
706  // Call rmw_take_with_info.
707  bool taken = false;
708  rmw_ret_t ret = rmw_take_loaned_message_with_info(
709  subscription->impl->rmw_handle, loaned_message, &taken, message_info_local, allocation);
710  if (ret != RMW_RET_OK) {
711  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
712  return rcl_convert_rmw_ret_to_rcl_ret(ret);
713  }
714  RCUTILS_LOG_DEBUG_NAMED(
715  ROS_PACKAGE_NAME, "Subscription loaned take succeeded: %s", taken ? "true" : "false");
716  if (!taken) {
718  }
719  return RCL_RET_OK;
720 }
721 
722 rcl_ret_t
724  const rcl_subscription_t * subscription,
725  void * loaned_message)
726 {
727  RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription releasing loaned message");
728  if (!rcl_subscription_is_valid(subscription)) {
729  return RCL_RET_SUBSCRIPTION_INVALID; // error already set
730  }
731  RCL_CHECK_ARGUMENT_FOR_NULL(loaned_message, RCL_RET_INVALID_ARGUMENT);
732  return rcl_convert_rmw_ret_to_rcl_ret(
733  rmw_return_loaned_message_from_subscription(
734  subscription->impl->rmw_handle, loaned_message));
735 }
736 
737 const char *
739 {
740  if (!rcl_subscription_is_valid(subscription)) {
741  return NULL; // error already set
742  }
743  return subscription->impl->rmw_handle->topic_name;
744 }
745 
748 {
749  if (!rcl_subscription_is_valid(subscription)) {
750  return NULL; // error already set
751  }
752  return &subscription->impl->options;
753 }
754 
755 rmw_subscription_t *
757 {
758  if (!rcl_subscription_is_valid(subscription)) {
759  return NULL; // error already set
760  }
761  return subscription->impl->rmw_handle;
762 }
763 
764 bool
766 {
767  RCL_CHECK_FOR_NULL_WITH_MSG(subscription, "subscription pointer is invalid", return false);
768  RCL_CHECK_FOR_NULL_WITH_MSG(
769  subscription->impl, "subscription's implementation is invalid", return false);
770  RCL_CHECK_FOR_NULL_WITH_MSG(
771  subscription->impl->rmw_handle, "subscription's rmw handle is invalid", return false);
772  return true;
773 }
774 
775 rmw_ret_t
777  const rcl_subscription_t * subscription,
778  size_t * publisher_count)
779 {
780  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
781  RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);
782 
783  if (!rcl_subscription_is_valid(subscription)) {
785  }
786  RCL_CHECK_ARGUMENT_FOR_NULL(publisher_count, RCL_RET_INVALID_ARGUMENT);
787  rmw_ret_t ret = rmw_subscription_count_matched_publishers(
788  subscription->impl->rmw_handle, publisher_count);
789 
790  if (ret != RMW_RET_OK) {
791  RCL_SET_ERROR_MSG(rmw_get_error_string().str);
792  return rcl_convert_rmw_ret_to_rcl_ret(ret);
793  }
794  return RCL_RET_OK;
795 }
796 
797 const rmw_qos_profile_t *
799 {
800  if (!rcl_subscription_is_valid(subscription)) {
801  return NULL;
802  }
803  return &subscription->impl->actual_qos;
804 }
805 
806 bool
808 {
809  if (!rcl_subscription_is_valid(subscription)) {
810  return false; // error message already set
811  }
812 
813  if (subscription->impl->options.disable_loaned_message) {
814  return false;
815  }
816 
817  return subscription->impl->rmw_handle->can_loan_messages;
818 }
819 
820 rcl_ret_t
822  const rcl_subscription_t * subscription,
823  rcl_event_callback_t callback,
824  const void * user_data)
825 {
826  if (!rcl_subscription_is_valid(subscription)) {
827  // error state already set
829  }
830 
831  return rmw_subscription_set_on_new_message_callback(
832  subscription->impl->rmw_handle,
833  callback,
834  user_data);
835 }
836 
837 #ifdef __cplusplus
838 }
839 #endif
#define rcl_get_default_allocator
Return a properly initialized rcl_allocator_t with default values.
Definition: allocator.h:37
#define RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, msg, fail_statement)
Check that the given allocator is initialized, or fail with a message.
Definition: allocator.h:56
rcutils_allocator_t rcl_allocator_t
Encapsulation of an allocator.
Definition: allocator.h:31
RCL_PUBLIC bool rcl_node_is_valid(const rcl_node_t *node)
Return true if the node is valid, else false.
Definition: node.c:397
RCL_PUBLIC RCL_WARN_UNUSED rmw_node_t * rcl_node_get_rmw_handle(const rcl_node_t *node)
Return the rmw node handle.
Definition: node.c:461
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_node_resolve_name(const rcl_node_t *node, const char *input_name, rcl_allocator_t allocator, bool is_service, bool only_expand, char **output_name)
Expand a given name into a fully-qualified topic name and apply remapping rules.
RCL_PUBLIC bool rcl_node_is_valid_except_context(const rcl_node_t *node)
Return true if node is valid, except for the context being valid.
Definition: node.c:387
Structure which encapsulates a ROS Node.
Definition: node.h:45
Options available for a rcl subscription.
Definition: subscription.h:47
rcl_allocator_t allocator
Custom allocator for the subscription, used for incidental allocations.
Definition: subscription.h:52
rmw_qos_profile_t qos
Middleware quality of service settings for the subscription.
Definition: subscription.h:49
rmw_subscription_options_t rmw_subscription_options
rmw specific subscription options, e.g. the rmw implementation specific payload.
Definition: subscription.h:54
bool disable_loaned_message
Disable flag to LoanedMessage, initialized via environmental variable.
Definition: subscription.h:56
Structure which encapsulates a ROS Subscription.
Definition: subscription.h:40
rcl_subscription_impl_t * impl
Pointer to the subscription implementation.
Definition: subscription.h:42
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_options_t rcl_subscription_get_default_options(void)
Return the default subscription options in a rcl_subscription_options_t.
Definition: subscription.c:227
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_fini(const rcl_subscription_t *subscription, rcl_subscription_content_filter_options_t *options)
Reclaim rcl_subscription_content_filter_options_t structure.
Definition: subscription.c:453
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_options_fini(rcl_subscription_options_t *option)
Reclaim resources held inside rcl_subscription_options_t structure.
Definition: subscription.c:256
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_init(rcl_subscription_t *subscription, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_subscription_options_t *options)
Initialize a ROS subscription.
Definition: subscription.c:50
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_sequence(const rcl_subscription_t *subscription, size_t count, rmw_message_sequence_t *message_sequence, rmw_message_info_sequence_t *message_info_sequence, rmw_subscription_allocation_t *allocation)
Take a sequence of messages from a topic using a rcl subscription.
Definition: subscription.c:575
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_subscription_get_topic_name(const rcl_subscription_t *subscription)
Get the topic name for the subscription.
Definition: subscription.c:738
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_set_on_new_message_callback(const rcl_subscription_t *subscription, rcl_event_callback_t callback, const void *user_data)
Set the on new message callback function for the subscription.
Definition: subscription.c:821
RCL_PUBLIC bool rcl_subscription_can_loan_messages(const rcl_subscription_t *subscription)
Check if subscription instance can loan messages.
Definition: subscription.c:807
RCL_PUBLIC RCL_WARN_UNUSED rmw_subscription_t * rcl_subscription_get_rmw_handle(const rcl_subscription_t *subscription)
Return the rmw subscription handle.
Definition: subscription.c:756
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_fini(rcl_subscription_t *subscription, rcl_node_t *node)
Finalize a rcl_subscription_t.
Definition: subscription.c:178
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take(const rcl_subscription_t *subscription, void *ros_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a ROS message from a topic using a rcl subscription.
Definition: subscription.c:540
RCL_PUBLIC RCL_WARN_UNUSED rmw_ret_t rcl_subscription_get_publisher_count(const rcl_subscription_t *subscription, size_t *publisher_count)
Get the number of publishers matched to a subscription.
Definition: subscription.c:776
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_options_set_content_filter_options(const char *filter_expression, size_t expression_parameters_argc, const char *expression_parameter_argv[], rcl_subscription_options_t *options)
Set the content filter options for the given subscription options.
Definition: subscription.c:278
RCL_PUBLIC bool rcl_subscription_is_valid(const rcl_subscription_t *subscription)
Check that the subscription is valid.
Definition: subscription.c:765
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_serialized_message(const rcl_subscription_t *subscription, rcl_serialized_message_t *serialized_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a serialized raw message from a topic using a rcl subscription.
Definition: subscription.c:621
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_loaned_message(const rcl_subscription_t *subscription, void **loaned_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a loaned message from a topic using a rcl subscription.
Definition: subscription.c:687
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_set(const rcl_subscription_t *subscription, const char *filter_expression, size_t expression_parameters_argc, const char *expression_parameter_argv[], rcl_subscription_content_filter_options_t *options)
Set the content filter options for the given subscription options.
Definition: subscription.c:424
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_content_filter_options_t rcl_get_zero_initialized_subscription_content_filter_options(void)
Return the zero initialized subscription content filter options.
Definition: subscription.c:385
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_subscription_is_cft_enabled(const rcl_subscription_t *subscription)
Check if the content filtered topic feature is enabled in the subscription.
Definition: subscription.c:473
RCL_PUBLIC RCL_WARN_UNUSED const rmw_qos_profile_t * rcl_subscription_get_actual_qos(const rcl_subscription_t *subscription)
Get the actual qos settings of the subscription.
Definition: subscription.c:798
RCL_PUBLIC RCL_WARN_UNUSED rcl_subscription_t rcl_get_zero_initialized_subscription(void)
Return a rcl_subscription_t struct with members set to NULL.
Definition: subscription.c:43
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_get_content_filter(const rcl_subscription_t *subscription, rcl_subscription_content_filter_options_t *options)
Retrieve the filter expression of the subscription.
Definition: subscription.c:516
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_set_content_filter(const rcl_subscription_t *subscription, const rcl_subscription_content_filter_options_t *options)
Set the filter expression and expression parameters for the subscription.
Definition: subscription.c:482
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_content_filter_options_init(const rcl_subscription_t *subscription, const char *filter_expression, size_t expression_parameters_argc, const char *expression_parameter_argv[], rcl_subscription_content_filter_options_t *options)
Initialize the content filter options for the given subscription options.
Definition: subscription.c:394
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_return_loaned_message_from_subscription(const rcl_subscription_t *subscription, void *loaned_message)
Return a loaned message from a topic using a rcl subscription.
Definition: subscription.c:723
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_take_dynamic_message(const rcl_subscription_t *subscription, rosidl_dynamic_typesupport_dynamic_data_t *dynamic_message, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation)
Take a dynamic type message from a topic using a rcl subscription.
Definition: subscription.c:655
RCL_PUBLIC RCL_WARN_UNUSED const rcl_subscription_options_t * rcl_subscription_get_options(const rcl_subscription_t *subscription)
Return the rcl subscription options.
Definition: subscription.c:747
#define RCL_RET_UNKNOWN_SUBSTITUTION
Topic name substitution is unknown.
Definition: types.h:51
#define RCL_RET_ALREADY_INIT
rcl_init() already called return code.
Definition: types.h:41
#define RCL_RET_SUBSCRIPTION_TAKE_FAILED
Failed to take a message from the subscription return code.
Definition: types.h:75
#define RCL_RET_OK
Success return code.
Definition: types.h:27
#define RCL_RET_BAD_ALLOC
Failed to allocate memory return code.
Definition: types.h:33
#define RCL_RET_INVALID_ARGUMENT
Invalid argument return code.
Definition: types.h:35
#define RCL_RET_ERROR
Unspecified error return code.
Definition: types.h:29
rmw_serialized_message_t rcl_serialized_message_t
typedef for rmw_serialized_message_t;
Definition: types.h:134
#define RCL_RET_NODE_INVALID
Invalid rcl_node_t given return code.
Definition: types.h:59
#define RCL_RET_TOPIC_NAME_INVALID
Topic name does not pass validation.
Definition: types.h:47
#define RCL_RET_SUBSCRIPTION_INVALID
Invalid rcl_subscription_t given return code.
Definition: types.h:73
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:24