15 #include "rclcpp/executors/static_executor_entities_collector.hpp"
24 #include "rclcpp/memory_strategy.hpp"
25 #include "rclcpp/executors/static_single_threaded_executor.hpp"
26 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
30 StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector()
33 for (
const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
34 auto group = pair.first.lock();
36 std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
37 has_executor.store(
false);
40 for (
const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
41 auto group = pair.first.lock();
43 std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
44 has_executor.store(
false);
48 for (
const auto & weak_node : weak_nodes_) {
49 auto node = weak_node.lock();
51 std::atomic_bool & has_executor = node->get_associated_with_executor_atomic();
52 has_executor.store(
false);
55 weak_groups_associated_with_executor_to_nodes_.clear();
56 weak_groups_to_nodes_associated_with_executor_.clear();
59 weak_nodes_to_guard_conditions_.clear();
65 rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
70 p_wait_set_ = p_wait_set;
72 if (memory_strategy ==
nullptr) {
73 throw std::runtime_error(
"Received NULL memory strategy in executor waitable.");
75 memory_strategy_ = memory_strategy;
78 std::shared_ptr<void> shared_ptr;
86 StaticExecutorEntitiesCollector::fini()
88 memory_strategy_->clear_handles();
103 fill_memory_strategy();
105 fill_executable_list();
109 std::lock_guard<std::mutex> guard{new_nodes_mutex_};
110 for (
const auto & weak_node : new_nodes_) {
111 if (
auto node_ptr = weak_node.lock()) {
112 const auto & gc = node_ptr->get_notify_guard_condition();
113 weak_nodes_to_guard_conditions_[node_ptr] = &gc;
120 StaticExecutorEntitiesCollector::fill_memory_strategy()
122 memory_strategy_->clear_handles();
123 bool has_invalid_weak_groups_or_nodes =
124 memory_strategy_->collect_entities(weak_groups_to_nodes_associated_with_executor_);
126 if (has_invalid_weak_groups_or_nodes) {
127 std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
128 for (
const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
129 auto & weak_group_ptr = pair.first;
130 auto & weak_node_ptr = pair.second;
131 if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
132 invalid_group_ptrs.push_back(weak_group_ptr);
136 invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
137 [
this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
138 weak_groups_to_nodes_associated_with_executor_.erase(group_ptr);
141 has_invalid_weak_groups_or_nodes =
142 memory_strategy_->collect_entities(weak_groups_associated_with_executor_to_nodes_);
144 if (has_invalid_weak_groups_or_nodes) {
145 std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
146 for (
const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
147 auto & weak_group_ptr = pair.first;
148 const auto & weak_node_ptr = pair.second;
149 if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
150 invalid_group_ptrs.push_back(weak_group_ptr);
154 invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
155 [
this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
156 weak_groups_associated_with_executor_to_nodes_.erase(group_ptr);
161 memory_strategy_->add_waitable_handle(this->shared_from_this());
165 StaticExecutorEntitiesCollector::fill_executable_list()
168 add_callback_groups_from_nodes_associated_to_executor();
169 fill_executable_list_from_map(weak_groups_associated_with_executor_to_nodes_);
170 fill_executable_list_from_map(weak_groups_to_nodes_associated_with_executor_);
172 exec_list_.add_waitable(shared_from_this());
175 StaticExecutorEntitiesCollector::fill_executable_list_from_map(
176 const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
177 weak_groups_to_nodes)
179 for (
const auto & pair : weak_groups_to_nodes) {
180 auto group = pair.first.lock();
181 auto node = pair.second.lock();
182 if (!node || !group || !group->can_be_taken_from().load()) {
185 group->find_timer_ptrs_if(
186 [
this](
const rclcpp::TimerBase::SharedPtr & timer) {
188 exec_list_.add_timer(timer);
192 group->find_subscription_ptrs_if(
193 [
this](
const rclcpp::SubscriptionBase::SharedPtr & subscription) {
195 exec_list_.add_subscription(subscription);
199 group->find_service_ptrs_if(
200 [
this](
const rclcpp::ServiceBase::SharedPtr & service) {
202 exec_list_.add_service(service);
206 group->find_client_ptrs_if(
207 [
this](
const rclcpp::ClientBase::SharedPtr & client) {
209 exec_list_.add_client(client);
213 group->find_waitable_ptrs_if(
214 [
this](
const rclcpp::Waitable::SharedPtr & waitable) {
216 exec_list_.add_waitable(waitable);
224 StaticExecutorEntitiesCollector::prepare_wait_set()
228 throw std::runtime_error(
"Couldn't clear wait set");
233 p_wait_set_, memory_strategy_->number_of_ready_subscriptions(),
234 memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
235 memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
236 memory_strategy_->number_of_ready_events());
239 throw std::runtime_error(
240 std::string(
"Couldn't resize the wait set: ") + rcl_get_error_string().str);
250 throw std::runtime_error(
"Couldn't clear wait set");
253 if (!memory_strategy_->add_handles_to_wait_set(p_wait_set_)) {
254 throw std::runtime_error(
"Couldn't fill wait set");
258 rcl_wait(p_wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
261 RCUTILS_LOG_WARN_NAMED(
263 "empty wait set received in rcl_wait(). This should never happen.");
265 using rclcpp::exceptions::throw_from_rcl_error;
266 throw_from_rcl_error(status,
"rcl_wait() failed");
274 for (
const auto & pair : weak_nodes_to_guard_conditions_) {
275 auto & gc = pair.second;
276 detail::add_guard_condition_to_rcl_wait_set(*wait_set, *gc);
282 std::lock_guard<std::mutex> guard{new_nodes_mutex_};
283 return weak_nodes_to_guard_conditions_.size() + new_nodes_.size();
288 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
290 bool is_new_node =
false;
292 std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
293 if (has_executor.exchange(
true)) {
294 throw std::runtime_error(
"Node has already been added to an executor.");
296 node_ptr->for_each_callback_group(
297 [
this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr)
300 !group_ptr->get_associated_with_executor_atomic().load() &&
301 group_ptr->automatically_add_to_executor_with_node())
303 is_new_node = (add_callback_group(
306 weak_groups_to_nodes_associated_with_executor_) ||
310 weak_nodes_.push_back(node_ptr);
316 rclcpp::CallbackGroup::SharedPtr group_ptr,
317 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
318 rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
321 std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
322 if (has_executor.exchange(
true)) {
323 throw std::runtime_error(
"Callback group has already been added to an executor.");
325 bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
326 !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_);
327 rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
328 auto insert_info = weak_groups_to_nodes.insert(
329 std::make_pair(weak_group_ptr, node_ptr));
330 bool was_inserted = insert_info.second;
332 throw std::runtime_error(
"Callback group was already added to executor.");
335 std::lock_guard<std::mutex> guard{new_nodes_mutex_};
336 new_nodes_.push_back(node_ptr);
344 rclcpp::CallbackGroup::SharedPtr group_ptr,
345 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
347 return add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_);
352 rclcpp::CallbackGroup::SharedPtr group_ptr)
356 weak_groups_associated_with_executor_to_nodes_);
361 rclcpp::CallbackGroup::SharedPtr group_ptr,
362 rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
364 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr;
365 rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
366 auto iter = weak_groups_to_nodes.find(weak_group_ptr);
367 if (iter != weak_groups_to_nodes.end()) {
368 node_ptr = iter->second.lock();
369 if (node_ptr ==
nullptr) {
370 throw std::runtime_error(
"Node must not be deleted before its callback group(s).");
372 weak_groups_to_nodes.erase(iter);
374 throw std::runtime_error(
"Callback group needs to be associated with executor.");
377 if (!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
378 !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_))
380 rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr);
381 weak_nodes_to_guard_conditions_.erase(node_weak_ptr);
389 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
391 if (!node_ptr->get_associated_with_executor_atomic().load()) {
394 bool node_found =
false;
395 auto node_it = weak_nodes_.begin();
396 while (node_it != weak_nodes_.end()) {
397 bool matched = (node_it->lock() == node_ptr);
399 weak_nodes_.erase(node_it);
408 std::vector<rclcpp::CallbackGroup::SharedPtr> found_group_ptrs;
410 weak_groups_to_nodes_associated_with_executor_.begin(),
411 weak_groups_to_nodes_associated_with_executor_.end(),
412 [&found_group_ptrs, node_ptr](std::pair<rclcpp::CallbackGroup::WeakPtr,
413 rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> key_value_pair) {
414 auto & weak_node_ptr = key_value_pair.second;
415 auto shared_node_ptr = weak_node_ptr.lock();
416 auto group_ptr = key_value_pair.first.lock();
417 if (shared_node_ptr == node_ptr) {
418 found_group_ptrs.push_back(group_ptr);
422 found_group_ptrs.begin(), found_group_ptrs.end(), [
this]
423 (rclcpp::CallbackGroup::SharedPtr group_ptr) {
424 this->remove_callback_group_from_map(
426 weak_groups_to_nodes_associated_with_executor_);
428 std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
429 has_executor.store(
false);
439 auto found_guard_condition = std::find_if(
440 weak_nodes_to_guard_conditions_.begin(), weak_nodes_to_guard_conditions_.end(),
441 [&](std::pair<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
443 const rcl_guard_condition_t & rcl_gc = pair.second->get_rcl_guard_condition();
444 return &rcl_gc == p_wait_set->guard_conditions[i];
446 if (found_guard_condition != weak_nodes_to_guard_conditions_.end()) {
457 StaticExecutorEntitiesCollector::has_node(
458 const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
459 const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
460 weak_groups_to_nodes)
const
463 weak_groups_to_nodes.begin(),
464 weak_groups_to_nodes.end(),
465 [&](
const WeakCallbackGroupsToNodesMap::value_type & other) ->
bool {
466 auto other_ptr = other.second.lock();
467 return other_ptr == node_ptr;
468 }) != weak_groups_to_nodes.end();
472 StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor()
474 for (
const auto & weak_node : weak_nodes_) {
475 auto node = weak_node.lock();
477 node->for_each_callback_group(
478 [
this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
480 if (shared_group_ptr->automatically_add_to_executor_with_node() &&
481 !shared_group_ptr->get_associated_with_executor_atomic().load())
486 weak_groups_to_nodes_associated_with_executor_);
493 std::vector<rclcpp::CallbackGroup::WeakPtr>
494 StaticExecutorEntitiesCollector::get_all_callback_groups()
496 std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
497 for (
const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
498 groups.push_back(group_node_ptr.first);
500 for (
const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
501 groups.push_back(group_node_ptr.first);
506 std::vector<rclcpp::CallbackGroup::WeakPtr>
509 std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
510 for (
const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
511 groups.push_back(group_node_ptr.first);
516 std::vector<rclcpp::CallbackGroup::WeakPtr>
519 std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
520 for (
const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
521 groups.push_back(group_node_ptr.first);
A condition that can be waited on in a single wait set and asynchronously triggered.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
RCLCPP_PUBLIC bool remove_callback_group_from_map(rclcpp::CallbackGroup::SharedPtr group_ptr, WeakCallbackGroupsToNodesMap &weak_groups_to_nodes)
Remove a callback group from the executor.
RCLCPP_PUBLIC std::shared_ptr< void > take_data() override
Take the data so that it can be consumed with execute.
RCLCPP_PUBLIC void refresh_wait_set(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Function to add_handles_to_wait_set and wait for work and.
RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override
Get the number of ready guard_conditions.
RCLCPP_PUBLIC bool add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Add a callback group to an executor.
RCLCPP_PUBLIC bool is_ready(rcl_wait_set_t *wait_set) override
Complete all available queued work without blocking.
RCLCPP_PUBLIC bool remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Remove a callback group from the executor.
RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t *wait_set) override
RCLCPP_PUBLIC void init(rcl_wait_set_t *p_wait_set, rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
Initialize StaticExecutorEntitiesCollector.
RCLCPP_PUBLIC bool add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
RCLCPP_PUBLIC void execute(std::shared_ptr< void > &data) override
Execute the waitable.
RCLCPP_PUBLIC bool remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
This class contains subscriptionbase, timerbase, etc. which can be used to run callbacks.
Container for subscription's, guard condition's, etc to be waited on.
size_t size_of_guard_conditions
Number of guard_conditions.
const rcl_guard_condition_t ** guard_conditions
Storage for guard condition pointers.
#define RCL_RET_WAIT_SET_EMPTY
Given rcl_wait_set_t is empty return code.
#define RCL_RET_OK
Success return code.
#define RCL_RET_TIMEOUT
Timeout occurred return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_clear(rcl_wait_set_t *wait_set)
Remove (sets to NULL) all entities in the wait set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait(rcl_wait_set_t *wait_set, int64_t timeout)
Block until the wait set is ready or until the timeout has been exceeded.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_resize(rcl_wait_set_t *wait_set, size_t subscriptions_size, size_t guard_conditions_size, size_t timers_size, size_t clients_size, size_t services_size, size_t events_size)
Reallocate space for entities in the wait set.