ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
static_executor_entities_collector.cpp
1 // Copyright 2020 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/executors/static_executor_entities_collector.hpp"
16 
17 #include <algorithm>
18 #include <memory>
19 #include <stdexcept>
20 #include <string>
21 #include <utility>
22 #include <vector>
23 
24 #include "rclcpp/memory_strategy.hpp"
25 #include "rclcpp/executors/static_single_threaded_executor.hpp"
26 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
27 
29 
30 StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector()
31 {
32  // Disassociate all callback groups and thus nodes.
33  for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
34  auto group = pair.first.lock();
35  if (group) {
36  std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
37  has_executor.store(false);
38  }
39  }
40  for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
41  auto group = pair.first.lock();
42  if (group) {
43  std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
44  has_executor.store(false);
45  }
46  }
47  // Disassociate all nodes
48  for (const auto & weak_node : weak_nodes_) {
49  auto node = weak_node.lock();
50  if (node) {
51  std::atomic_bool & has_executor = node->get_associated_with_executor_atomic();
52  has_executor.store(false);
53  }
54  }
55  weak_groups_associated_with_executor_to_nodes_.clear();
56  weak_groups_to_nodes_associated_with_executor_.clear();
57  exec_list_.clear();
58  weak_nodes_.clear();
59  weak_nodes_to_guard_conditions_.clear();
60 }
61 
62 void
64  rcl_wait_set_t * p_wait_set,
65  rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
66 {
67  // Empty initialize executable list
69  // Get executor's wait_set_ pointer
70  p_wait_set_ = p_wait_set;
71  // Get executor's memory strategy ptr
72  if (memory_strategy == nullptr) {
73  throw std::runtime_error("Received NULL memory strategy in executor waitable.");
74  }
75  memory_strategy_ = memory_strategy;
76 
77  // Get memory strategy and executable list. Prepare wait_set_
78  std::shared_ptr<void> shared_ptr;
79  execute(shared_ptr);
80 
81  // The entities collector is now initialized
82  initialized_ = true;
83 }
84 
85 void
86 StaticExecutorEntitiesCollector::fini()
87 {
88  memory_strategy_->clear_handles();
89  exec_list_.clear();
90 }
91 
92 std::shared_ptr<void>
94 {
95  return nullptr;
96 }
97 
98 void
99 StaticExecutorEntitiesCollector::execute(std::shared_ptr<void> & data)
100 {
101  (void) data;
102  // Fill memory strategy with entities coming from weak_nodes_
103  fill_memory_strategy();
104  // Fill exec_list_ with entities coming from weak_nodes_ (same as memory strategy)
105  fill_executable_list();
106  // Resize the wait_set_ based on memory_strategy handles (rcl_wait_set_resize)
107  prepare_wait_set();
108  // Add new nodes guard conditions to map
109  std::lock_guard<std::mutex> guard{new_nodes_mutex_};
110  for (const auto & weak_node : new_nodes_) {
111  if (auto node_ptr = weak_node.lock()) {
112  const auto & gc = node_ptr->get_notify_guard_condition();
113  weak_nodes_to_guard_conditions_[node_ptr] = &gc;
114  }
115  }
116  new_nodes_.clear();
117 }
118 
119 void
120 StaticExecutorEntitiesCollector::fill_memory_strategy()
121 {
122  memory_strategy_->clear_handles();
123  bool has_invalid_weak_groups_or_nodes =
124  memory_strategy_->collect_entities(weak_groups_to_nodes_associated_with_executor_);
125  // Clean up any invalid nodes, if they were detected
126  if (has_invalid_weak_groups_or_nodes) {
127  std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
128  for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
129  auto & weak_group_ptr = pair.first;
130  auto & weak_node_ptr = pair.second;
131  if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
132  invalid_group_ptrs.push_back(weak_group_ptr);
133  }
134  }
135  std::for_each(
136  invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
137  [this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
138  weak_groups_to_nodes_associated_with_executor_.erase(group_ptr);
139  });
140  }
141  has_invalid_weak_groups_or_nodes =
142  memory_strategy_->collect_entities(weak_groups_associated_with_executor_to_nodes_);
143  // Clean up any invalid nodes, if they were detected
144  if (has_invalid_weak_groups_or_nodes) {
145  std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
146  for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
147  auto & weak_group_ptr = pair.first;
148  const auto & weak_node_ptr = pair.second;
149  if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
150  invalid_group_ptrs.push_back(weak_group_ptr);
151  }
152  }
153  std::for_each(
154  invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
155  [this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
156  weak_groups_associated_with_executor_to_nodes_.erase(group_ptr);
157  });
158  }
159 
160  // Add the static executor waitable to the memory strategy
161  memory_strategy_->add_waitable_handle(this->shared_from_this());
162 }
163 
164 void
165 StaticExecutorEntitiesCollector::fill_executable_list()
166 {
167  exec_list_.clear();
168  add_callback_groups_from_nodes_associated_to_executor();
169  fill_executable_list_from_map(weak_groups_associated_with_executor_to_nodes_);
170  fill_executable_list_from_map(weak_groups_to_nodes_associated_with_executor_);
171  // Add the executor's waitable to the executable list
172  exec_list_.add_waitable(shared_from_this());
173 }
174 void
175 StaticExecutorEntitiesCollector::fill_executable_list_from_map(
176  const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
177  weak_groups_to_nodes)
178 {
179  for (const auto & pair : weak_groups_to_nodes) {
180  auto group = pair.first.lock();
181  auto node = pair.second.lock();
182  if (!node || !group || !group->can_be_taken_from().load()) {
183  continue;
184  }
185  group->find_timer_ptrs_if(
186  [this](const rclcpp::TimerBase::SharedPtr & timer) {
187  if (timer) {
188  exec_list_.add_timer(timer);
189  }
190  return false;
191  });
192  group->find_subscription_ptrs_if(
193  [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
194  if (subscription) {
195  exec_list_.add_subscription(subscription);
196  }
197  return false;
198  });
199  group->find_service_ptrs_if(
200  [this](const rclcpp::ServiceBase::SharedPtr & service) {
201  if (service) {
202  exec_list_.add_service(service);
203  }
204  return false;
205  });
206  group->find_client_ptrs_if(
207  [this](const rclcpp::ClientBase::SharedPtr & client) {
208  if (client) {
209  exec_list_.add_client(client);
210  }
211  return false;
212  });
213  group->find_waitable_ptrs_if(
214  [this](const rclcpp::Waitable::SharedPtr & waitable) {
215  if (waitable) {
216  exec_list_.add_waitable(waitable);
217  }
218  return false;
219  });
220  }
221 }
222 
223 void
224 StaticExecutorEntitiesCollector::prepare_wait_set()
225 {
226  // clear wait set
227  if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) {
228  throw std::runtime_error("Couldn't clear wait set");
229  }
230 
231  // The size of waitables are accounted for in size of the other entities
233  p_wait_set_, memory_strategy_->number_of_ready_subscriptions(),
234  memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
235  memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
236  memory_strategy_->number_of_ready_events());
237 
238  if (RCL_RET_OK != ret) {
239  throw std::runtime_error(
240  std::string("Couldn't resize the wait set: ") + rcl_get_error_string().str);
241  }
242 }
243 
244 void
245 StaticExecutorEntitiesCollector::refresh_wait_set(std::chrono::nanoseconds timeout)
246 {
247  // clear wait set (memset to '0' all wait_set_ entities
248  // but keeps the wait_set_ number of entities)
249  if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) {
250  throw std::runtime_error("Couldn't clear wait set");
251  }
252 
253  if (!memory_strategy_->add_handles_to_wait_set(p_wait_set_)) {
254  throw std::runtime_error("Couldn't fill wait set");
255  }
256 
257  rcl_ret_t status =
258  rcl_wait(p_wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
259 
260  if (status == RCL_RET_WAIT_SET_EMPTY) {
261  RCUTILS_LOG_WARN_NAMED(
262  "rclcpp",
263  "empty wait set received in rcl_wait(). This should never happen.");
264  } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
265  using rclcpp::exceptions::throw_from_rcl_error;
266  throw_from_rcl_error(status, "rcl_wait() failed");
267  }
268 }
269 
270 void
272 {
273  // Add waitable guard conditions (one for each registered node) into the wait set.
274  for (const auto & pair : weak_nodes_to_guard_conditions_) {
275  auto & gc = pair.second;
276  detail::add_guard_condition_to_rcl_wait_set(*wait_set, *gc);
277  }
278 }
279 
281 {
282  std::lock_guard<std::mutex> guard{new_nodes_mutex_};
283  return weak_nodes_to_guard_conditions_.size() + new_nodes_.size();
284 }
285 
286 bool
288  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
289 {
290  bool is_new_node = false;
291  // If the node already has an executor
292  std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
293  if (has_executor.exchange(true)) {
294  throw std::runtime_error("Node has already been added to an executor.");
295  }
296  node_ptr->for_each_callback_group(
297  [this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr)
298  {
299  if (
300  !group_ptr->get_associated_with_executor_atomic().load() &&
301  group_ptr->automatically_add_to_executor_with_node())
302  {
303  is_new_node = (add_callback_group(
304  group_ptr,
305  node_ptr,
306  weak_groups_to_nodes_associated_with_executor_) ||
307  is_new_node);
308  }
309  });
310  weak_nodes_.push_back(node_ptr);
311  return is_new_node;
312 }
313 
314 bool
316  rclcpp::CallbackGroup::SharedPtr group_ptr,
317  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
318  rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
319 {
320  // If the callback_group already has an executor
321  std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
322  if (has_executor.exchange(true)) {
323  throw std::runtime_error("Callback group has already been added to an executor.");
324  }
325  bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
326  !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_);
327  rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
328  auto insert_info = weak_groups_to_nodes.insert(
329  std::make_pair(weak_group_ptr, node_ptr));
330  bool was_inserted = insert_info.second;
331  if (!was_inserted) {
332  throw std::runtime_error("Callback group was already added to executor.");
333  }
334  if (is_new_node) {
335  std::lock_guard<std::mutex> guard{new_nodes_mutex_};
336  new_nodes_.push_back(node_ptr);
337  return true;
338  }
339  return false;
340 }
341 
342 bool
344  rclcpp::CallbackGroup::SharedPtr group_ptr,
345  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
346 {
347  return add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_);
348 }
349 
350 bool
352  rclcpp::CallbackGroup::SharedPtr group_ptr)
353 {
354  return this->remove_callback_group_from_map(
355  group_ptr,
356  weak_groups_associated_with_executor_to_nodes_);
357 }
358 
359 bool
361  rclcpp::CallbackGroup::SharedPtr group_ptr,
362  rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
363 {
364  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr;
365  rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
366  auto iter = weak_groups_to_nodes.find(weak_group_ptr);
367  if (iter != weak_groups_to_nodes.end()) {
368  node_ptr = iter->second.lock();
369  if (node_ptr == nullptr) {
370  throw std::runtime_error("Node must not be deleted before its callback group(s).");
371  }
372  weak_groups_to_nodes.erase(iter);
373  } else {
374  throw std::runtime_error("Callback group needs to be associated with executor.");
375  }
376  // If the node was matched and removed, interrupt waiting.
377  if (!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) &&
378  !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_))
379  {
380  rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr);
381  weak_nodes_to_guard_conditions_.erase(node_weak_ptr);
382  return true;
383  }
384  return false;
385 }
386 
387 bool
389  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
390 {
391  if (!node_ptr->get_associated_with_executor_atomic().load()) {
392  return false;
393  }
394  bool node_found = false;
395  auto node_it = weak_nodes_.begin();
396  while (node_it != weak_nodes_.end()) {
397  bool matched = (node_it->lock() == node_ptr);
398  if (matched) {
399  weak_nodes_.erase(node_it);
400  node_found = true;
401  break;
402  }
403  ++node_it;
404  }
405  if (!node_found) {
406  return false;
407  }
408  std::vector<rclcpp::CallbackGroup::SharedPtr> found_group_ptrs;
409  std::for_each(
410  weak_groups_to_nodes_associated_with_executor_.begin(),
411  weak_groups_to_nodes_associated_with_executor_.end(),
412  [&found_group_ptrs, node_ptr](std::pair<rclcpp::CallbackGroup::WeakPtr,
413  rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> key_value_pair) {
414  auto & weak_node_ptr = key_value_pair.second;
415  auto shared_node_ptr = weak_node_ptr.lock();
416  auto group_ptr = key_value_pair.first.lock();
417  if (shared_node_ptr == node_ptr) {
418  found_group_ptrs.push_back(group_ptr);
419  }
420  });
421  std::for_each(
422  found_group_ptrs.begin(), found_group_ptrs.end(), [this]
423  (rclcpp::CallbackGroup::SharedPtr group_ptr) {
424  this->remove_callback_group_from_map(
425  group_ptr,
426  weak_groups_to_nodes_associated_with_executor_);
427  });
428  std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
429  has_executor.store(false);
430  return true;
431 }
432 
433 bool
435 {
436  // Check wait_set guard_conditions for added/removed entities to/from a node
437  for (size_t i = 0; i < p_wait_set->size_of_guard_conditions; ++i) {
438  if (p_wait_set->guard_conditions[i] != NULL) {
439  auto found_guard_condition = std::find_if(
440  weak_nodes_to_guard_conditions_.begin(), weak_nodes_to_guard_conditions_.end(),
441  [&](std::pair<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
442  const GuardCondition *> pair) -> bool {
443  const rcl_guard_condition_t & rcl_gc = pair.second->get_rcl_guard_condition();
444  return &rcl_gc == p_wait_set->guard_conditions[i];
445  });
446  if (found_guard_condition != weak_nodes_to_guard_conditions_.end()) {
447  return true;
448  }
449  }
450  }
451  // None of the guard conditions triggered belong to a registered node
452  return false;
453 }
454 
455 // Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry.
456 bool
457 StaticExecutorEntitiesCollector::has_node(
458  const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
459  const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
460  weak_groups_to_nodes) const
461 {
462  return std::find_if(
463  weak_groups_to_nodes.begin(),
464  weak_groups_to_nodes.end(),
465  [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool {
466  auto other_ptr = other.second.lock();
467  return other_ptr == node_ptr;
468  }) != weak_groups_to_nodes.end();
469 }
470 
471 void
472 StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor()
473 {
474  for (const auto & weak_node : weak_nodes_) {
475  auto node = weak_node.lock();
476  if (node) {
477  node->for_each_callback_group(
478  [this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
479  {
480  if (shared_group_ptr->automatically_add_to_executor_with_node() &&
481  !shared_group_ptr->get_associated_with_executor_atomic().load())
482  {
483  add_callback_group(
484  shared_group_ptr,
485  node,
486  weak_groups_to_nodes_associated_with_executor_);
487  }
488  });
489  }
490  }
491 }
492 
493 std::vector<rclcpp::CallbackGroup::WeakPtr>
494 StaticExecutorEntitiesCollector::get_all_callback_groups()
495 {
496  std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
497  for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
498  groups.push_back(group_node_ptr.first);
499  }
500  for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
501  groups.push_back(group_node_ptr.first);
502  }
503  return groups;
504 }
505 
506 std::vector<rclcpp::CallbackGroup::WeakPtr>
508 {
509  std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
510  for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
511  groups.push_back(group_node_ptr.first);
512  }
513  return groups;
514 }
515 
516 std::vector<rclcpp::CallbackGroup::WeakPtr>
518 {
519  std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
520  for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
521  groups.push_back(group_node_ptr.first);
522  }
523  return groups;
524 }
A condition that can be waited on in a single wait set and asynchronously triggered.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
RCLCPP_PUBLIC bool remove_callback_group_from_map(rclcpp::CallbackGroup::SharedPtr group_ptr, WeakCallbackGroupsToNodesMap &weak_groups_to_nodes)
Remove a callback group from the executor.
RCLCPP_PUBLIC std::shared_ptr< void > take_data() override
Take the data so that it can be consumed with execute.
RCLCPP_PUBLIC void refresh_wait_set(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Function to add_handles_to_wait_set and wait for work and.
RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override
Get the number of ready guard_conditions.
RCLCPP_PUBLIC bool add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Add a callback group to an executor.
RCLCPP_PUBLIC bool is_ready(rcl_wait_set_t *wait_set) override
Complete all available queued work without blocking.
RCLCPP_PUBLIC bool remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Remove a callback group from the executor.
RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t *wait_set) override
RCLCPP_PUBLIC void init(rcl_wait_set_t *p_wait_set, rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
Initialize StaticExecutorEntitiesCollector.
RCLCPP_PUBLIC bool add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
RCLCPP_PUBLIC void execute(std::shared_ptr< void > &data) override
Execute the waitable.
RCLCPP_PUBLIC bool remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
This class contains subscriptionbase, timerbase, etc. which can be used to run callbacks.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42
size_t size_of_guard_conditions
Number of guard_conditions.
Definition: wait.h:50
const rcl_guard_condition_t ** guard_conditions
Storage for guard condition pointers.
Definition: wait.h:48
#define RCL_RET_WAIT_SET_EMPTY
Given rcl_wait_set_t is empty return code.
Definition: types.h:98
#define RCL_RET_OK
Success return code.
Definition: types.h:26
#define RCL_RET_TIMEOUT
Timeout occurred return code.
Definition: types.h:30
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:23
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_clear(rcl_wait_set_t *wait_set)
Remove (sets to NULL) all entities in the wait set.
Definition: wait.c:334
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait(rcl_wait_set_t *wait_set, int64_t timeout)
Block until the wait set is ready or until the timeout has been exceeded.
Definition: wait.c:522
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_resize(rcl_wait_set_t *wait_set, size_t subscriptions_size, size_t guard_conditions_size, size_t timers_size, size_t clients_size, size_t services_size, size_t events_size)
Reallocate space for entities in the wait set.
Definition: wait.c:376