17 #include "rclcpp/executors/executor_entities_collector.hpp"
18 #include "rclcpp/executors/executor_notify_waitable.hpp"
19 #include "rclcpp/node_interfaces/node_base_interface.hpp"
// Constructor fragment: the notify_waitable argument is copied into the notify_waitable_
// member through the member-initializer list.
27 std::shared_ptr<ExecutorNotifyWaitable> notify_waitable)
28 : notify_waitable_(notify_waitable)
// Teardown sequence.
// NOTE(review): presumably the destructor body; the signature line is not visible in this
// excerpt -- confirm.
//
// The three iterator loops below walk weak_nodes_, automatically_added_groups_ and
// manually_added_groups_. Their headers carry no increment expression, so the iterator is
// expected to be advanced inside the loop bodies, which are not visible here.
34 for (
auto weak_node_it = weak_nodes_.begin(); weak_node_it != weak_nodes_.end(); ) {
38 for (
auto weak_group_it = automatically_added_groups_.begin();
39 weak_group_it != automatically_added_groups_.end(); )
44 for (
auto weak_group_it = manually_added_groups_.begin();
45 weak_group_it != manually_added_groups_.end(); )
// Nodes still waiting in the pending-add queue: reset their associated-with-executor flag
// through the locked pointer.
50 for (
auto weak_node_ptr : pending_added_nodes_) {
51 auto node_ptr = weak_node_ptr.lock();
53 node_ptr->get_associated_with_executor_atomic().store(
false);
// Drop both pending node queues.
56 pending_added_nodes_.clear();
57 pending_removed_nodes_.clear();
// Callback groups still waiting in the manual pending-add queue get the same flag reset.
59 for (
auto weak_group_ptr : pending_manually_added_groups_) {
60 auto group_ptr = weak_group_ptr.lock();
62 group_ptr->get_associated_with_executor_atomic().store(
false);
// Forget the guard condition recorded for this group, if an entry exists.
65 auto guard_condition_it = weak_groups_to_guard_conditions_.find(weak_group_ptr);
66 if (guard_condition_it != weak_groups_to_guard_conditions_.end()) {
68 weak_groups_to_guard_conditions_.erase(guard_condition_it);
// Drop both pending callback-group queues.
71 pending_manually_added_groups_.clear();
72 pending_manually_removed_groups_.clear();
// Reports whether any of the four pending queues (manually added/removed groups,
// added/removed nodes) is non-empty. The queues are inspected while holding mutex_.
// NOTE(review): signature line not visible in this excerpt; matches has_pending -- confirm.
78 std::lock_guard<std::mutex> lock(
mutex_);
79 return pending_manually_added_groups_.size() != 0 ||
80 pending_manually_removed_groups_.size() != 0 ||
81 pending_added_nodes_.size() != 0 ||
82 pending_removed_nodes_.size() != 0;
// Queues a node for addition to this collector.
// NOTE(review): signature line not visible in this excerpt; matches add_node -- confirm.
// Throws std::runtime_error when the node is already claimed by an executor, or is already
// tracked/queued here without a pending removal.
//
// exchange(true) sets the flag and returns its previous value, so a `true` result means the
// node was already associated with some executor.
89 std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
90 if (has_executor.exchange(
true)) {
91 throw std::runtime_error(
92 std::string(
"Node '") + node_ptr->get_fully_qualified_name() +
93 "' has already been added to an executor.");
// The collections below are inspected under mutex_.
96 std::lock_guard<std::mutex> lock(
mutex_);
97 bool associated = weak_nodes_.count(node_ptr) != 0;
98 bool add_queued = pending_added_nodes_.count(node_ptr) != 0;
99 bool remove_queued = pending_removed_nodes_.count(node_ptr) != 0;
// Reject a node that is already tracked or queued here, unless its removal is also queued.
101 if ((associated || add_queued) && !remove_queued) {
102 throw std::runtime_error(
103 std::string(
"Node '") + node_ptr->get_fully_qualified_name() +
104 "' has already been added to this executor.");
// The node is only placed in the pending-add queue here; nothing else is updated in the
// visible lines.
107 this->pending_added_nodes_.insert(node_ptr);
// Queues a node for removal from this collector.
// Throws std::runtime_error when the node is not associated with any executor, or is not
// tracked/queued here (or already has a removal queued).
112 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
// exchange(false) clears the flag and returns its previous value; a previous value of
// `false` means no executor held the node.
// NOTE(review): the flag is cleared before the "this executor" check further down, so when
// that later check throws the flag has already been reset -- confirm this is intended.
114 std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
115 if (!has_executor.exchange(
false)) {
116 throw std::runtime_error(
117 std::string(
"Node '") + node_ptr->get_fully_qualified_name() +
118 "' needs to be associated with an executor.");
// The collections below are inspected under mutex_.
121 std::lock_guard<std::mutex> lock(
mutex_);
122 bool associated = weak_nodes_.count(node_ptr) != 0;
123 bool add_queued = pending_added_nodes_.count(node_ptr) != 0;
124 bool remove_queued = pending_removed_nodes_.count(node_ptr) != 0;
// Reject when the node is neither tracked nor queued here, or when a removal is already queued.
126 if (!(associated || add_queued) || remove_queued) {
127 throw std::runtime_error(
128 std::string(
"Node '") + node_ptr->get_fully_qualified_name() +
129 "' needs to be associated with this executor.");
// The node is only placed in the pending-remove queue here.
132 this->pending_removed_nodes_.insert(node_ptr);
// Queues a callback group for manual addition to this collector.
// NOTE(review): signature line not visible in this excerpt; matches add_callback_group -- confirm.
// Throws std::runtime_error when the group is already claimed by an executor, or is already
// tracked/queued here without a pending removal.
//
// exchange(true) sets the flag and returns its previous value, so a `true` result means the
// group was already associated with some executor.
138 std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
139 if (has_executor.exchange(
true)) {
140 throw std::runtime_error(
"Callback group has already been added to an executor.");
// The collections below are inspected under mutex_.
143 std::lock_guard<std::mutex> lock(
mutex_);
144 bool associated = manually_added_groups_.count(group_ptr) != 0;
145 bool add_queued = pending_manually_added_groups_.count(group_ptr) != 0;
146 bool remove_queued = pending_manually_removed_groups_.count(group_ptr) != 0;
// Reject a group that is already tracked or queued here, unless its removal is also queued.
148 if ((associated || add_queued) && !remove_queued) {
149 throw std::runtime_error(
"Callback group has already been added to this executor.");
// Place the group in the manual pending-add queue.
152 this->pending_manually_added_groups_.insert(group_ptr);
// Record the group's notify guard condition in the group -> guard-condition map.
155 auto group_guard_condition = group_ptr->get_notify_guard_condition();
156 weak_groups_to_guard_conditions_.insert({group_ptr, group_guard_condition});
// Queues a callback group for manual removal from this collector.
// NOTE(review): signature line not visible in this excerpt; matches remove_callback_group -- confirm.
// Throws std::runtime_error when the group is not associated with any executor, or is not
// tracked/queued here (or already has a removal queued).
//
// The flag is only read here (load), not modified.
163 if (!group_ptr->get_associated_with_executor_atomic().load()) {
164 throw std::runtime_error(
"Callback group needs to be associated with an executor.");
// Weak handle to the group; where it is used is not visible in this excerpt.
176 auto weak_group_ptr = rclcpp::CallbackGroup::WeakPtr(group_ptr);
// The collections below are inspected under mutex_.
177 std::lock_guard<std::mutex> lock(
mutex_);
178 bool associated = manually_added_groups_.count(group_ptr) != 0;
179 bool add_queued = pending_manually_added_groups_.count(group_ptr) != 0;
180 bool remove_queued = pending_manually_removed_groups_.count(group_ptr) != 0;
// Reject when the group is neither tracked nor queued here, or when a removal is already queued.
182 if (!(associated || add_queued) || remove_queued) {
183 throw std::runtime_error(
"Callback group needs to be associated with this executor.");
// Place the group in the manual pending-remove queue.
186 this->pending_manually_removed_groups_.insert(group_ptr);
// Builds a vector of weak callback-group pointers: first every entry of
// manually_added_groups_, then every entry of automatically_added_groups_.
// Both collections are read while holding mutex_.
// NOTE(review): name and return statement are not visible in this excerpt; matches
// get_all_callback_groups -- confirm.
189 std::vector<rclcpp::CallbackGroup::WeakPtr>
192 std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
193 std::lock_guard<std::mutex> lock(
mutex_);
194 for (
const auto & group_ptr : manually_added_groups_) {
195 groups.push_back(group_ptr);
197 for (
auto const & group_ptr : automatically_added_groups_) {
198 groups.push_back(group_ptr);
// Builds a vector holding a copy of every weak pointer in manually_added_groups_.
// The collection is read while holding mutex_.
// NOTE(review): name and return statement are not visible in this excerpt; matches
// get_manually_added_callback_groups -- confirm.
203 std::vector<rclcpp::CallbackGroup::WeakPtr>
206 std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
207 std::lock_guard<std::mutex> lock(
mutex_);
208 for (
const auto & group_ptr : manually_added_groups_) {
209 groups.push_back(group_ptr);
// Builds a vector holding a copy of every weak pointer in automatically_added_groups_.
// The collection is read while holding mutex_.
// NOTE(review): name and return statement are not visible in this excerpt; matches
// get_automatically_added_callback_groups -- confirm.
214 std::vector<rclcpp::CallbackGroup::WeakPtr>
217 std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
218 std::lock_guard<std::mutex> lock(
mutex_);
219 for (
auto const & group_ptr : automatically_added_groups_) {
220 groups.push_back(group_ptr);
// Acquires mutex_ for the remainder of the enclosing scope. The work performed under the
// lock is not visible in this excerpt.
// NOTE(review): presumably the body of update_collections -- confirm against the full file.
228 std::lock_guard<std::mutex> lock(
mutex_);
// Removes one entry from weak_nodes_ and returns the iterator produced by erase(), which
// lets a caller continue iterating from the returned position.
// NOTE(review): name/parameter lines are not visible in this excerpt; matches
// remove_weak_node -- confirm.
234 ExecutorEntitiesCollector::NodeCollection::iterator
// Drop the guard-condition map entry recorded for this node, if one exists.
238 auto guard_condition_it = weak_nodes_to_guard_conditions_.find(*weak_node);
239 if (guard_condition_it != weak_nodes_to_guard_conditions_.end()) {
241 weak_nodes_to_guard_conditions_.erase(guard_condition_it);
// Reset the node's associated-with-executor flag through the locked pointer.
245 auto node_ptr = weak_node->lock();
247 std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
248 has_executor.store(
false);
252 return weak_nodes_.erase(weak_node);
// Removes one entry from the given callback-group collection and returns the iterator
// produced by erase().
//   weak_group_it: iterator to the entry being removed.
//   collection:    the collection that the entry is erased from.
255 ExecutorEntitiesCollector::CallbackGroupCollection::iterator
257 CallbackGroupCollection::iterator weak_group_it,
258 CallbackGroupCollection & collection
// Drop the guard-condition map entry recorded for this group, if one exists.
262 auto guard_condition_it = weak_groups_to_guard_conditions_.find(*weak_group_it);
263 if (guard_condition_it != weak_groups_to_guard_conditions_.end()) {
265 weak_groups_to_guard_conditions_.erase(guard_condition_it);
// Reset the group's associated-with-executor flag through the locked pointer.
// (Original lines 270-280 are not visible in this excerpt.)
269 auto group_ptr = weak_group_it->lock();
281 std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
282 has_executor.store(
false);
286 return collection.erase(weak_group_it);
// Inserts a callback group into the given collection and records its notify guard condition.
//   group_ptr:  the group to insert.
//   collection: the collection receiving the group.
// Throws std::runtime_error when the collection already contains the group.
291 rclcpp::CallbackGroup::SharedPtr group_ptr,
292 CallbackGroupCollection & collection)
// insert() returns a pair whose .second is false when the element was already present.
294 auto iter = collection.insert(group_ptr);
295 if (iter.second ==
false) {
296 throw std::runtime_error(
"Callback group has already been added to this executor.");
// Record the group's notify guard condition in the group -> guard-condition map.
300 auto group_guard_condition = group_ptr->get_notify_guard_condition();
301 weak_groups_to_guard_conditions_.insert({group_ptr, group_guard_condition});
// Drains the four pending queues in order: added nodes, removed nodes, manually added groups,
// manually removed groups. Each queue is cleared after its loop.
// NOTE(review): signature line not visible in this excerpt; matches process_queues -- confirm.
// Several branch/brace lines are missing from this excerpt, so the exact nesting of the
// statements below cannot be determined from here.
//
// Pending added nodes: move each into weak_nodes_ and record its shared notify guard condition.
308 for (
auto weak_node_ptr : pending_added_nodes_) {
309 auto node_ptr = weak_node_ptr.lock();
313 weak_nodes_.insert(weak_node_ptr);
317 auto node_guard_condition = node_ptr->get_shared_notify_guard_condition();
318 weak_nodes_to_guard_conditions_.insert({weak_node_ptr, node_guard_condition});
321 pending_added_nodes_.clear();
// Pending removed nodes: look each one up in weak_nodes_.
323 for (
auto weak_node_ptr : pending_removed_nodes_) {
324 auto node_it = weak_nodes_.find(weak_node_ptr);
325 if (node_it != weak_nodes_.end()) {
// A node that has not expired raises an error on this path (the enclosing branch is not
// visible in this excerpt).
330 if (!weak_node_ptr.expired()) {
331 throw std::runtime_error(
"Node needs to be associated with this executor.");
335 auto node_ptr = weak_node_ptr.lock();
// Walk the automatically added groups and test whether each belongs to this node; the action
// taken for matching groups is not visible here. The loop header has no increment expression.
337 for (
auto group_it = automatically_added_groups_.begin();
338 group_it != automatically_added_groups_.end(); )
340 auto group_ptr = group_it->lock();
341 if (node_ptr->callback_group_in_node(group_ptr)) {
349 pending_removed_nodes_.clear();
// Pending manually added groups.
351 for (
auto weak_group_ptr : pending_manually_added_groups_) {
352 auto group_ptr = weak_group_ptr.lock();
// Look up the guard condition stored for this group and erase the entry when found (the
// condition selecting this path is not visible in this excerpt).
357 auto guard_condition_it = weak_groups_to_guard_conditions_.find(weak_group_ptr);
358 if (guard_condition_it != weak_groups_to_guard_conditions_.end()) {
360 weak_groups_to_guard_conditions_.erase(guard_condition_it);
364 pending_manually_added_groups_.clear();
// Pending manually removed groups: look each one up in manually_added_groups_; an error is
// raised on the path shown below (the enclosing branch is not visible in this excerpt).
366 for (
auto weak_group_ptr : pending_manually_removed_groups_) {
367 auto group_ptr = weak_group_ptr.lock();
369 auto group_it = manually_added_groups_.find(group_ptr);
370 if (group_it != manually_added_groups_.end()) {
373 throw std::runtime_error(
374 "Attempting to remove a callback group not added to this executor.");
378 pending_manually_removed_groups_.clear();
// For every node in nodes_to_check, visits the node's callback groups and adds the eligible
// ones to automatically_added_groups_.
// NOTE(review): name line not visible in this excerpt; matches
// add_automatically_associated_callback_groups -- confirm.
383 const NodeCollection & nodes_to_check)
385 for (
auto & weak_node : nodes_to_check) {
386 auto node = weak_node.lock();
// The callback captures `this` and the locked node pointer by value.
388 node->for_each_callback_group(
389 [
this, node](rclcpp::CallbackGroup::SharedPtr group_ptr)
// Eligible groups are those not yet associated with an executor that opt in to being
// added together with their node.
391 if (!group_ptr->get_associated_with_executor_atomic().load() &&
392 group_ptr->automatically_add_to_executor_with_node())
// Claim the group; exchange(true) returning true means the flag was already set when the
// exchange ran, despite the load() check above.
394 std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
395 if (has_executor.exchange(true)) {
396 throw std::runtime_error(
"Callback group has already been added to an executor.");
398 this->add_callback_group_to_collection(group_ptr, this->automatically_added_groups_);
// Removes expired weak pointers from weak_nodes_, automatically_added_groups_ and
// manually_added_groups_. Each loop continues from the iterator returned by the removal
// helper; the handling of non-expired entries is not visible in this excerpt.
406 ExecutorEntitiesCollector::prune_invalid_nodes_and_groups()
// Expired nodes.
408 for (
auto node_it = weak_nodes_.begin();
409 node_it != weak_nodes_.end(); )
411 if (node_it->expired()) {
412 node_it = remove_weak_node(node_it);
// Expired automatically added callback groups.
417 for (
auto group_it = automatically_added_groups_.begin();
418 group_it != automatically_added_groups_.end(); )
420 if (group_it->expired()) {
421 group_it = remove_weak_callback_group(group_it, automatically_added_groups_);
// Expired manually added callback groups.
426 for (
auto group_it = manually_added_groups_.begin();
427 group_it != manually_added_groups_.end(); )
429 if (group_it->expired()) {
430 group_it = remove_weak_callback_group(group_it, manually_added_groups_);
RCLCPP_PUBLIC NodeCollection::iterator remove_weak_node(NodeCollection::iterator weak_node) RCPPUTILS_TSA_REQUIRES(mutex_)
Implementation of removing a node from the collector.
RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Add a node to the entity collector.
RCLCPP_PUBLIC void prune_invalid_nodes_and_groups() RCPPUTILS_TSA_REQUIRES(mutex_)
Check all nodes and groups for expired weak pointers and remove them.
std::shared_ptr< ExecutorNotifyWaitable > notify_waitable_
Waitable to add guard conditions to.
RCLCPP_PUBLIC void update_collections()
Update the underlying collections.
RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Remove a callback group from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() const
Get all callback groups known to this entity collector.
bool has_pending() const
Indicate if the entities collector has pending additions or removals.
RCLCPP_PUBLIC void process_queues() RCPPUTILS_TSA_REQUIRES(mutex_)
Iterate over queued added/removed nodes and callback groups.
RCLCPP_PUBLIC ExecutorEntitiesCollector(std::shared_ptr< ExecutorNotifyWaitable > notify_waitable)
Constructor.
RCLCPP_PUBLIC ~ExecutorEntitiesCollector()
Destructor.
std::mutex mutex_
Mutex to protect collections and pending queues.
RCLCPP_PUBLIC void add_automatically_associated_callback_groups(const NodeCollection &nodes_to_check) RCPPUTILS_TSA_REQUIRES(mutex_)
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups() const
Get automatically-added callback groups known to this entity collector.
RCLCPP_PUBLIC CallbackGroupCollection::iterator remove_weak_callback_group(CallbackGroupCollection::iterator weak_group_it, CallbackGroupCollection &collection) RCPPUTILS_TSA_REQUIRES(mutex_)
Implementation of removing a callback group from the collector.
RCLCPP_PUBLIC void add_callback_group_to_collection(rclcpp::CallbackGroup::SharedPtr group_ptr, CallbackGroupCollection &collection) RCPPUTILS_TSA_REQUIRES(mutex_)
Implementation of adding a callback group.
RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Remove a node from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups() const
Get manually-added callback groups known to this entity collector.
RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Add a callback group to the entity collector.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.