ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
callback_group.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <atomic>
17 #include <functional>
18 #include <memory>
19 #include <mutex>
20 #include <stdexcept>
21 
22 #include "rclcpp/callback_group.hpp"
23 #include "rclcpp/client.hpp"
24 #include "rclcpp/service.hpp"
25 #include "rclcpp/subscription_base.hpp"
26 #include "rclcpp/timer.hpp"
27 #include "rclcpp/waitable.hpp"
28 
30 using rclcpp::CallbackGroupType;
31 
32 CallbackGroup::CallbackGroup(
33  CallbackGroupType group_type,
34  bool automatically_add_to_executor_with_node)
35 : type_(group_type), associated_with_executor_(false),
36  can_be_taken_from_(true),
37  automatically_add_to_executor_with_node_(automatically_add_to_executor_with_node)
38 {}
39 
41 {
43 }
44 
45 std::atomic_bool &
46 CallbackGroup::can_be_taken_from()
47 {
48  return can_be_taken_from_;
49 }
50 
51 const CallbackGroupType &
52 CallbackGroup::type() const
53 {
54  return type_;
55 }
56 
57 void CallbackGroup::collect_all_ptrs(
58  std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
59  std::function<void(const rclcpp::ServiceBase::SharedPtr &)> service_func,
60  std::function<void(const rclcpp::ClientBase::SharedPtr &)> client_func,
61  std::function<void(const rclcpp::TimerBase::SharedPtr &)> timer_func,
62  std::function<void(const rclcpp::Waitable::SharedPtr &)> waitable_func) const
63 {
64  std::lock_guard<std::mutex> lock(mutex_);
65 
66  for (const rclcpp::SubscriptionBase::WeakPtr & weak_ptr : subscription_ptrs_) {
67  rclcpp::SubscriptionBase::SharedPtr ref_ptr = weak_ptr.lock();
68  if (ref_ptr) {
69  sub_func(ref_ptr);
70  }
71  }
72 
73  for (const rclcpp::ServiceBase::WeakPtr & weak_ptr : service_ptrs_) {
74  rclcpp::ServiceBase::SharedPtr ref_ptr = weak_ptr.lock();
75  if (ref_ptr) {
76  service_func(ref_ptr);
77  }
78  }
79 
80  for (const rclcpp::ClientBase::WeakPtr & weak_ptr : client_ptrs_) {
81  rclcpp::ClientBase::SharedPtr ref_ptr = weak_ptr.lock();
82  if (ref_ptr) {
83  client_func(ref_ptr);
84  }
85  }
86 
87  for (const rclcpp::TimerBase::WeakPtr & weak_ptr : timer_ptrs_) {
88  rclcpp::TimerBase::SharedPtr ref_ptr = weak_ptr.lock();
89  if (ref_ptr) {
90  timer_func(ref_ptr);
91  }
92  }
93 
94  for (const rclcpp::Waitable::WeakPtr & weak_ptr : waitable_ptrs_) {
95  rclcpp::Waitable::SharedPtr ref_ptr = weak_ptr.lock();
96  if (ref_ptr) {
97  waitable_func(ref_ptr);
98  }
99  }
100 }
101 
102 std::atomic_bool &
104 {
105  return associated_with_executor_;
106 }
107 
108 bool
110 {
111  return automatically_add_to_executor_with_node_;
112 }
113 
114 rclcpp::GuardCondition::SharedPtr
115 CallbackGroup::get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr)
116 {
117  std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
118  if (notify_guard_condition_ && context_ptr != notify_guard_condition_->get_context()) {
119  if (associated_with_executor_) {
121  }
122  notify_guard_condition_ = nullptr;
123  }
124 
125  if (!notify_guard_condition_) {
126  notify_guard_condition_ = std::make_shared<rclcpp::GuardCondition>(context_ptr);
127  }
128 
129  return notify_guard_condition_;
130 }
131 
132 void
134 {
135  std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
136  if (notify_guard_condition_) {
137  notify_guard_condition_->trigger();
138  }
139 }
140 
141 void
142 CallbackGroup::add_subscription(
143  const rclcpp::SubscriptionBase::SharedPtr subscription_ptr)
144 {
145  std::lock_guard<std::mutex> lock(mutex_);
146  subscription_ptrs_.push_back(subscription_ptr);
147  subscription_ptrs_.erase(
148  std::remove_if(
149  subscription_ptrs_.begin(),
150  subscription_ptrs_.end(),
151  [](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
152  subscription_ptrs_.end());
153 }
154 
155 void
156 CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr)
157 {
158  std::lock_guard<std::mutex> lock(mutex_);
159  timer_ptrs_.push_back(timer_ptr);
160  timer_ptrs_.erase(
161  std::remove_if(
162  timer_ptrs_.begin(),
163  timer_ptrs_.end(),
164  [](rclcpp::TimerBase::WeakPtr x) {return x.expired();}),
165  timer_ptrs_.end());
166 }
167 
168 void
169 CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr)
170 {
171  std::lock_guard<std::mutex> lock(mutex_);
172  service_ptrs_.push_back(service_ptr);
173  service_ptrs_.erase(
174  std::remove_if(
175  service_ptrs_.begin(),
176  service_ptrs_.end(),
177  [](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}),
178  service_ptrs_.end());
179 }
180 
181 void
182 CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr)
183 {
184  std::lock_guard<std::mutex> lock(mutex_);
185  client_ptrs_.push_back(client_ptr);
186  client_ptrs_.erase(
187  std::remove_if(
188  client_ptrs_.begin(),
189  client_ptrs_.end(),
190  [](rclcpp::ClientBase::WeakPtr x) {return x.expired();}),
191  client_ptrs_.end());
192 }
193 
194 void
195 CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr)
196 {
197  std::lock_guard<std::mutex> lock(mutex_);
198  waitable_ptrs_.push_back(waitable_ptr);
199  waitable_ptrs_.erase(
200  std::remove_if(
201  waitable_ptrs_.begin(),
202  waitable_ptrs_.end(),
203  [](rclcpp::Waitable::WeakPtr x) {return x.expired();}),
204  waitable_ptrs_.end());
205 }
206 
207 void
208 CallbackGroup::remove_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr) noexcept
209 {
210  std::lock_guard<std::mutex> lock(mutex_);
211  for (auto iter = waitable_ptrs_.begin(); iter != waitable_ptrs_.end(); ++iter) {
212  const auto shared_ptr = iter->lock();
213  if (shared_ptr.get() == waitable_ptr.get()) {
214  waitable_ptrs_.erase(iter);
215  break;
216  }
217  }
218 }
RCLCPP_PUBLIC ~CallbackGroup()
Default destructor.
RCLCPP_PUBLIC std::atomic_bool & get_associated_with_executor_atomic()
Return a reference to the 'associated with executor' atomic boolean.
RCLCPP_PUBLIC rclcpp::GuardCondition::SharedPtr get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr)
Defer creating the notify guard condition and return it.
RCLCPP_PUBLIC void trigger_notify_guard_condition()
Trigger the notify guard condition.
RCLCPP_PUBLIC bool automatically_add_to_executor_with_node() const
Return true if this callback group should be automatically added to an executor by the node.