22 #include "rclcpp/callback_group.hpp"
23 #include "rclcpp/client.hpp"
24 #include "rclcpp/service.hpp"
25 #include "rclcpp/subscription_base.hpp"
26 #include "rclcpp/timer.hpp"
27 #include "rclcpp/waitable.hpp"
30 using rclcpp::CallbackGroupType;
// Constructor.
//
// group_type                               - stored in type_.
// automatically_add_to_executor_with_node  - stored in
//                                            automatically_add_to_executor_with_node_.
//
// A new group starts with associated_with_executor_ == false and
// can_be_taken_from_ == true.
// NOTE(review): the constructor body is not visible in this excerpt.
32 CallbackGroup::CallbackGroup(
33 CallbackGroupType group_type,
34 bool automatically_add_to_executor_with_node)
35 : type_(group_type), associated_with_executor_(false),
36 can_be_taken_from_(true),
37 automatically_add_to_executor_with_node_(automatically_add_to_executor_with_node)
// Accessor for the can_be_taken_from_ member (initialized to true by the
// constructor).
// NOTE(review): the return-type line and the braces are missing from this
// excerpt, so whether the member is handed out by reference or by value
// cannot be told from here — confirm against the header.
46 CallbackGroup::can_be_taken_from()
48 return can_be_taken_from_;
// Const accessor that yields a const reference to a CallbackGroupType.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns type_, the member set from the constructor argument — confirm.
51 const CallbackGroupType &
52 CallbackGroup::type()
const
// Walks the five entity lists of this group (subscriptions, services,
// clients, timers, waitables), promoting each stored weak pointer to a
// shared pointer for the matching callback.
//
// Parameters (each a std::function taking a const SharedPtr reference):
//   sub_func      - callback for entries of subscription_ptrs_
//   service_func  - callback for entries of service_ptrs_
//   client_func   - callback for entries of client_ptrs_
//   timer_func    - callback for entries of timer_ptrs_
//   waitable_func - callback for entries of waitable_ptrs_
//
// NOTE(review): several lines of this definition are missing from the
// excerpt (braces, any null check on ref_ptr, and the sub_func /
// client_func / timer_func calls). Only the service_func and waitable_func
// invocations are visible; the other three are presumably invoked the same
// way, and only for non-null ref_ptr — confirm against the full file.
57 void CallbackGroup::collect_all_ptrs(
58 std::function<
void(
const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
59 std::function<
void(
const rclcpp::ServiceBase::SharedPtr &)> service_func,
60 std::function<
void(
const rclcpp::ClientBase::SharedPtr &)> client_func,
61 std::function<
void(
const rclcpp::TimerBase::SharedPtr &)> timer_func,
62 std::function<
void(
const rclcpp::Waitable::SharedPtr &)> waitable_func)
const
// mutex_ is taken before any list is traversed, so the callbacks visible
// below are invoked while the lock is held.
64 std::lock_guard<std::mutex> lock(mutex_);
// Subscriptions: lock() yields an empty pointer if the entry has expired.
66 for (
const rclcpp::SubscriptionBase::WeakPtr & weak_ptr : subscription_ptrs_) {
67 rclcpp::SubscriptionBase::SharedPtr ref_ptr = weak_ptr.lock();
// Services.
73 for (
const rclcpp::ServiceBase::WeakPtr & weak_ptr : service_ptrs_) {
74 rclcpp::ServiceBase::SharedPtr ref_ptr = weak_ptr.lock();
76 service_func(ref_ptr);
// Clients.
80 for (
const rclcpp::ClientBase::WeakPtr & weak_ptr : client_ptrs_) {
81 rclcpp::ClientBase::SharedPtr ref_ptr = weak_ptr.lock();
// Timers.
87 for (
const rclcpp::TimerBase::WeakPtr & weak_ptr : timer_ptrs_) {
88 rclcpp::TimerBase::SharedPtr ref_ptr = weak_ptr.lock();
// Waitables.
94 for (
const rclcpp::Waitable::WeakPtr & weak_ptr : waitable_ptrs_) {
95 rclcpp::Waitable::SharedPtr ref_ptr = weak_ptr.lock();
97 waitable_func(ref_ptr);
// NOTE(review): orphaned statement — the enclosing function header is not
// visible in this excerpt. Presumably this is the body of
// get_associated_with_executor_atomic(); confirm against the full file.
105 return associated_with_executor_;
// NOTE(review): orphaned statement — the enclosing function header is not
// visible in this excerpt. Presumably this is the body of
// automatically_add_to_executor_with_node(); confirm against the full file.
// Returns the flag captured from the constructor argument of the same name.
111 return automatically_add_to_executor_with_node_;
// Returns the notify guard condition, creating it on demand for context_ptr.
//
// NOTE(review): the function-name line and some interior lines are missing
// from this excerpt; the body reads context_ptr and returns
// rclcpp::GuardCondition::SharedPtr, matching
// get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr).
114 rclcpp::GuardCondition::SharedPtr
// All access to notify_guard_condition_ below happens under this recursive
// mutex.
117 std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
// An existing guard condition that belongs to a different context than the
// one requested is dropped so a fresh one can be created below.
118 if (notify_guard_condition_ && context_ptr != notify_guard_condition_->get_context()) {
// NOTE(review): the statement executed when the group is associated with an
// executor is not visible in this excerpt.
119 if (associated_with_executor_) {
122 notify_guard_condition_ =
nullptr;
// Create the guard condition on first use, or after the reset above.
125 if (!notify_guard_condition_) {
126 notify_guard_condition_ = std::make_shared<rclcpp::GuardCondition>(context_ptr);
129 return notify_guard_condition_;
// NOTE(review): the enclosing function header is not visible in this
// excerpt; presumably this is the body of trigger_notify_guard_condition().
// Under notify_guard_condition_mutex_, triggers the notify guard condition
// if one has been created; no other branch is visible here.
135 std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
136 if (notify_guard_condition_) {
137 notify_guard_condition_->trigger();
// Registers subscription_ptr with this group.
//
// Under mutex_, the pointer is appended to subscription_ptrs_ and then a
// range ending at end() is erased, selected with a predicate that matches
// entries whose weak pointer has expired.
// NOTE(review): the return-type line, the braces and the call that wraps the
// begin/end/predicate arguments are missing from this excerpt; the shape
// looks like the erase-remove idiom (std::remove_if) — confirm.
142 CallbackGroup::add_subscription(
143 const rclcpp::SubscriptionBase::SharedPtr subscription_ptr)
145 std::lock_guard<std::mutex> lock(mutex_);
146 subscription_ptrs_.push_back(subscription_ptr);
147 subscription_ptrs_.erase(
149 subscription_ptrs_.begin(),
150 subscription_ptrs_.end(),
// NOTE(review): the predicate takes the weak pointer by value, which copies
// it for every element; a const reference would avoid that.
151 [](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
152 subscription_ptrs_.end());
// Registers timer_ptr with this group: under mutex_, the pointer is appended
// to timer_ptrs_.
// NOTE(review): most of the statement that uses the expired() predicate
// below is missing from this excerpt; presumably it prunes expired entries
// from timer_ptrs_ in the same way as the sibling add_* functions — confirm.
156 CallbackGroup::add_timer(
const rclcpp::TimerBase::SharedPtr timer_ptr)
158 std::lock_guard<std::mutex> lock(mutex_);
159 timer_ptrs_.push_back(timer_ptr);
// Predicate: true for a weak pointer whose target no longer exists.
164 [](rclcpp::TimerBase::WeakPtr x) {return x.expired();}),
// Registers service_ptr with this group: under mutex_, the pointer is
// appended to service_ptrs_.
// NOTE(review): the statement that consumes the begin()/predicate/end()
// arguments below is only partly visible; presumably it erases expired
// entries from service_ptrs_ (erase-remove idiom) — confirm.
169 CallbackGroup::add_service(
const rclcpp::ServiceBase::SharedPtr service_ptr)
171 std::lock_guard<std::mutex> lock(mutex_);
172 service_ptrs_.push_back(service_ptr);
175 service_ptrs_.begin(),
// Predicate: true for a weak pointer whose target no longer exists.
177 [](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}),
178 service_ptrs_.end());
// Registers client_ptr with this group: under mutex_, the pointer is
// appended to client_ptrs_.
// NOTE(review): the statement that consumes the begin()/predicate arguments
// below is only partly visible; presumably it erases expired entries from
// client_ptrs_ (erase-remove idiom) — confirm.
182 CallbackGroup::add_client(
const rclcpp::ClientBase::SharedPtr client_ptr)
184 std::lock_guard<std::mutex> lock(mutex_);
185 client_ptrs_.push_back(client_ptr);
188 client_ptrs_.begin(),
// Predicate: true for a weak pointer whose target no longer exists.
190 [](rclcpp::ClientBase::WeakPtr x) {return x.expired();}),
// Registers waitable_ptr with this group.
//
// Under mutex_, the pointer is appended to waitable_ptrs_ and then a range
// ending at end() is erased, selected with a predicate that matches entries
// whose weak pointer has expired.
// NOTE(review): the return-type line, the braces and the call that wraps the
// begin/end/predicate arguments are missing from this excerpt; the shape
// looks like the erase-remove idiom (std::remove_if) — confirm.
195 CallbackGroup::add_waitable(
const rclcpp::Waitable::SharedPtr waitable_ptr)
197 std::lock_guard<std::mutex> lock(mutex_);
198 waitable_ptrs_.push_back(waitable_ptr);
199 waitable_ptrs_.erase(
201 waitable_ptrs_.begin(),
202 waitable_ptrs_.end(),
// NOTE(review): the predicate takes the weak pointer by value, which copies
// it for every element; a const reference would avoid that.
203 [](rclcpp::Waitable::WeakPtr x) {return x.expired();}),
204 waitable_ptrs_.end());
// Removes the entry referring to waitable_ptr from waitable_ptrs_.
// Declared noexcept.
//
// Under mutex_, the list is scanned front to back; each weak entry is
// promoted with lock() and compared to waitable_ptr by raw address, and a
// matching entry is erased.
// NOTE(review): the lines after the erase call are not visible in this
// excerpt. erase(iter) invalidates iter, so the loop has to be left right
// after it (break/return) — confirm against the full file.
208 CallbackGroup::remove_waitable(
const rclcpp::Waitable::SharedPtr waitable_ptr) noexcept
210 std::lock_guard<std::mutex> lock(mutex_);
211 for (
auto iter = waitable_ptrs_.begin(); iter != waitable_ptrs_.end(); ++iter) {
// An expired entry locks to an empty pointer, whose get() is nullptr; it
// therefore only matches when waitable_ptr itself is null.
212 const auto shared_ptr = iter->lock();
213 if (shared_ptr.get() == waitable_ptr.get()) {
214 waitable_ptrs_.erase(iter);
RCLCPP_PUBLIC ~CallbackGroup()
Default destructor.
RCLCPP_PUBLIC std::atomic_bool & get_associated_with_executor_atomic()
Return a reference to the 'associated with executor' atomic boolean.
RCLCPP_PUBLIC rclcpp::GuardCondition::SharedPtr get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr)
Defer creating the notify guard condition and return it.
RCLCPP_PUBLIC void trigger_notify_guard_condition()
Trigger the notify guard condition.
RCLCPP_PUBLIC bool automatically_add_to_executor_with_node() const
Return true if this callback group should be automatically added to an executor by the node.