19 #include <type_traits>
24 #include "rcl/error_handling.h"
25 #include "rcpputils/scope_exit.hpp"
27 #include "rclcpp/exceptions.hpp"
28 #include "rclcpp/executor.hpp"
29 #include "rclcpp/guard_condition.hpp"
30 #include "rclcpp/memory_strategy.hpp"
31 #include "rclcpp/node.hpp"
32 #include "rclcpp/utilities.hpp"
34 #include "rcutils/logging_macros.h"
36 #include "tracetools/tracetools.h"
38 using namespace std::chrono_literals;
40 using rclcpp::exceptions::throw_from_rcl_error;
48 interrupt_guard_condition_(options.context),
50 memory_strategy_(options.memory_strategy)
56 [weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
57 auto strong_gc = weak_gc.lock();
65 memory_strategy_->add_guard_condition(*shutdown_guard_condition_.get());
77 RCUTILS_LOG_ERROR_NAMED(
79 "failed to create wait set: %s", rcl_get_error_string().str);
81 throw_from_rcl_error(ret,
"Failed to create wait set in Executor constructor");
88 for (
auto & pair : weak_groups_to_nodes_) {
89 auto group = pair.first.lock();
91 std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
92 has_executor.store(
false);
97 weak_nodes_.begin(), weak_nodes_.end(), []
98 (rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr) {
99 auto shared_node_ptr = weak_node_ptr.lock();
100 if (shared_node_ptr) {
101 std::atomic_bool & has_executor = shared_node_ptr->get_associated_with_executor_atomic();
102 has_executor.store(false);
106 weak_groups_associated_with_executor_to_nodes_.clear();
107 weak_groups_to_nodes_associated_with_executor_.clear();
108 weak_groups_to_nodes_.clear();
109 for (
const auto & pair : weak_groups_to_guard_conditions_) {
110 auto guard_condition = pair.second;
111 memory_strategy_->remove_guard_condition(guard_condition);
113 weak_groups_to_guard_conditions_.clear();
117 RCUTILS_LOG_ERROR_NAMED(
119 "failed to destroy wait set: %s", rcl_get_error_string().str);
123 memory_strategy_->remove_guard_condition(shutdown_guard_condition_.get());
124 memory_strategy_->remove_guard_condition(&interrupt_guard_condition_);
127 if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) {
128 RCUTILS_LOG_ERROR_NAMED(
130 "failed to remove registered on_shutdown callback");
135 std::vector<rclcpp::CallbackGroup::WeakPtr>
136 Executor::get_all_callback_groups()
138 std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
139 std::lock_guard<std::mutex> guard{mutex_};
140 for (
const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
141 groups.push_back(group_node_ptr.first);
143 for (
auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
144 groups.push_back(group_node_ptr.first);
149 std::vector<rclcpp::CallbackGroup::WeakPtr>
152 std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
153 std::lock_guard<std::mutex> guard{mutex_};
154 for (
auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
155 groups.push_back(group_node_ptr.first);
160 std::vector<rclcpp::CallbackGroup::WeakPtr>
163 std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
164 std::lock_guard<std::mutex> guard{mutex_};
165 for (
auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
166 groups.push_back(group_node_ptr.first);
174 for (
auto & weak_node : weak_nodes_) {
175 auto node = weak_node.lock();
177 node->for_each_callback_group(
178 [
this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
181 shared_group_ptr->automatically_add_to_executor_with_node() &&
182 !shared_group_ptr->get_associated_with_executor_atomic().load())
184 this->add_callback_group_to_map(
187 weak_groups_to_nodes_associated_with_executor_,
197 rclcpp::CallbackGroup::SharedPtr group_ptr,
198 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
199 rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
203 std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
204 if (has_executor.exchange(
true)) {
205 throw std::runtime_error(
"Callback group has already been added to an executor.");
208 rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
210 weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr));
211 bool was_inserted = insert_info.second;
213 throw std::runtime_error(
"Callback group was already added to executor.");
216 weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr));
218 if (node_ptr->get_context()->is_valid()) {
219 auto callback_group_guard_condition =
220 group_ptr->get_notify_guard_condition(node_ptr->get_context());
221 weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get();
223 memory_strategy_->add_guard_condition(*callback_group_guard_condition);
231 throw std::runtime_error(
233 "Failed to trigger guard condition on callback group add: ") + ex.what());
240 rclcpp::CallbackGroup::SharedPtr group_ptr,
241 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
244 std::lock_guard<std::mutex> guard{mutex_};
248 weak_groups_associated_with_executor_to_nodes_,
256 std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
257 if (has_executor.exchange(
true)) {
258 throw std::runtime_error(
259 std::string(
"Node '") + node_ptr->get_fully_qualified_name() +
260 "' has already been added to an executor.");
262 std::lock_guard<std::mutex> guard{mutex_};
263 node_ptr->for_each_callback_group(
264 [
this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr)
266 if (!group_ptr->get_associated_with_executor_atomic().load() &&
267 group_ptr->automatically_add_to_executor_with_node())
269 this->add_callback_group_to_map(
272 weak_groups_to_nodes_associated_with_executor_,
277 weak_nodes_.push_back(node_ptr);
282 rclcpp::CallbackGroup::SharedPtr group_ptr,
283 rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
286 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr;
287 rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
288 auto iter = weak_groups_to_nodes.find(weak_group_ptr);
289 if (iter != weak_groups_to_nodes.end()) {
290 node_ptr = iter->second.lock();
291 if (node_ptr ==
nullptr) {
292 throw std::runtime_error(
"Node must not be deleted before its callback group(s).");
294 weak_groups_to_nodes.erase(iter);
295 weak_groups_to_nodes_.erase(group_ptr);
296 std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
297 has_executor.store(
false);
299 throw std::runtime_error(
"Callback group needs to be associated with executor.");
302 if (!
has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) &&
303 !
has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_))
305 auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr);
306 if (iter != weak_groups_to_guard_conditions_.end()) {
307 memory_strategy_->remove_guard_condition(iter->second);
309 weak_groups_to_guard_conditions_.erase(weak_group_ptr);
315 throw std::runtime_error(
317 "Failed to trigger guard condition on callback group remove: ") + ex.what());
325 rclcpp::CallbackGroup::SharedPtr group_ptr,
328 std::lock_guard<std::mutex> guard{mutex_};
331 weak_groups_associated_with_executor_to_nodes_,
338 this->
add_node(node_ptr->get_node_base_interface(), notify);
344 if (!node_ptr->get_associated_with_executor_atomic().load()) {
345 throw std::runtime_error(
"Node needs to be associated with an executor.");
348 std::lock_guard<std::mutex> guard{mutex_};
349 bool found_node =
false;
350 auto node_it = weak_nodes_.begin();
351 while (node_it != weak_nodes_.end()) {
352 bool matched = (node_it->lock() == node_ptr);
355 node_it = weak_nodes_.erase(node_it);
361 throw std::runtime_error(
"Node needs to be associated with this executor.");
364 for (
auto it = weak_groups_to_nodes_associated_with_executor_.begin();
365 it != weak_groups_to_nodes_associated_with_executor_.end(); )
367 auto weak_node_ptr = it->second;
368 auto shared_node_ptr = weak_node_ptr.lock();
369 auto group_ptr = it->first.lock();
373 if (shared_node_ptr == node_ptr) {
376 weak_groups_to_nodes_associated_with_executor_,
381 std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
382 has_executor.store(
false);
388 this->
remove_node(node_ptr->get_node_base_interface(), notify);
392 Executor::spin_node_once_nanoseconds(
393 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
394 std::chrono::nanoseconds timeout)
418 return this->spin_some_impl(max_duration,
false);
423 if (max_duration < 0ns) {
424 throw std::invalid_argument(
"max_duration must be greater than or equal to 0");
426 return this->spin_some_impl(max_duration,
true);
430 Executor::spin_some_impl(std::chrono::nanoseconds max_duration,
bool exhaustive)
432 auto start = std::chrono::steady_clock::now();
433 auto max_duration_not_elapsed = [max_duration, start]() {
434 if (std::chrono::nanoseconds(0) == max_duration) {
437 }
else if (std::chrono::steady_clock::now() - start < max_duration) {
446 throw std::runtime_error(
"spin_some() called while already spinning");
448 RCPPUTILS_SCOPE_EXIT(this->
spinning.store(
false); );
449 bool work_available =
false;
452 if (!work_available) {
455 if (get_next_ready_executable(any_exec)) {
457 work_available =
true;
459 if (!work_available || !exhaustive) {
462 work_available =
false;
468 Executor::spin_once_impl(std::chrono::nanoseconds timeout)
471 if (get_next_executable(any_exec, timeout)) {
477 Executor::spin_once(std::chrono::nanoseconds timeout)
480 throw std::runtime_error(
"spin_once() called while already spinning");
482 RCPPUTILS_SCOPE_EXIT(this->
spinning.store(
false); );
483 spin_once_impl(timeout);
493 throw std::runtime_error(
494 std::string(
"Failed to trigger guard condition in cancel: ") + ex.what());
501 if (memory_strategy ==
nullptr) {
502 throw std::runtime_error(
"Received NULL memory strategy in executor.");
504 std::lock_guard<std::mutex> guard{mutex_};
505 memory_strategy_ = memory_strategy;
514 if (any_exec.timer) {
516 rclcpp_executor_execute,
517 static_cast<const void *
>(any_exec.timer->get_timer_handle().get()));
518 execute_timer(any_exec.timer);
520 if (any_exec.subscription) {
522 rclcpp_executor_execute,
523 static_cast<const void *
>(any_exec.subscription->get_subscription_handle().get()));
524 execute_subscription(any_exec.subscription);
526 if (any_exec.service) {
527 execute_service(any_exec.service);
529 if (any_exec.client) {
530 execute_client(any_exec.client);
532 if (any_exec.waitable) {
533 any_exec.waitable->execute(any_exec.data);
536 any_exec.callback_group->can_be_taken_from().store(
true);
542 throw std::runtime_error(
544 "Failed to trigger guard condition from execute_any_executable: ") + ex.what());
550 take_and_do_error_handling(
551 const char * action_description,
552 const char * topic_or_service_name,
553 std::function<
bool()> take_action,
554 std::function<
void()> handle_action)
558 taken = take_action();
562 "executor %s '%s' unexpectedly failed: %s",
564 topic_or_service_name,
578 "executor %s '%s' failed to take anything",
580 topic_or_service_name);
585 Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
590 if (subscription->is_serialized()) {
593 std::shared_ptr<SerializedMessage> serialized_msg = subscription->create_serialized_message();
594 take_and_do_error_handling(
595 "taking a serialized message from topic",
596 subscription->get_topic_name(),
597 [&]() {return subscription->take_serialized(*serialized_msg.get(), message_info);},
600 subscription->handle_serialized_message(serialized_msg, message_info);
602 subscription->return_serialized_message(serialized_msg);
603 }
else if (subscription->can_loan_messages()) {
607 void * loaned_msg =
nullptr;
610 take_and_do_error_handling(
611 "taking a loaned message from topic",
612 subscription->get_topic_name(),
615 rcl_ret_t ret = rcl_take_loaned_message(
616 subscription->get_subscription_handle().get(),
618 &message_info.get_rmw_message_info(),
620 if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {
623 rclcpp::exceptions::throw_from_rcl_error(ret);
627 [&]() {subscription->handle_loaned_message(loaned_msg, message_info);});
628 if (
nullptr != loaned_msg) {
630 subscription->get_subscription_handle().get(),
635 "rcl_return_loaned_message_from_subscription() failed for subscription on topic '%s': %s",
636 subscription->get_topic_name(), rcl_get_error_string().str);
638 loaned_msg =
nullptr;
643 std::shared_ptr<void> message = subscription->create_message();
644 take_and_do_error_handling(
645 "taking a message from topic",
646 subscription->get_topic_name(),
647 [&]() {return subscription->take_type_erased(message.get(), message_info);},
648 [&]() {subscription->handle_message(message, message_info);});
654 subscription->return_message(message);
659 Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer)
661 timer->execute_callback();
665 Executor::execute_service(rclcpp::ServiceBase::SharedPtr service)
667 auto request_header = service->create_request_header();
668 std::shared_ptr<void> request = service->create_request();
669 take_and_do_error_handling(
670 "taking a service server request from service",
671 service->get_service_name(),
672 [&]() {return service->take_type_erased_request(request.get(), *request_header);},
673 [&]() {service->handle_request(request_header, request);});
677 Executor::execute_client(
678 rclcpp::ClientBase::SharedPtr client)
680 auto request_header = client->create_request_header();
681 std::shared_ptr<void> response = client->create_response();
682 take_and_do_error_handling(
683 "taking a service client response from service",
684 client->get_service_name(),
685 [&]() {return client->take_type_erased_response(response.get(), *request_header);},
686 [&]() {client->handle_response(request_header, response);});
692 TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
694 std::lock_guard<std::mutex> guard(mutex_);
703 memory_strategy_->clear_handles();
704 bool has_invalid_weak_groups_or_nodes =
705 memory_strategy_->collect_entities(weak_groups_to_nodes_);
707 if (has_invalid_weak_groups_or_nodes) {
708 std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
709 for (
auto pair : weak_groups_to_nodes_) {
710 auto weak_group_ptr = pair.first;
711 auto weak_node_ptr = pair.second;
712 if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
713 invalid_group_ptrs.push_back(weak_group_ptr);
717 invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
718 [
this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
719 if (weak_groups_to_nodes_associated_with_executor_.find(group_ptr) !=
720 weak_groups_to_nodes_associated_with_executor_.end())
722 weak_groups_to_nodes_associated_with_executor_.erase(group_ptr);
724 if (weak_groups_associated_with_executor_to_nodes_.find(group_ptr) !=
725 weak_groups_associated_with_executor_to_nodes_.end())
727 weak_groups_associated_with_executor_to_nodes_.erase(group_ptr);
729 auto callback_guard_pair = weak_groups_to_guard_conditions_.find(group_ptr);
730 if (callback_guard_pair != weak_groups_to_guard_conditions_.end()) {
731 auto guard_condition = callback_guard_pair->second;
732 weak_groups_to_guard_conditions_.erase(group_ptr);
733 memory_strategy_->remove_guard_condition(guard_condition);
735 weak_groups_to_nodes_.erase(group_ptr);
742 throw_from_rcl_error(ret,
"Couldn't clear wait set");
747 &
wait_set_, memory_strategy_->number_of_ready_subscriptions(),
748 memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
749 memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
750 memory_strategy_->number_of_ready_events());
752 throw_from_rcl_error(ret,
"Couldn't resize the wait set");
755 if (!memory_strategy_->add_handles_to_wait_set(&
wait_set_)) {
756 throw std::runtime_error(
"Couldn't fill wait set");
761 rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
763 RCUTILS_LOG_WARN_NAMED(
765 "empty wait set received in rcl_wait(). This should never happen.");
767 using rclcpp::exceptions::throw_from_rcl_error;
768 throw_from_rcl_error(status,
"rcl_wait() failed");
773 std::lock_guard<std::mutex> guard(mutex_);
774 memory_strategy_->remove_null_handles(&wait_set_);
777 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
778 Executor::get_node_by_group(
779 const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
780 weak_groups_to_nodes,
781 rclcpp::CallbackGroup::SharedPtr group)
786 rclcpp::CallbackGroup::WeakPtr weak_group_ptr(group);
787 const auto finder = weak_groups_to_nodes.find(weak_group_ptr);
788 if (finder != weak_groups_to_nodes.end()) {
789 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr = finder->second.lock();
795 rclcpp::CallbackGroup::SharedPtr
796 Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
798 std::lock_guard<std::mutex> guard{mutex_};
799 for (
const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
800 auto group = pair.first.lock();
804 auto timer_ref = group->find_timer_ptrs_if(
805 [timer](
const rclcpp::TimerBase::SharedPtr & timer_ptr) ->
bool {
806 return timer_ptr == timer;
813 for (
const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
814 auto group = pair.first.lock();
818 auto timer_ref = group->find_timer_ptrs_if(
819 [timer](
const rclcpp::TimerBase::SharedPtr & timer_ptr) ->
bool {
820 return timer_ptr == timer;
830 Executor::get_next_ready_executable(
AnyExecutable & any_executable)
832 bool success = get_next_ready_executable_from_map(any_executable, weak_groups_to_nodes_);
837 Executor::get_next_ready_executable_from_map(
839 const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
840 weak_groups_to_nodes)
842 TRACEPOINT(rclcpp_executor_get_next_ready);
843 bool success =
false;
844 std::lock_guard<std::mutex> guard{mutex_};
846 memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes);
847 if (any_executable.timer) {
852 memory_strategy_->get_next_subscription(any_executable, weak_groups_to_nodes);
853 if (any_executable.subscription) {
859 memory_strategy_->get_next_service(any_executable, weak_groups_to_nodes);
860 if (any_executable.service) {
866 memory_strategy_->get_next_client(any_executable, weak_groups_to_nodes);
867 if (any_executable.client) {
873 memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes);
874 if (any_executable.waitable) {
875 any_executable.data = any_executable.waitable->take_data();
882 rclcpp::CallbackGroup::WeakPtr weak_group_ptr = any_executable.callback_group;
883 auto iter = weak_groups_to_nodes.find(weak_group_ptr);
884 if (iter == weak_groups_to_nodes.end()) {
892 if (any_executable.callback_group && any_executable.callback_group->type() == \
893 CallbackGroupType::MutuallyExclusive)
896 assert(any_executable.callback_group->can_be_taken_from().load());
900 any_executable.callback_group->can_be_taken_from().store(
false);
908 Executor::get_next_executable(
AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
910 bool success =
false;
913 success = get_next_ready_executable(any_executable);
922 success = get_next_ready_executable(any_executable);
930 const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
931 const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
932 weak_groups_to_nodes)
const
935 weak_groups_to_nodes.begin(),
936 weak_groups_to_nodes.end(),
937 [&](
const WeakCallbackGroupsToNodesMap::value_type & other) ->
bool {
938 auto other_ptr = other.second.lock();
939 return other_ptr == node_ptr;
940 }) != weak_groups_to_nodes.end();
rcutils_allocator_t rcl_allocator_t
Encapsulation of an allocator.
Coordinate the order and timing of available communication tasks.
RCLCPP_PUBLIC void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
Add a node, complete all immediately available work, and remove the node.
rcl_wait_set_t wait_set_
Wait set for managing entities that the rmw layer waits on.
RCLCPP_PUBLIC void set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
Support dynamic switching of the memory strategy.
virtual RCLCPP_PUBLIC ~Executor()
Default destructor.
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
virtual RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true)
Remove a callback group from the executor.
virtual RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a callback group to an executor.
RCLCPP_PUBLIC void wait_for_work(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
virtual RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Remove a node from the executor.
virtual RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0))
Collect work once and execute all available work, optionally within a duration.
RCLCPP_PUBLIC void cancel()
Cancel any running spin* function, causing it to return.
RCLCPP_PUBLIC bool is_spinning()
Returns true if the executor is currently spinning.
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
rclcpp::GuardCondition interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_
shutdown callback handle registered to Context
virtual RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a node to the executor.
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
virtual RCLCPP_PUBLIC void add_callback_group_to_map(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, WeakCallbackGroupsToNodesMap &weak_groups_to_nodes, bool notify=true) RCPPUTILS_TSA_REQUIRES(mutex_)
Add a callback group to an executor.
virtual RCLCPP_PUBLIC void add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_)
Add all callback groups that can be automatically added from associated nodes.
virtual RCLCPP_PUBLIC void remove_callback_group_from_map(rclcpp::CallbackGroup::SharedPtr group_ptr, WeakCallbackGroupsToNodesMap &weak_groups_to_nodes, bool notify=true) RCPPUTILS_TSA_REQUIRES(mutex_)
Remove a callback group from the executor.
RCLCPP_PUBLIC bool has_node(const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, const WeakCallbackGroupsToNodesMap &weak_groups_to_nodes) const
Return true if the node has been added to this executor.
virtual RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration)
Collect and execute work repeatedly within a duration or until no more work is available.
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
A condition that can be waited on in a single wait set and asynchronously triggered.
RCLCPP_PUBLIC void trigger()
Notify the wait set waiting on this condition, if any, that the condition had been met.
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Created when the return code does not match one of the other specialized exceptions.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
FutureReturnCode
Return codes to be used with spin_until_future_complete.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Options to be passed to the executor constructor.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_return_loaned_message_from_subscription(const rcl_subscription_t *subscription, void *loaned_message)
Return a loaned message from a topic using a rcl subscription.
#define RCL_RET_WAIT_SET_EMPTY
Given rcl_wait_set_t is empty return code.
#define RCL_RET_OK
Success return code.
#define RCL_RET_TIMEOUT
Timeout occurred return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_init(rcl_wait_set_t *wait_set, size_t number_of_subscriptions, size_t number_of_guard_conditions, size_t number_of_timers, size_t number_of_clients, size_t number_of_services, size_t number_of_events, rcl_context_t *context, rcl_allocator_t allocator)
Initialize a rcl wait set with space for items to be waited on.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_clear(rcl_wait_set_t *wait_set)
Remove (sets to NULL) all entities in the wait set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_fini(rcl_wait_set_t *wait_set)
Finalize a rcl wait set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait(rcl_wait_set_t *wait_set, int64_t timeout)
Block until the wait set is ready or until the timeout has been exceeded.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_resize(rcl_wait_set_t *wait_set, size_t subscriptions_size, size_t guard_conditions_size, size_t timers_size, size_t clients_size, size_t services_size, size_t events_size)
Reallocate space for entities in the wait set.