23 #include <type_traits>
28 #include "rcl/error_handling.h"
29 #include "rclcpp/executors/executor_notify_waitable.hpp"
30 #include "rclcpp/subscription_wait_set_mask.hpp"
31 #include "rcpputils/scope_exit.hpp"
33 #include "rclcpp/dynamic_typesupport/dynamic_message.hpp"
34 #include "rclcpp/exceptions.hpp"
35 #include "rclcpp/executor.hpp"
36 #include "rclcpp/guard_condition.hpp"
37 #include "rclcpp/node.hpp"
38 #include "rclcpp/utilities.hpp"
40 #include "rcutils/logging_macros.h"
42 #include "tracetools/tracetools.h"
44 using namespace std::chrono_literals;
54 Executor::Executor(
const std::shared_ptr<rclcpp::Context> & context)
57 entities_need_rebuild_(true),
59 wait_set_({}, {}, {}, {}, {}, {}, context)
67 context_(options.context),
68 notify_waitable_(std::make_shared<
rclcpp::executors::ExecutorNotifyWaitable>(
70 this->entities_need_rebuild_.store(
true);
72 entities_need_rebuild_(
true),
73 collector_(notify_waitable_),
74 wait_set_({}, {}, {}, {}, {}, {}, options.context),
75 current_notify_waitable_(notify_waitable_),
76 impl_(std::make_unique<rclcpp::ExecutorImplementation>())
78 shutdown_callback_handle_ = context_->add_on_shutdown_callback(
79 [weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
80 auto strong_gc = weak_gc.lock();
86 notify_waitable_->add_guard_condition(interrupt_guard_condition_);
87 notify_waitable_->add_guard_condition(shutdown_guard_condition_);
96 std::lock_guard<std::mutex> guard(mutex_);
100 current_collection_.timers.update(
102 [
this](
auto timer) {wait_set_.remove_timer(timer);});
104 current_collection_.subscriptions.update(
106 [
this](
auto subscription) {
107 wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
110 current_collection_.clients.update(
112 [
this](
auto client) {wait_set_.remove_client(client);});
114 current_collection_.services.update(
116 [
this](
auto service) {wait_set_.remove_service(service);});
118 current_collection_.guard_conditions.update(
120 [
this](
auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
122 current_collection_.waitables.update(
124 [
this](
auto waitable) {wait_set_.remove_waitable(waitable);});
128 RCUTILS_LOG_ERROR_NAMED(
130 "failed to remove registered on_shutdown callback");
138 this->entities_need_rebuild_.store(
true);
140 if (!
spinning.load() && entities_need_rebuild_.exchange(
false)) {
141 std::lock_guard<std::mutex> guard(mutex_);
150 std::vector<rclcpp::CallbackGroup::WeakPtr>
157 std::vector<rclcpp::CallbackGroup::WeakPtr>
164 std::vector<rclcpp::CallbackGroup::WeakPtr>
173 rclcpp::CallbackGroup::SharedPtr group_ptr,
174 [[maybe_unused]] rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
182 throw std::runtime_error(
184 "Failed to handle entities update on callback group add: ") + ex.what());
191 if (node_ptr->get_context() !=
context_) {
192 throw std::runtime_error(
193 "add_node() called with a node with a different context from this executor");
201 throw std::runtime_error(
203 "Failed to handle entities update on node add: ") + ex.what());
209 rclcpp::CallbackGroup::SharedPtr group_ptr,
217 throw std::runtime_error(
219 "Failed to handle entities update on callback group remove: ") + ex.what());
226 this->
add_node(node_ptr->get_node_base_interface(), notify);
237 throw std::runtime_error(
239 "Failed to handle entities update on node remove: ") + ex.what());
246 this->
remove_node(node_ptr->get_node_base_interface(), notify);
251 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
252 std::chrono::nanoseconds timeout)
262 std::chrono::nanoseconds timeout,
263 const std::function<std::future_status(std::chrono::nanoseconds wait_time)> & wait_for_future)
270 std::future_status status = wait_for_future(std::chrono::seconds(0));
271 if (status == std::future_status::ready) {
272 return FutureReturnCode::SUCCESS;
275 auto end_time = std::chrono::steady_clock::now();
276 std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
278 if (timeout_ns > std::chrono::nanoseconds::zero()) {
279 end_time += timeout_ns;
281 std::chrono::nanoseconds timeout_left = timeout_ns;
284 throw std::runtime_error(
"spin_until_future_complete() called while already spinning");
286 RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(
false););
289 spin_once_impl(timeout_left);
292 status = wait_for_future(std::chrono::seconds(0));
293 if (status == std::future_status::ready) {
294 return FutureReturnCode::SUCCESS;
297 if (timeout_ns < std::chrono::nanoseconds::zero()) {
301 auto now = std::chrono::steady_clock::now();
302 if (now >= end_time) {
303 return FutureReturnCode::TIMEOUT;
306 timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
310 return FutureReturnCode::INTERRUPTED;
334 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
335 std::chrono::nanoseconds max_duration)
345 this->
spin_node_all(node->get_node_base_interface(), max_duration);
350 if (max_duration < 0ns) {
351 throw std::invalid_argument(
"max_duration must be greater than or equal to 0");
359 auto start = std::chrono::steady_clock::now();
360 auto max_duration_not_elapsed = [max_duration, start]() {
361 if (std::chrono::nanoseconds(0) == max_duration) {
364 }
else if (std::chrono::steady_clock::now() - start < max_duration) {
373 throw std::runtime_error(
"spin_some() called while already spinning");
375 RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(
false););
380 wait_result_.reset();
382 bool entity_states_fully_polled =
true;
384 if (entities_need_rebuild_) {
388 entity_states_fully_polled =
false;
413 entity_states_fully_polled =
false;
416 wait_result_.reset();
418 if (entity_states_fully_polled) {
428 entity_states_fully_polled =
true;
429 if (entities_need_rebuild_) {
433 entity_states_fully_polled =
false;
443 Executor::spin_once_impl(std::chrono::nanoseconds timeout)
455 throw std::runtime_error(
"spin_once() called while already spinning");
457 RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(
false););
458 spin_once_impl(timeout);
468 throw std::runtime_error(
469 std::string(
"Failed to trigger guard condition in cancel: ") + ex.what());
481 (
void(
"cannot execute an AnyExecutable without a valid callback group"),
482 any_exec.callback_group));
484 if (any_exec.timer) {
485 TRACETOOLS_TRACEPOINT(
486 rclcpp_executor_execute,
487 static_cast<const void *
>(any_exec.timer->get_timer_handle().get()));
490 if (any_exec.subscription) {
491 TRACETOOLS_TRACEPOINT(
492 rclcpp_executor_execute,
493 static_cast<const void *
>(any_exec.subscription->get_subscription_handle().get()));
496 if (any_exec.service) {
499 if (any_exec.client) {
502 if (any_exec.waitable) {
503 const std::shared_ptr<void> & const_data = any_exec.data;
504 any_exec.waitable->execute(const_data);
508 any_exec.callback_group->can_be_taken_from().store(
true);
511 template<
typename Taker,
typename Handler>
514 take_and_do_error_handling(
515 const char * action_description,
516 const char * topic_or_service_name,
518 Handler handle_action)
522 taken = take_action();
526 "executor %s '%s' unexpectedly failed: %s",
528 topic_or_service_name,
542 "executor %s '%s' failed to take anything",
544 topic_or_service_name);
556 switch (subscription->get_delivered_message_kind()) {
558 case rclcpp::DeliveredMessageKind::ROS_MESSAGE:
560 if (subscription->can_loan_messages()) {
564 void * loaned_msg =
nullptr;
567 take_and_do_error_handling(
568 "taking a loaned message from topic",
569 subscription->get_topic_name(),
572 rcl_ret_t ret = rcl_take_loaned_message(
573 subscription->get_subscription_handle().get(),
575 &message_info.get_rmw_message_info(),
577 if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {
580 rclcpp::exceptions::throw_from_rcl_error(ret);
584 [&]() {subscription->handle_loaned_message(loaned_msg, message_info);});
585 if (
nullptr != loaned_msg) {
587 subscription->get_subscription_handle().get(), loaned_msg);
591 "rcl_return_loaned_message_from_subscription() failed for subscription on topic "
593 subscription->get_topic_name(), rcl_get_error_string().str);
596 loaned_msg =
nullptr;
601 std::shared_ptr<void> message = subscription->create_message();
602 take_and_do_error_handling(
603 "taking a message from topic",
604 subscription->get_topic_name(),
605 [&]() {return subscription->take_type_erased(message.get(), message_info);},
606 [&]() {subscription->handle_message(message, message_info);});
612 subscription->return_message(message);
618 case rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE:
622 std::shared_ptr<SerializedMessage> serialized_msg =
623 subscription->create_serialized_message();
624 take_and_do_error_handling(
625 "taking a serialized message from topic",
626 subscription->get_topic_name(),
627 [&]() {return subscription->take_serialized(*serialized_msg.get(), message_info);},
630 subscription->handle_serialized_message(serialized_msg, message_info);
632 subscription->return_serialized_message(serialized_msg);
638 case rclcpp::DeliveredMessageKind::DYNAMIC_MESSAGE:
640 throw std::runtime_error(
"Unimplemented");
645 throw std::runtime_error(
"Delivered message kind is not supported");
651 Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer,
const std::shared_ptr<void> & data_ptr)
653 timer->execute_callback(data_ptr);
659 auto request_header = service->create_request_header();
660 std::shared_ptr<void> request = service->create_request();
661 take_and_do_error_handling(
662 "taking a service server request from service",
663 service->get_service_name(),
664 [&]() {return service->take_type_erased_request(request.get(), *request_header);},
665 [&]() {service->handle_request(request_header, request);});
671 auto request_header = client->create_request_header();
672 std::shared_ptr<void> response = client->create_response();
673 take_and_do_error_handling(
674 "taking a service client response from service",
675 client->get_service_name(),
676 [&]() {return client->take_type_erased_response(response.get(), *request_header);},
677 [&]() {client->handle_response(request_header, response);});
684 this->wait_result_.reset();
690 rclcpp::executors::build_entities_collection(callback_groups, collection);
697 current_notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
699 auto notify_waitable = std::static_pointer_cast<rclcpp::Waitable>(current_notify_waitable_);
700 collection.
waitables.insert({notify_waitable.get(), {notify_waitable, {}}});
705 current_collection_.remove_expired_entities();
709 current_collection_.timers.
update(
711 [
this](
auto timer) {wait_set_.add_timer(timer);},
712 [
this](
auto timer) {wait_set_.remove_timer(timer);});
714 current_collection_.subscriptions.update(
716 [
this](
auto subscription) {
717 wait_set_.add_subscription(subscription, kDefaultSubscriptionMask);
719 [
this](
auto subscription) {
720 wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
723 current_collection_.clients.update(
725 [
this](
auto client) {wait_set_.add_client(client);},
726 [
this](
auto client) {wait_set_.remove_client(client);});
728 current_collection_.services.update(
730 [
this](
auto service) {wait_set_.add_service(service);},
731 [
this](
auto service) {wait_set_.remove_service(service);});
733 current_collection_.guard_conditions.update(
735 [
this](
auto guard_condition) {wait_set_.add_guard_condition(guard_condition);},
736 [
this](
auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
738 current_collection_.waitables.update(
740 [
this](
auto waitable) {wait_set_.add_waitable(waitable);},
741 [
this](
auto waitable) {wait_set_.remove_waitable(waitable);});
745 this->wait_set_.prune_deleted_entities();
751 TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
754 this->wait_result_.reset();
757 std::lock_guard<std::mutex> guard(mutex_);
759 if (this->entities_need_rebuild_.exchange(
false) || current_collection_.empty()) {
764 this->wait_result_.emplace(wait_set_.wait(timeout));
766 if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
767 RCUTILS_LOG_WARN_NAMED(
769 "empty wait set received in wait(). This should never happen.");
771 if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
772 auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
773 if (current_notify_waitable_->is_ready(rcl_wait_set)) {
774 current_notify_waitable_->execute(current_notify_waitable_->take_data());
783 TRACETOOLS_TRACEPOINT(rclcpp_executor_get_next_ready);
785 bool valid_executable =
false;
787 if (!wait_result_.has_value() || wait_result_->kind() != rclcpp::WaitResultKind::Ready) {
791 if (!valid_executable) {
792 size_t current_timer_index = 0;
794 auto [timer, timer_index] = wait_result_->peek_next_ready_timer(current_timer_index);
795 if (
nullptr == timer) {
798 current_timer_index = timer_index;
799 auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
800 if (entity_iter != current_collection_.timers.end()) {
801 auto callback_group = entity_iter->second.callback_group.lock();
802 if (!callback_group || !callback_group->can_be_taken_from()) {
803 current_timer_index++;
810 wait_result_->clear_timer_with_index(current_timer_index);
812 any_executable.data = timer->call();
813 if (!any_executable.data) {
814 current_timer_index++;
817 any_executable.timer = timer;
818 any_executable.callback_group = callback_group;
819 valid_executable =
true;
822 current_timer_index++;
826 if (!valid_executable) {
827 while (
auto subscription = wait_result_->next_ready_subscription()) {
828 auto entity_iter = current_collection_.subscriptions.find(
829 subscription->get_subscription_handle().get());
830 if (entity_iter != current_collection_.subscriptions.end()) {
831 auto callback_group = entity_iter->second.callback_group.lock();
832 if (!callback_group || !callback_group->can_be_taken_from()) {
835 any_executable.subscription = subscription;
836 any_executable.callback_group = callback_group;
837 valid_executable =
true;
843 if (!valid_executable) {
844 while (
auto service = wait_result_->next_ready_service()) {
845 auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
846 if (entity_iter != current_collection_.services.end()) {
847 auto callback_group = entity_iter->second.callback_group.lock();
848 if (!callback_group || !callback_group->can_be_taken_from()) {
851 any_executable.service = service;
852 any_executable.callback_group = callback_group;
853 valid_executable =
true;
859 if (!valid_executable) {
860 while (
auto client = wait_result_->next_ready_client()) {
861 auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
862 if (entity_iter != current_collection_.clients.end()) {
863 auto callback_group = entity_iter->second.callback_group.lock();
864 if (!callback_group || !callback_group->can_be_taken_from()) {
867 any_executable.client = client;
868 any_executable.callback_group = callback_group;
869 valid_executable =
true;
875 if (!valid_executable) {
876 while (
auto waitable = wait_result_->next_ready_waitable()) {
877 auto entity_iter = current_collection_.waitables.find(waitable.get());
878 if (entity_iter != current_collection_.waitables.end()) {
879 auto callback_group = entity_iter->second.callback_group.lock();
880 if (!callback_group || !callback_group->can_be_taken_from()) {
883 any_executable.waitable = waitable;
884 any_executable.callback_group = callback_group;
885 any_executable.data = waitable->take_data();
886 valid_executable =
true;
892 if (any_executable.callback_group) {
893 if (any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) {
894 assert(any_executable.callback_group->can_be_taken_from().load());
895 any_executable.callback_group->can_be_taken_from().store(
false);
900 return valid_executable;
906 bool success =
false;
Coordinate the order and timing of available communication tasks.
virtual RCLCPP_PUBLIC void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
Add a node, complete all immediately available work, and remove the node.
std::shared_ptr< rclcpp::GuardCondition > interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
RCLCPP_PUBLIC void spin_node_once_nanoseconds(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout)
Add a node to executor, execute the next available unit of work, and remove the node.
virtual RCLCPP_PUBLIC ~Executor()
Default destructor.
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
virtual RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true)
Remove a callback group from the executor.
virtual RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a callback group to an executor.
RCLCPP_PUBLIC void wait_for_work(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Block until more work becomes avilable or timeout is reached.
static RCLCPP_PUBLIC void execute_service(rclcpp::ServiceBase::SharedPtr service)
Run service server executable.
virtual RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Remove a node from the executor.
RCLCPP_PUBLIC Executor(const rclcpp::ExecutorOptions &options=rclcpp::ExecutorOptions())
Default constructor.
virtual RCLCPP_PUBLIC void spin_node_all(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds max_duration)
Add a node, complete all immediately available work exhaustively, and remove the node.
RCLCPP_PUBLIC bool get_next_executable(AnyExecutable &any_executable, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Wait for executable in ready state and populate union structure.
virtual RCLCPP_PUBLIC void spin_once(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Collect work once and execute the next available work, optionally within a duration.
virtual RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0))
Collect work once and execute all available work, optionally within a max duration.
virtual RCLCPP_PUBLIC void cancel()
Cancel any running spin* function, causing it to return.
RCLCPP_PUBLIC bool is_spinning()
Returns true if the executor is currently spinning.
virtual RCLCPP_PUBLIC void handle_updated_entities(bool notify)
This function triggers a recollect of all entities that are registered to the executor.
RCLCPP_PUBLIC bool get_next_ready_executable(AnyExecutable &any_executable)
Check for executable in ready state and populate union structure.
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
static RCLCPP_PUBLIC void execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr< void > &data_ptr)
Run timer executable.
static RCLCPP_PUBLIC void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
Run subscription executable.
RCLCPP_PUBLIC void collect_entities()
Gather all of the waitable entities from associated nodes and callback groups.
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
virtual RCLCPP_PUBLIC FutureReturnCode spin_until_future_complete_impl(std::chrono::nanoseconds timeout, const std::function< std::future_status(std::chrono::nanoseconds wait_time)> &wait_for_future)
Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_
shutdown callback handle registered to Context
virtual RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a node to the executor.
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
std::shared_ptr< rclcpp::executors::ExecutorNotifyWaitable > notify_waitable_
Waitable containing guard conditions controlling the executor flow.
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups()
Get callback groups that belong to executor.
static RCLCPP_PUBLIC void execute_client(rclcpp::ClientBase::SharedPtr client)
Run service client executable.
rclcpp::executors::ExecutorEntitiesCollector collector_
Collector used to associate executable entities from nodes and guard conditions.
virtual RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration)
Collect and execute work repeatedly within a duration or until no more work is available.
RCLCPP_PUBLIC void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
Collect work and execute available work, optionally within a duration.
std::shared_ptr< rclcpp::GuardCondition > shutdown_guard_condition_
Guard condition for signaling the rmw layer to wake up for system shutdown.
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
A condition that can be waited on in a single wait set and asynchronously triggered.
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Options used to determine what parts of a subscription get added to or removed from a wait set.
Created when the return code does not match one of the other specialized exceptions.
void update(const EntityCollection< EntityKeyType, EntityValueType > &other, std::function< void(const EntitySharedPtr &)> on_added, std::function< void(const EntitySharedPtr &)> on_removed)
Update this collection based on the contents of another collection.
RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Add a node to the entity collector.
RCLCPP_PUBLIC void update_collections()
Update the underlying collections.
RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Remove a callback group from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() const
Get all callback groups known to this entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups() const
Get automatically-added callback groups known to this entity collector.
RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Remove a node from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups() const
Get manually-added callback groups known to this entity collector.
RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Add a callback group to the entity collector.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
FutureReturnCode
Return codes to be used with spin_until_future_complete.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Options to be passed to the executor constructor.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
GuardConditionCollection guard_conditions
Collection of guard conditions currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
ClientCollection clients
Collection of clients currently in use by the executor.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_return_loaned_message_from_subscription(const rcl_subscription_t *subscription, void *loaned_message)
Return a loaned message from a topic using a rcl subscription.
#define RCL_RET_OK
Success return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.