ROS 2 rclcpp + rcl - rolling  rolling-a919a6e5
ROS 2 C++ Client Library with ROS Client Library
executor_entities_collector.cpp
1 // Copyright 2023 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <set>
16 
17 #include "rclcpp/executors/executor_entities_collector.hpp"
18 #include "rclcpp/executors/executor_notify_waitable.hpp"
19 #include "rclcpp/node_interfaces/node_base_interface.hpp"
20 
21 namespace rclcpp
22 {
23 namespace executors
24 {
25 
27  std::shared_ptr<ExecutorNotifyWaitable> notify_waitable)
28 : notify_waitable_(notify_waitable)
29 {
30 }
31 
33 {
34  for (auto weak_node_it = weak_nodes_.begin(); weak_node_it != weak_nodes_.end(); ) {
35  weak_node_it = remove_weak_node(weak_node_it);
36  }
37 
38  for (auto weak_group_it = automatically_added_groups_.begin();
39  weak_group_it != automatically_added_groups_.end(); )
40  {
41  weak_group_it = remove_weak_callback_group(weak_group_it, automatically_added_groups_);
42  }
43 
44  for (auto weak_group_it = manually_added_groups_.begin();
45  weak_group_it != manually_added_groups_.end(); )
46  {
47  weak_group_it = remove_weak_callback_group(weak_group_it, manually_added_groups_);
48  }
49 
50  for (auto weak_node_ptr : pending_added_nodes_) {
51  auto node_ptr = weak_node_ptr.lock();
52  if (node_ptr) {
53  node_ptr->get_associated_with_executor_atomic().store(false);
54  }
55  }
56  pending_added_nodes_.clear();
57  pending_removed_nodes_.clear();
58 
59  for (auto weak_group_ptr : pending_manually_added_groups_) {
60  auto group_ptr = weak_group_ptr.lock();
61  if (group_ptr) {
62  group_ptr->get_associated_with_executor_atomic().store(false);
63  }
64  // Disassociate the guard condition from the executor notify waitable
65  auto guard_condition_it = weak_groups_to_guard_conditions_.find(weak_group_ptr);
66  if (guard_condition_it != weak_groups_to_guard_conditions_.end()) {
67  this->notify_waitable_->remove_guard_condition(guard_condition_it->second);
68  weak_groups_to_guard_conditions_.erase(guard_condition_it);
69  }
70  }
71  pending_manually_added_groups_.clear();
72  pending_manually_removed_groups_.clear();
73 }
74 
75 bool
77 {
78  std::lock_guard<std::mutex> lock(mutex_);
79  return pending_manually_added_groups_.size() != 0 ||
80  pending_manually_removed_groups_.size() != 0 ||
81  pending_added_nodes_.size() != 0 ||
82  pending_removed_nodes_.size() != 0;
83 }
84 
85 void
86 ExecutorEntitiesCollector::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
87 {
88  // If the node already has an executor
89  std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
90  if (has_executor.exchange(true)) {
91  throw std::runtime_error(
92  std::string("Node '") + node_ptr->get_fully_qualified_name() +
93  "' has already been added to an executor.");
94  }
95 
96  std::lock_guard<std::mutex> lock(mutex_);
97  bool associated = weak_nodes_.count(node_ptr) != 0;
98  bool add_queued = pending_added_nodes_.count(node_ptr) != 0;
99  bool remove_queued = pending_removed_nodes_.count(node_ptr) != 0;
100 
101  if ((associated || add_queued) && !remove_queued) {
102  throw std::runtime_error(
103  std::string("Node '") + node_ptr->get_fully_qualified_name() +
104  "' has already been added to this executor.");
105  }
106 
107  this->pending_added_nodes_.insert(node_ptr);
108 }
109 
110 void
112  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
113 {
114  std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
115  if (!has_executor.exchange(false)) {
116  throw std::runtime_error(
117  std::string("Node '") + node_ptr->get_fully_qualified_name() +
118  "' needs to be associated with an executor.");
119  }
120 
121  std::lock_guard<std::mutex> lock(mutex_);
122  bool associated = weak_nodes_.count(node_ptr) != 0;
123  bool add_queued = pending_added_nodes_.count(node_ptr) != 0;
124  bool remove_queued = pending_removed_nodes_.count(node_ptr) != 0;
125 
126  if (!(associated || add_queued) || remove_queued) {
127  throw std::runtime_error(
128  std::string("Node '") + node_ptr->get_fully_qualified_name() +
129  "' needs to be associated with this executor.");
130  }
131 
132  this->pending_removed_nodes_.insert(node_ptr);
133 }
134 
135 void
136 ExecutorEntitiesCollector::add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
137 {
138  std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
139  if (has_executor.exchange(true)) {
140  throw std::runtime_error("Callback group has already been added to an executor.");
141  }
142 
143  std::lock_guard<std::mutex> lock(mutex_);
144  bool associated = manually_added_groups_.count(group_ptr) != 0;
145  bool add_queued = pending_manually_added_groups_.count(group_ptr) != 0;
146  bool remove_queued = pending_manually_removed_groups_.count(group_ptr) != 0;
147 
148  if ((associated || add_queued) && !remove_queued) {
149  throw std::runtime_error("Callback group has already been added to this executor.");
150  }
151 
152  this->pending_manually_added_groups_.insert(group_ptr);
153 
154  // Store callback group notify guard condition in map and add it to the notify waitable
155  auto group_guard_condition = group_ptr->get_notify_guard_condition();
156  weak_groups_to_guard_conditions_.insert({group_ptr, group_guard_condition});
157  this->notify_waitable_->add_guard_condition(group_guard_condition);
158 }
159 
160 void
161 ExecutorEntitiesCollector::remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
162 {
163  if (!group_ptr->get_associated_with_executor_atomic().load()) {
164  throw std::runtime_error("Callback group needs to be associated with an executor.");
165  }
176  auto weak_group_ptr = rclcpp::CallbackGroup::WeakPtr(group_ptr);
177  std::lock_guard<std::mutex> lock(mutex_);
178  bool associated = manually_added_groups_.count(group_ptr) != 0;
179  bool add_queued = pending_manually_added_groups_.count(group_ptr) != 0;
180  bool remove_queued = pending_manually_removed_groups_.count(group_ptr) != 0;
181 
182  if (!(associated || add_queued) || remove_queued) {
183  throw std::runtime_error("Callback group needs to be associated with this executor.");
184  }
185 
186  this->pending_manually_removed_groups_.insert(group_ptr);
187 }
188 
189 std::vector<rclcpp::CallbackGroup::WeakPtr>
191 {
192  std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
193  std::lock_guard<std::mutex> lock(mutex_);
194  for (const auto & group_ptr : manually_added_groups_) {
195  groups.push_back(group_ptr);
196  }
197  for (auto const & group_ptr : automatically_added_groups_) {
198  groups.push_back(group_ptr);
199  }
200  return groups;
201 }
202 
203 std::vector<rclcpp::CallbackGroup::WeakPtr>
205 {
206  std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
207  std::lock_guard<std::mutex> lock(mutex_);
208  for (const auto & group_ptr : manually_added_groups_) {
209  groups.push_back(group_ptr);
210  }
211  return groups;
212 }
213 
214 std::vector<rclcpp::CallbackGroup::WeakPtr>
216 {
217  std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
218  std::lock_guard<std::mutex> lock(mutex_);
219  for (auto const & group_ptr : automatically_added_groups_) {
220  groups.push_back(group_ptr);
221  }
222  return groups;
223 }
224 
225 void
227 {
228  std::lock_guard<std::mutex> lock(mutex_);
229  this->process_queues();
230  this->add_automatically_associated_callback_groups(this->weak_nodes_);
232 }
233 
234 ExecutorEntitiesCollector::NodeCollection::iterator
235 ExecutorEntitiesCollector::remove_weak_node(NodeCollection::iterator weak_node)
236 {
237  // Disassociate the guard condition from the executor notify waitable
238  auto guard_condition_it = weak_nodes_to_guard_conditions_.find(*weak_node);
239  if (guard_condition_it != weak_nodes_to_guard_conditions_.end()) {
240  this->notify_waitable_->remove_guard_condition(guard_condition_it->second);
241  weak_nodes_to_guard_conditions_.erase(guard_condition_it);
242  }
243 
244  // Mark the node as disassociated (if the node is still valid)
245  auto node_ptr = weak_node->lock();
246  if (node_ptr) {
247  std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
248  has_executor.store(false);
249  }
250 
251  // Remove the node from tracked nodes
252  return weak_nodes_.erase(weak_node);
253 }
254 
255 ExecutorEntitiesCollector::CallbackGroupCollection::iterator
257  CallbackGroupCollection::iterator weak_group_it,
258  CallbackGroupCollection & collection
259 )
260 {
261  // Disassociate the guard condition from the executor notify waitable
262  auto guard_condition_it = weak_groups_to_guard_conditions_.find(*weak_group_it);
263  if (guard_condition_it != weak_groups_to_guard_conditions_.end()) {
264  this->notify_waitable_->remove_guard_condition(guard_condition_it->second);
265  weak_groups_to_guard_conditions_.erase(guard_condition_it);
266  }
267 
268  // Mark the group as disassociated (if the group is still valid)
269  auto group_ptr = weak_group_it->lock();
270  if (group_ptr) {
281  std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
282  has_executor.store(false);
283  }
284 
285  // Remove the group from the tracked collection
286  return collection.erase(weak_group_it);
287 }
288 
289 void
291  rclcpp::CallbackGroup::SharedPtr group_ptr,
292  CallbackGroupCollection & collection)
293 {
294  auto iter = collection.insert(group_ptr);
295  if (iter.second == false) {
296  throw std::runtime_error("Callback group has already been added to this executor.");
297  }
298 
299  // Store callback group notify guard condition in map and add it to the notify waitable
300  auto group_guard_condition = group_ptr->get_notify_guard_condition();
301  weak_groups_to_guard_conditions_.insert({group_ptr, group_guard_condition});
302  this->notify_waitable_->add_guard_condition(group_guard_condition);
303 }
304 
305 void
307 {
308  for (auto weak_node_ptr : pending_added_nodes_) {
309  auto node_ptr = weak_node_ptr.lock();
310  if (!node_ptr) {
311  continue;
312  }
313  weak_nodes_.insert(weak_node_ptr);
314  this->add_automatically_associated_callback_groups({weak_node_ptr});
315 
316  // Store node guard condition in map and add it to the notify waitable
317  auto node_guard_condition = node_ptr->get_shared_notify_guard_condition();
318  weak_nodes_to_guard_conditions_.insert({weak_node_ptr, node_guard_condition});
319  this->notify_waitable_->add_guard_condition(node_guard_condition);
320  }
321  pending_added_nodes_.clear();
322 
323  for (auto weak_node_ptr : pending_removed_nodes_) {
324  auto node_it = weak_nodes_.find(weak_node_ptr);
325  if (node_it != weak_nodes_.end()) {
326  remove_weak_node(node_it);
327  } else {
328  // The node may have been destroyed and removed from the collection before
329  // we processed the queues. Don't throw if the pointer is already expired.
330  if (!weak_node_ptr.expired()) {
331  throw std::runtime_error("Node needs to be associated with this executor.");
332  }
333  }
334 
335  auto node_ptr = weak_node_ptr.lock();
336  if (node_ptr) {
337  for (auto group_it = automatically_added_groups_.begin();
338  group_it != automatically_added_groups_.end(); )
339  {
340  auto group_ptr = group_it->lock();
341  if (node_ptr->callback_group_in_node(group_ptr)) {
342  group_it = remove_weak_callback_group(group_it, automatically_added_groups_);
343  } else {
344  ++group_it;
345  }
346  }
347  }
348  }
349  pending_removed_nodes_.clear();
350 
351  for (auto weak_group_ptr : pending_manually_added_groups_) {
352  auto group_ptr = weak_group_ptr.lock();
353  if (group_ptr) {
354  this->add_callback_group_to_collection(group_ptr, manually_added_groups_);
355  } else {
356  // Disassociate the guard condition from the executor notify waitable
357  auto guard_condition_it = weak_groups_to_guard_conditions_.find(weak_group_ptr);
358  if (guard_condition_it != weak_groups_to_guard_conditions_.end()) {
359  this->notify_waitable_->remove_guard_condition(guard_condition_it->second);
360  weak_groups_to_guard_conditions_.erase(guard_condition_it);
361  }
362  }
363  }
364  pending_manually_added_groups_.clear();
365 
366  for (auto weak_group_ptr : pending_manually_removed_groups_) {
367  auto group_ptr = weak_group_ptr.lock();
368  if (group_ptr) {
369  auto group_it = manually_added_groups_.find(group_ptr);
370  if (group_it != manually_added_groups_.end()) {
371  remove_weak_callback_group(group_it, manually_added_groups_);
372  } else {
373  throw std::runtime_error(
374  "Attempting to remove a callback group not added to this executor.");
375  }
376  }
377  }
378  pending_manually_removed_groups_.clear();
379 }
380 
381 void
383  const NodeCollection & nodes_to_check)
384 {
385  for (auto & weak_node : nodes_to_check) {
386  auto node = weak_node.lock();
387  if (node) {
388  node->for_each_callback_group(
389  [this, node](rclcpp::CallbackGroup::SharedPtr group_ptr)
390  {
391  if (!group_ptr->get_associated_with_executor_atomic().load() &&
392  group_ptr->automatically_add_to_executor_with_node())
393  {
394  std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
395  if (has_executor.exchange(true)) {
396  throw std::runtime_error("Callback group has already been added to an executor.");
397  }
398  this->add_callback_group_to_collection(group_ptr, this->automatically_added_groups_);
399  }
400  });
401  }
402  }
403 }
404 
405 void
406 ExecutorEntitiesCollector::prune_invalid_nodes_and_groups()
407 {
408  for (auto node_it = weak_nodes_.begin();
409  node_it != weak_nodes_.end(); )
410  {
411  if (node_it->expired()) {
412  node_it = remove_weak_node(node_it);
413  } else {
414  node_it++;
415  }
416  }
417  for (auto group_it = automatically_added_groups_.begin();
418  group_it != automatically_added_groups_.end(); )
419  {
420  if (group_it->expired()) {
421  group_it = remove_weak_callback_group(group_it, automatically_added_groups_);
422  } else {
423  group_it++;
424  }
425  }
426  for (auto group_it = manually_added_groups_.begin();
427  group_it != manually_added_groups_.end(); )
428  {
429  if (group_it->expired()) {
430  group_it = remove_weak_callback_group(group_it, manually_added_groups_);
431  } else {
432  group_it++;
433  }
434  }
435 }
436 
437 } // namespace executors
438 } // namespace rclcpp
RCLCPP_PUBLIC NodeCollection::iterator remove_weak_node(NodeCollection::iterator weak_node) RCPPUTILS_TSA_REQUIRES(mutex_)
Implementation of removing a node from the collector.
RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Add a node to the entity collector.
RCLCPP_PUBLIC void prune_invalid_nodes_and_groups() RCPPUTILS_TSA_REQUIRES(mutex_)
Check all nodes and groups for expired weak pointers and remove them.
std::shared_ptr< ExecutorNotifyWaitable > notify_waitable_
Waitable to add guard conditions to.
RCLCPP_PUBLIC void update_collections()
Update the underlying collections.
RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Remove a callback group from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() const
Get all callback groups known to this entity collector.
bool has_pending() const
Indicate if the entities collector has pending additions or removals.
RCLCPP_PUBLIC void process_queues() RCPPUTILS_TSA_REQUIRES(mutex_)
Iterate over queued added/remove nodes and callback_groups.
RCLCPP_PUBLIC ExecutorEntitiesCollector(std::shared_ptr< ExecutorNotifyWaitable > notify_waitable)
Constructor.
std::mutex mutex_
mutex to protect collections and pending queues
RCLCPP_PUBLIC void add_automatically_associated_callback_groups(const NodeCollection &nodes_to_check) RCPPUTILS_TSA_REQUIRES(mutex_)
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups() const
Get automatically-added callback groups known to this entity collector.
RCLCPP_PUBLIC CallbackGroupCollection::iterator remove_weak_callback_group(CallbackGroupCollection::iterator weak_group_it, CallbackGroupCollection &collection) RCPPUTILS_TSA_REQUIRES(mutex_)
Implementation of removing a callback group from the collector.
RCLCPP_PUBLIC void add_callback_group_to_collection(rclcpp::CallbackGroup::SharedPtr group_ptr, CallbackGroupCollection &collection) RCPPUTILS_TSA_REQUIRES(mutex_)
Implementation of adding a callback group.
RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Remove a node from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups() const
Get manually-added callback groups known to this entity collector.
RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Add a callback group to the entity collector.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.