22 #include <type_traits>
27 #include "rcl/error_handling.h"
28 #include "rclcpp/executors/executor_notify_waitable.hpp"
29 #include "rclcpp/subscription_wait_set_mask.hpp"
30 #include "rcpputils/scope_exit.hpp"
32 #include "rclcpp/dynamic_typesupport/dynamic_message.hpp"
33 #include "rclcpp/exceptions.hpp"
34 #include "rclcpp/executor.hpp"
35 #include "rclcpp/guard_condition.hpp"
36 #include "rclcpp/node.hpp"
37 #include "rclcpp/utilities.hpp"
39 #include "rcutils/logging_macros.h"
41 #include "tracetools/tracetools.h"
43 using namespace std::chrono_literals;
53 Executor::Executor(
const std::shared_ptr<rclcpp::Context> & context)
55 entities_need_rebuild_(true),
57 wait_set_({}, {}, {}, {}, {}, {}, context)
65 context_(options.context),
66 notify_waitable_(std::make_shared<
rclcpp::executors::ExecutorNotifyWaitable>(
68 this->entities_need_rebuild_.store(
true);
70 entities_need_rebuild_(
true),
71 collector_(notify_waitable_),
72 wait_set_({}, {}, {}, {}, {}, {}, options.context),
73 current_notify_waitable_(notify_waitable_),
74 impl_(std::make_unique<rclcpp::ExecutorImplementation>())
76 shutdown_callback_handle_ = context_->add_on_shutdown_callback(
77 [weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
78 auto strong_gc = weak_gc.lock();
84 notify_waitable_->add_guard_condition(interrupt_guard_condition_);
85 notify_waitable_->add_guard_condition(shutdown_guard_condition_);
94 std::lock_guard<std::mutex> guard(mutex_);
98 current_collection_.timers.update(
100 [
this](
auto timer) {wait_set_.remove_timer(timer);});
102 current_collection_.subscriptions.update(
104 [
this](
auto subscription) {
105 wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
108 current_collection_.clients.update(
110 [
this](
auto client) {wait_set_.remove_client(client);});
112 current_collection_.services.update(
114 [
this](
auto service) {wait_set_.remove_service(service);});
116 current_collection_.guard_conditions.update(
118 [
this](
auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
120 current_collection_.waitables.update(
122 [
this](
auto waitable) {wait_set_.remove_waitable(waitable);});
126 RCUTILS_LOG_ERROR_NAMED(
128 "failed to remove registered on_shutdown callback");
136 this->entities_need_rebuild_.store(
true);
138 if (!
spinning.load() && entities_need_rebuild_.exchange(
false)) {
139 std::lock_guard<std::mutex> guard(mutex_);
148 std::vector<rclcpp::CallbackGroup::WeakPtr>
155 std::vector<rclcpp::CallbackGroup::WeakPtr>
162 std::vector<rclcpp::CallbackGroup::WeakPtr>
171 rclcpp::CallbackGroup::SharedPtr group_ptr,
172 [[maybe_unused]] rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
180 throw std::runtime_error(
182 "Failed to handle entities update on callback group add: ") + ex.what());
194 throw std::runtime_error(
196 "Failed to handle entities update on node add: ") + ex.what());
202 rclcpp::CallbackGroup::SharedPtr group_ptr,
210 throw std::runtime_error(
212 "Failed to handle entities update on callback group remove: ") + ex.what());
219 this->
add_node(node_ptr->get_node_base_interface(), notify);
230 throw std::runtime_error(
232 "Failed to handle entities update on node remove: ") + ex.what());
239 this->
remove_node(node_ptr->get_node_base_interface(), notify);
244 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
245 std::chrono::nanoseconds timeout)
255 std::chrono::nanoseconds timeout,
256 const std::function<std::future_status(std::chrono::nanoseconds wait_time)> & wait_for_future)
263 std::future_status status = wait_for_future(std::chrono::seconds(0));
264 if (status == std::future_status::ready) {
265 return FutureReturnCode::SUCCESS;
268 auto end_time = std::chrono::steady_clock::now();
269 std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
271 if (timeout_ns > std::chrono::nanoseconds::zero()) {
272 end_time += timeout_ns;
274 std::chrono::nanoseconds timeout_left = timeout_ns;
277 throw std::runtime_error(
"spin_until_future_complete() called while already spinning");
279 RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(
false););
282 spin_once_impl(timeout_left);
285 status = wait_for_future(std::chrono::seconds(0));
286 if (status == std::future_status::ready) {
287 return FutureReturnCode::SUCCESS;
290 if (timeout_ns < std::chrono::nanoseconds::zero()) {
294 auto now = std::chrono::steady_clock::now();
295 if (now >= end_time) {
296 return FutureReturnCode::TIMEOUT;
299 timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
303 return FutureReturnCode::INTERRUPTED;
327 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
328 std::chrono::nanoseconds max_duration)
338 this->
spin_node_all(node->get_node_base_interface(), max_duration);
343 if (max_duration < 0ns) {
344 throw std::invalid_argument(
"max_duration must be greater than or equal to 0");
352 auto start = std::chrono::steady_clock::now();
353 auto max_duration_not_elapsed = [max_duration, start]() {
354 if (std::chrono::nanoseconds(0) == max_duration) {
357 }
else if (std::chrono::steady_clock::now() - start < max_duration) {
366 throw std::runtime_error(
"spin_some() called while already spinning");
368 RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(
false););
373 wait_result_.reset();
375 bool entity_states_fully_polled =
true;
377 if (entities_need_rebuild_) {
381 entity_states_fully_polled =
false;
406 entity_states_fully_polled =
false;
409 wait_result_.reset();
411 if (entity_states_fully_polled) {
421 entity_states_fully_polled =
true;
422 if (entities_need_rebuild_) {
426 entity_states_fully_polled =
false;
436 Executor::spin_once_impl(std::chrono::nanoseconds timeout)
448 throw std::runtime_error(
"spin_once() called while already spinning");
450 RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(
false););
451 spin_once_impl(timeout);
461 throw std::runtime_error(
462 std::string(
"Failed to trigger guard condition in cancel: ") + ex.what());
474 (
void(
"cannot execute an AnyExecutable without a valid callback group"),
475 any_exec.callback_group));
477 if (any_exec.timer) {
478 TRACETOOLS_TRACEPOINT(
479 rclcpp_executor_execute,
480 static_cast<const void *
>(any_exec.timer->get_timer_handle().get()));
483 if (any_exec.subscription) {
484 TRACETOOLS_TRACEPOINT(
485 rclcpp_executor_execute,
486 static_cast<const void *
>(any_exec.subscription->get_subscription_handle().get()));
489 if (any_exec.service) {
492 if (any_exec.client) {
495 if (any_exec.waitable) {
496 const std::shared_ptr<void> & const_data = any_exec.data;
497 any_exec.waitable->execute(const_data);
501 any_exec.callback_group->can_be_taken_from().store(
true);
504 template<
typename Taker,
typename Handler>
507 take_and_do_error_handling(
508 const char * action_description,
509 const char * topic_or_service_name,
511 Handler handle_action)
515 taken = take_action();
519 "executor %s '%s' unexpectedly failed: %s",
521 topic_or_service_name,
535 "executor %s '%s' failed to take anything",
537 topic_or_service_name);
549 switch (subscription->get_delivered_message_kind()) {
551 case rclcpp::DeliveredMessageKind::ROS_MESSAGE:
553 if (subscription->can_loan_messages()) {
557 void * loaned_msg =
nullptr;
560 take_and_do_error_handling(
561 "taking a loaned message from topic",
562 subscription->get_topic_name(),
565 rcl_ret_t ret = rcl_take_loaned_message(
566 subscription->get_subscription_handle().get(),
568 &message_info.get_rmw_message_info(),
570 if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {
573 rclcpp::exceptions::throw_from_rcl_error(ret);
577 [&]() {subscription->handle_loaned_message(loaned_msg, message_info);});
578 if (
nullptr != loaned_msg) {
580 subscription->get_subscription_handle().get(), loaned_msg);
584 "rcl_return_loaned_message_from_subscription() failed for subscription on topic "
586 subscription->get_topic_name(), rcl_get_error_string().str);
589 loaned_msg =
nullptr;
594 std::shared_ptr<void> message = subscription->create_message();
595 take_and_do_error_handling(
596 "taking a message from topic",
597 subscription->get_topic_name(),
598 [&]() {return subscription->take_type_erased(message.get(), message_info);},
599 [&]() {subscription->handle_message(message, message_info);});
605 subscription->return_message(message);
611 case rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE:
615 std::shared_ptr<SerializedMessage> serialized_msg =
616 subscription->create_serialized_message();
617 take_and_do_error_handling(
618 "taking a serialized message from topic",
619 subscription->get_topic_name(),
620 [&]() {return subscription->take_serialized(*serialized_msg.get(), message_info);},
623 subscription->handle_serialized_message(serialized_msg, message_info);
625 subscription->return_serialized_message(serialized_msg);
631 case rclcpp::DeliveredMessageKind::DYNAMIC_MESSAGE:
633 throw std::runtime_error(
"Unimplemented");
638 throw std::runtime_error(
"Delivered message kind is not supported");
644 Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer,
const std::shared_ptr<void> & data_ptr)
646 timer->execute_callback(data_ptr);
652 auto request_header = service->create_request_header();
653 std::shared_ptr<void> request = service->create_request();
654 take_and_do_error_handling(
655 "taking a service server request from service",
656 service->get_service_name(),
657 [&]() {return service->take_type_erased_request(request.get(), *request_header);},
658 [&]() {service->handle_request(request_header, request);});
664 auto request_header = client->create_request_header();
665 std::shared_ptr<void> response = client->create_response();
666 take_and_do_error_handling(
667 "taking a service client response from service",
668 client->get_service_name(),
669 [&]() {return client->take_type_erased_response(response.get(), *request_header);},
670 [&]() {client->handle_response(request_header, response);});
677 this->wait_result_.reset();
683 rclcpp::executors::build_entities_collection(callback_groups, collection);
690 current_notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
692 auto notify_waitable = std::static_pointer_cast<rclcpp::Waitable>(current_notify_waitable_);
693 collection.
waitables.insert({notify_waitable.get(), {notify_waitable, {}}});
698 current_collection_.remove_expired_entities();
702 current_collection_.timers.
update(
704 [
this](
auto timer) {wait_set_.add_timer(timer);},
705 [
this](
auto timer) {wait_set_.remove_timer(timer);});
707 current_collection_.subscriptions.update(
709 [
this](
auto subscription) {
710 wait_set_.add_subscription(subscription, kDefaultSubscriptionMask);
712 [
this](
auto subscription) {
713 wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
716 current_collection_.clients.update(
718 [
this](
auto client) {wait_set_.add_client(client);},
719 [
this](
auto client) {wait_set_.remove_client(client);});
721 current_collection_.services.update(
723 [
this](
auto service) {wait_set_.add_service(service);},
724 [
this](
auto service) {wait_set_.remove_service(service);});
726 current_collection_.guard_conditions.update(
728 [
this](
auto guard_condition) {wait_set_.add_guard_condition(guard_condition);},
729 [
this](
auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
731 current_collection_.waitables.update(
733 [
this](
auto waitable) {wait_set_.add_waitable(waitable);},
734 [
this](
auto waitable) {wait_set_.remove_waitable(waitable);});
738 this->wait_set_.prune_deleted_entities();
744 TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
747 this->wait_result_.reset();
750 std::lock_guard<std::mutex> guard(mutex_);
752 if (this->entities_need_rebuild_.exchange(
false) || current_collection_.empty()) {
757 this->wait_result_.emplace(wait_set_.wait(timeout));
759 if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
760 RCUTILS_LOG_WARN_NAMED(
762 "empty wait set received in wait(). This should never happen.");
764 if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
765 auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
766 if (current_notify_waitable_->is_ready(rcl_wait_set)) {
767 current_notify_waitable_->execute(current_notify_waitable_->take_data());
776 TRACETOOLS_TRACEPOINT(rclcpp_executor_get_next_ready);
778 bool valid_executable =
false;
780 if (!wait_result_.has_value() || wait_result_->kind() != rclcpp::WaitResultKind::Ready) {
784 if (!valid_executable) {
785 size_t current_timer_index = 0;
787 auto [timer, timer_index] = wait_result_->peek_next_ready_timer(current_timer_index);
788 if (
nullptr == timer) {
791 current_timer_index = timer_index;
792 auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
793 if (entity_iter != current_collection_.timers.end()) {
794 auto callback_group = entity_iter->second.callback_group.lock();
795 if (!callback_group || !callback_group->can_be_taken_from()) {
796 current_timer_index++;
803 wait_result_->clear_timer_with_index(current_timer_index);
805 any_executable.data = timer->call();
806 if (!any_executable.data) {
807 current_timer_index++;
810 any_executable.timer = timer;
811 any_executable.callback_group = callback_group;
812 valid_executable =
true;
815 current_timer_index++;
819 if (!valid_executable) {
820 while (
auto subscription = wait_result_->next_ready_subscription()) {
821 auto entity_iter = current_collection_.subscriptions.find(
822 subscription->get_subscription_handle().get());
823 if (entity_iter != current_collection_.subscriptions.end()) {
824 auto callback_group = entity_iter->second.callback_group.lock();
825 if (!callback_group || !callback_group->can_be_taken_from()) {
828 any_executable.subscription = subscription;
829 any_executable.callback_group = callback_group;
830 valid_executable =
true;
836 if (!valid_executable) {
837 while (
auto service = wait_result_->next_ready_service()) {
838 auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
839 if (entity_iter != current_collection_.services.end()) {
840 auto callback_group = entity_iter->second.callback_group.lock();
841 if (!callback_group || !callback_group->can_be_taken_from()) {
844 any_executable.service = service;
845 any_executable.callback_group = callback_group;
846 valid_executable =
true;
852 if (!valid_executable) {
853 while (
auto client = wait_result_->next_ready_client()) {
854 auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
855 if (entity_iter != current_collection_.clients.end()) {
856 auto callback_group = entity_iter->second.callback_group.lock();
857 if (!callback_group || !callback_group->can_be_taken_from()) {
860 any_executable.client = client;
861 any_executable.callback_group = callback_group;
862 valid_executable =
true;
868 if (!valid_executable) {
869 while (
auto waitable = wait_result_->next_ready_waitable()) {
870 auto entity_iter = current_collection_.waitables.find(waitable.get());
871 if (entity_iter != current_collection_.waitables.end()) {
872 auto callback_group = entity_iter->second.callback_group.lock();
873 if (!callback_group || !callback_group->can_be_taken_from()) {
876 any_executable.waitable = waitable;
877 any_executable.callback_group = callback_group;
878 any_executable.data = waitable->take_data();
879 valid_executable =
true;
885 if (any_executable.callback_group) {
886 if (any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) {
887 assert(any_executable.callback_group->can_be_taken_from().load());
888 any_executable.callback_group->can_be_taken_from().store(
false);
893 return valid_executable;
899 bool success =
false;
Coordinate the order and timing of available communication tasks.
virtual RCLCPP_PUBLIC void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
Add a node, complete all immediately available work, and remove the node.
std::shared_ptr< rclcpp::GuardCondition > interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
RCLCPP_PUBLIC void spin_node_once_nanoseconds(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout)
Add a node to executor, execute the next available unit of work, and remove the node.
virtual RCLCPP_PUBLIC ~Executor()
Default destructor.
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
virtual RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true)
Remove a callback group from the executor.
virtual RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a callback group to an executor.
RCLCPP_PUBLIC void wait_for_work(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Block until more work becomes avilable or timeout is reached.
static RCLCPP_PUBLIC void execute_service(rclcpp::ServiceBase::SharedPtr service)
Run service server executable.
virtual RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Remove a node from the executor.
RCLCPP_PUBLIC Executor(const rclcpp::ExecutorOptions &options=rclcpp::ExecutorOptions())
Default constructor.
virtual RCLCPP_PUBLIC void spin_node_all(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds max_duration)
Add a node, complete all immediately available work exhaustively, and remove the node.
RCLCPP_PUBLIC bool get_next_executable(AnyExecutable &any_executable, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Wait for executable in ready state and populate union structure.
virtual RCLCPP_PUBLIC void spin_once(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Collect work once and execute the next available work, optionally within a duration.
virtual RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0))
Collect work once and execute all available work, optionally within a max duration.
virtual RCLCPP_PUBLIC void cancel()
Cancel any running spin* function, causing it to return.
RCLCPP_PUBLIC bool is_spinning()
Returns true if the executor is currently spinning.
virtual RCLCPP_PUBLIC void handle_updated_entities(bool notify)
This function triggers a recollect of all entities that are registered to the executor.
RCLCPP_PUBLIC bool get_next_ready_executable(AnyExecutable &any_executable)
Check for executable in ready state and populate union structure.
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
static RCLCPP_PUBLIC void execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr< void > &data_ptr)
Run timer executable.
static RCLCPP_PUBLIC void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
Run subscription executable.
RCLCPP_PUBLIC void collect_entities()
Gather all of the waitable entities from associated nodes and callback groups.
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
virtual RCLCPP_PUBLIC FutureReturnCode spin_until_future_complete_impl(std::chrono::nanoseconds timeout, const std::function< std::future_status(std::chrono::nanoseconds wait_time)> &wait_for_future)
Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_
shutdown callback handle registered to Context
virtual RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a node to the executor.
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
std::shared_ptr< rclcpp::executors::ExecutorNotifyWaitable > notify_waitable_
Waitable containing guard conditions controlling the executor flow.
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups()
Get callback groups that belong to executor.
static RCLCPP_PUBLIC void execute_client(rclcpp::ClientBase::SharedPtr client)
Run service client executable.
rclcpp::executors::ExecutorEntitiesCollector collector_
Collector used to associate executable entities from nodes and guard conditions.
virtual RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration)
Collect and execute work repeatedly within a duration or until no more work is available.
RCLCPP_PUBLIC void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
Collect work and execute available work, optionally within a duration.
std::shared_ptr< rclcpp::GuardCondition > shutdown_guard_condition_
Guard condition for signaling the rmw layer to wake up for system shutdown.
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
A condition that can be waited on in a single wait set and asynchronously triggered.
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Options used to determine what parts of a subscription get added to or removed from a wait set.
Created when the return code does not match one of the other specialized exceptions.
void update(const EntityCollection< EntityKeyType, EntityValueType > &other, std::function< void(const EntitySharedPtr &)> on_added, std::function< void(const EntitySharedPtr &)> on_removed)
Update this collection based on the contents of another collection.
RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Add a node to the entity collector.
RCLCPP_PUBLIC void update_collections()
Update the underlying collections.
RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Remove a callback group from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() const
Get all callback groups known to this entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups() const
Get automatically-added callback groups known to this entity collector.
RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Remove a node from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups() const
Get manually-added callback groups known to this entity collector.
RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Add a callback group to the entity collector.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
FutureReturnCode
Return codes to be used with spin_until_future_complete.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Options to be passed to the executor constructor.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
GuardConditionCollection guard_conditions
Collection of guard conditions currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
ClientCollection clients
Collection of clients currently in use by the executor.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_return_loaned_message_from_subscription(const rcl_subscription_t *subscription, void *loaned_message)
Return a loaned message from a topic using a rcl subscription.
#define RCL_RET_OK
Success return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.