ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
callback_group.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <atomic>
17 #include <functional>
18 #include <memory>
19 #include <mutex>
20 #include <stdexcept>
21 
22 #include "rclcpp/callback_group.hpp"
23 #include "rclcpp/client.hpp"
24 #include "rclcpp/service.hpp"
25 #include "rclcpp/subscription_base.hpp"
26 #include "rclcpp/timer.hpp"
27 #include "rclcpp/waitable.hpp"
28 
30 using rclcpp::CallbackGroupType;
31 
32 CallbackGroup::CallbackGroup(
33  CallbackGroupType group_type,
34  rclcpp::Context::WeakPtr context,
35  bool automatically_add_to_executor_with_node)
36 : type_(group_type), associated_with_executor_(false),
37  can_be_taken_from_(true),
38  automatically_add_to_executor_with_node_(automatically_add_to_executor_with_node),
39  context_(context)
40 {}
41 
43 {
45 }
46 
47 std::atomic_bool &
49 {
50  return can_be_taken_from_;
51 }
52 
53 const CallbackGroupType &
55 {
56  return type_;
57 }
58 
59 size_t
61 {
62  return
63  subscription_ptrs_.size() +
64  service_ptrs_.size() +
65  client_ptrs_.size() +
66  timer_ptrs_.size() +
67  waitable_ptrs_.size();
68 }
69 
71  std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
72  std::function<void(const rclcpp::ServiceBase::SharedPtr &)> service_func,
73  std::function<void(const rclcpp::ClientBase::SharedPtr &)> client_func,
74  std::function<void(const rclcpp::TimerBase::SharedPtr &)> timer_func,
75  std::function<void(const rclcpp::Waitable::SharedPtr &)> waitable_func) const
76 {
77  std::lock_guard<std::mutex> lock(mutex_);
78 
79  for (const rclcpp::SubscriptionBase::WeakPtr & weak_ptr : subscription_ptrs_) {
80  rclcpp::SubscriptionBase::SharedPtr ref_ptr = weak_ptr.lock();
81  if (ref_ptr) {
82  sub_func(ref_ptr);
83  }
84  }
85 
86  for (const rclcpp::ServiceBase::WeakPtr & weak_ptr : service_ptrs_) {
87  rclcpp::ServiceBase::SharedPtr ref_ptr = weak_ptr.lock();
88  if (ref_ptr) {
89  service_func(ref_ptr);
90  }
91  }
92 
93  for (const rclcpp::ClientBase::WeakPtr & weak_ptr : client_ptrs_) {
94  rclcpp::ClientBase::SharedPtr ref_ptr = weak_ptr.lock();
95  if (ref_ptr) {
96  client_func(ref_ptr);
97  }
98  }
99 
100  for (const rclcpp::TimerBase::WeakPtr & weak_ptr : timer_ptrs_) {
101  rclcpp::TimerBase::SharedPtr ref_ptr = weak_ptr.lock();
102  if (ref_ptr) {
103  timer_func(ref_ptr);
104  }
105  }
106 
107  for (const rclcpp::Waitable::WeakPtr & weak_ptr : waitable_ptrs_) {
108  rclcpp::Waitable::SharedPtr ref_ptr = weak_ptr.lock();
109  if (ref_ptr) {
110  waitable_func(ref_ptr);
111  }
112  }
113 }
114 
115 std::atomic_bool &
117 {
118  return associated_with_executor_;
119 }
120 
121 bool
123 {
124  return automatically_add_to_executor_with_node_;
125 }
126 
127 rclcpp::GuardCondition::SharedPtr
129 {
130  std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
131  rclcpp::Context::SharedPtr context_ptr = context_.lock();
132  if (context_ptr && context_ptr->is_valid()) {
133  if (!notify_guard_condition_) {
134  notify_guard_condition_ = std::make_shared<rclcpp::GuardCondition>(context_ptr);
135  }
136  return notify_guard_condition_;
137  }
138  return nullptr;
139 }
140 
141 void
143 {
144  std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
145  if (notify_guard_condition_) {
146  notify_guard_condition_->trigger();
147  }
148 }
149 
150 void
151 CallbackGroup::add_subscription(
152  const rclcpp::SubscriptionBase::SharedPtr subscription_ptr)
153 {
154  std::lock_guard<std::mutex> lock(mutex_);
155  subscription_ptrs_.push_back(subscription_ptr);
156  subscription_ptrs_.erase(
157  std::remove_if(
158  subscription_ptrs_.begin(),
159  subscription_ptrs_.end(),
160  [](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
161  subscription_ptrs_.end());
162 }
163 
164 void
165 CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr)
166 {
167  std::lock_guard<std::mutex> lock(mutex_);
168  timer_ptrs_.push_back(timer_ptr);
169  timer_ptrs_.erase(
170  std::remove_if(
171  timer_ptrs_.begin(),
172  timer_ptrs_.end(),
173  [](rclcpp::TimerBase::WeakPtr x) {return x.expired();}),
174  timer_ptrs_.end());
175 }
176 
177 void
178 CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr)
179 {
180  std::lock_guard<std::mutex> lock(mutex_);
181  service_ptrs_.push_back(service_ptr);
182  service_ptrs_.erase(
183  std::remove_if(
184  service_ptrs_.begin(),
185  service_ptrs_.end(),
186  [](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}),
187  service_ptrs_.end());
188 }
189 
190 void
191 CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr)
192 {
193  std::lock_guard<std::mutex> lock(mutex_);
194  client_ptrs_.push_back(client_ptr);
195  client_ptrs_.erase(
196  std::remove_if(
197  client_ptrs_.begin(),
198  client_ptrs_.end(),
199  [](rclcpp::ClientBase::WeakPtr x) {return x.expired();}),
200  client_ptrs_.end());
201 }
202 
203 void
204 CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr)
205 {
206  std::lock_guard<std::mutex> lock(mutex_);
207  waitable_ptrs_.push_back(waitable_ptr);
208  waitable_ptrs_.erase(
209  std::remove_if(
210  waitable_ptrs_.begin(),
211  waitable_ptrs_.end(),
212  [](rclcpp::Waitable::WeakPtr x) {return x.expired();}),
213  waitable_ptrs_.end());
214 }
215 
216 void
217 CallbackGroup::remove_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr) noexcept
218 {
219  std::lock_guard<std::mutex> lock(mutex_);
220  for (auto iter = waitable_ptrs_.begin(); iter != waitable_ptrs_.end(); ++iter) {
221  const auto shared_ptr = iter->lock();
222  if (shared_ptr.get() == waitable_ptr.get()) {
223  waitable_ptrs_.erase(iter);
224  break;
225  }
226  }
227 }
RCLCPP_PUBLIC ~CallbackGroup()
Default destructor.
RCLCPP_PUBLIC rclcpp::GuardCondition::SharedPtr get_notify_guard_condition()
Retrieve the guard condition used to signal changes to this callback group.
RCLCPP_PUBLIC std::atomic_bool & get_associated_with_executor_atomic()
Return a reference to the 'associated with executor' atomic boolean.
RCLCPP_PUBLIC void trigger_notify_guard_condition()
Trigger the notify guard condition.
RCLCPP_PUBLIC void collect_all_ptrs(std::function< void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func, std::function< void(const rclcpp::ServiceBase::SharedPtr &)> service_func, std::function< void(const rclcpp::ClientBase::SharedPtr &)> client_func, std::function< void(const rclcpp::TimerBase::SharedPtr &)> timer_func, std::function< void(const rclcpp::Waitable::SharedPtr &)> waitable_func) const
Collect all of the entity pointers contained in this callback group.
RCLCPP_PUBLIC bool automatically_add_to_executor_with_node() const
Return true if this callback group should be automatically added to an executor by the node.
RCLCPP_PUBLIC const CallbackGroupType & type() const
Get the group type.
RCLCPP_PUBLIC std::atomic_bool & can_be_taken_from()
Return a reference to the 'can be taken' atomic boolean.
RCLCPP_PUBLIC size_t size() const
Get the total number of entities in this callback group.