22 #include "rclcpp/callback_group.hpp"
23 #include "rclcpp/client.hpp"
24 #include "rclcpp/service.hpp"
25 #include "rclcpp/subscription_base.hpp"
26 #include "rclcpp/timer.hpp"
27 #include "rclcpp/waitable.hpp"
30 using rclcpp::CallbackGroupType;
// Constructs a callback group of the given type.
// The visible initializers store group_type in type_, start associated_with_executor_
// at false and can_be_taken_from_ at true, and record the
// automatically_add_to_executor_with_node flag.
// NOTE(review): the embedded numbering jumps from 38 to 50, so any remaining
// initializers (nothing visible here stores `context`) and the constructor body
// are not part of this listing — confirm against the full file.
32 CallbackGroup::CallbackGroup(
33 CallbackGroupType group_type,
34 rclcpp::Context::WeakPtr context,
35 bool automatically_add_to_executor_with_node)
36 : type_(group_type), associated_with_executor_(false),
37 can_be_taken_from_(true),
38 automatically_add_to_executor_with_node_(automatically_add_to_executor_with_node),
50 return can_be_taken_from_;
53 const CallbackGroupType &
63 subscription_ptrs_.size() +
64 service_ptrs_.size() +
67 waitable_ptrs_.size();
// Parameter list and partial body of a const member that walks every entity
// container of this group (the line carrying the function name is missing here).
// It takes one callback per entity kind: subscriptions, services, clients,
// timers and waitables.
// NOTE(review): the lines between each weak_ptr.lock() and the callback call are
// dropped from this listing (e.g. 80 -> 86, 87 -> 89), so whether ref_ptr is
// null-checked before use cannot be confirmed from here.
71 std::function<
void(
const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
72 std::function<
void(
const rclcpp::ServiceBase::SharedPtr &)> service_func,
73 std::function<
void(
const rclcpp::ClientBase::SharedPtr &)> client_func,
74 std::function<
void(
const rclcpp::TimerBase::SharedPtr &)> timer_func,
75 std::function<
void(
const rclcpp::Waitable::SharedPtr &)> waitable_func)
const
// mutex_ is held for the whole traversal below.
77 std::lock_guard<std::mutex> lock(mutex_);
// Subscriptions: promote each stored weak pointer to a shared pointer
// (presumably handed to sub_func on a line not shown).
79 for (
const rclcpp::SubscriptionBase::WeakPtr & weak_ptr : subscription_ptrs_) {
80 rclcpp::SubscriptionBase::SharedPtr ref_ptr = weak_ptr.lock();
// Services: the promoted pointer is passed to service_func.
86 for (
const rclcpp::ServiceBase::WeakPtr & weak_ptr : service_ptrs_) {
87 rclcpp::ServiceBase::SharedPtr ref_ptr = weak_ptr.lock();
89 service_func(ref_ptr);
// Clients (the client_func call is presumably on a line not shown).
93 for (
const rclcpp::ClientBase::WeakPtr & weak_ptr : client_ptrs_) {
94 rclcpp::ClientBase::SharedPtr ref_ptr = weak_ptr.lock();
// Timers (the timer_func call is presumably on a line not shown).
100 for (
const rclcpp::TimerBase::WeakPtr & weak_ptr : timer_ptrs_) {
101 rclcpp::TimerBase::SharedPtr ref_ptr = weak_ptr.lock();
// Waitables: the promoted pointer is passed to waitable_func.
107 for (
const rclcpp::Waitable::WeakPtr & weak_ptr : waitable_ptrs_) {
108 rclcpp::Waitable::SharedPtr ref_ptr = weak_ptr.lock();
110 waitable_func(ref_ptr);
118 return associated_with_executor_;
124 return automatically_add_to_executor_with_node_;
// Returns the group's notify guard condition, creating it on first use.
// NOTE(review): the line naming this function is missing from this listing, and
// what is returned when the context is expired or invalid is not visible here.
127 rclcpp::GuardCondition::SharedPtr
// Guard every access to notify_guard_condition_ with its recursive mutex.
130 std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
// context_ is held weakly; promote it to check that it is still usable.
131 rclcpp::Context::SharedPtr context_ptr = context_.lock();
132 if (context_ptr && context_ptr->is_valid()) {
// Create the guard condition only if none exists yet.
133 if (!notify_guard_condition_) {
134 notify_guard_condition_ = std::make_shared<rclcpp::GuardCondition>(context_ptr);
136 return notify_guard_condition_;
// Body fragment (the signature is not part of this listing): while holding
// notify_guard_condition_mutex_, trigger the notify guard condition if one exists.
// When notify_guard_condition_ is null nothing is triggered.
144 std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
145 if (notify_guard_condition_) {
146 notify_guard_condition_->trigger();
// Registers a subscription with this callback group.
// While holding mutex_, the pointer is appended to subscription_ptrs_ and entries
// whose weak pointer has expired are erased from the container.
// NOTE(review): original line 157 is missing from this listing; the begin/end/predicate
// arguments are presumably passed to std::remove_if — confirm against the full file.
151 CallbackGroup::add_subscription(
152 const rclcpp::SubscriptionBase::SharedPtr subscription_ptr)
154 std::lock_guard<std::mutex> lock(mutex_);
155 subscription_ptrs_.push_back(subscription_ptr);
// Prune entries that no longer refer to a live subscription.
156 subscription_ptrs_.erase(
158 subscription_ptrs_.begin(),
159 subscription_ptrs_.end(),
160 [](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
161 subscription_ptrs_.end());
// Registers a timer with this callback group: while holding mutex_, the pointer
// is appended to timer_ptrs_.
// NOTE(review): most of the statement around the expired() predicate (original lines
// 169-172 and 174 onward) is missing from this listing; presumably it prunes expired
// entries from timer_ptrs_ — confirm against the full file.
165 CallbackGroup::add_timer(
const rclcpp::TimerBase::SharedPtr timer_ptr)
167 std::lock_guard<std::mutex> lock(mutex_);
168 timer_ptrs_.push_back(timer_ptr);
// Predicate selecting entries whose timer has already been destroyed.
173 [](rclcpp::TimerBase::WeakPtr x) {return x.expired();}),
// Registers a service with this callback group: while holding mutex_, the pointer
// is appended to service_ptrs_.
// NOTE(review): original lines 182-183 and 185 are missing from this listing; the
// visible begin()/predicate/end() arguments presumably belong to an erase of expired
// entries from service_ptrs_ — confirm against the full file.
178 CallbackGroup::add_service(
const rclcpp::ServiceBase::SharedPtr service_ptr)
180 std::lock_guard<std::mutex> lock(mutex_);
181 service_ptrs_.push_back(service_ptr);
184 service_ptrs_.begin(),
// Predicate selecting entries whose service has already been destroyed.
186 [](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}),
187 service_ptrs_.end());
// Registers a client with this callback group: while holding mutex_, the pointer
// is appended to client_ptrs_.
// NOTE(review): original lines 195-196, 198 and 200 onward are missing from this
// listing; the visible begin()/predicate arguments presumably belong to an erase of
// expired entries from client_ptrs_ — confirm against the full file.
191 CallbackGroup::add_client(
const rclcpp::ClientBase::SharedPtr client_ptr)
193 std::lock_guard<std::mutex> lock(mutex_);
194 client_ptrs_.push_back(client_ptr);
197 client_ptrs_.begin(),
// Predicate selecting entries whose client has already been destroyed.
199 [](rclcpp::ClientBase::WeakPtr x) {return x.expired();}),
// Registers a waitable with this callback group.
// While holding mutex_, the pointer is appended to waitable_ptrs_ and entries
// whose weak pointer has expired are erased from the container.
// NOTE(review): original line 209 is missing from this listing; the begin/end/predicate
// arguments are presumably passed to std::remove_if — confirm against the full file.
204 CallbackGroup::add_waitable(
const rclcpp::Waitable::SharedPtr waitable_ptr)
206 std::lock_guard<std::mutex> lock(mutex_);
207 waitable_ptrs_.push_back(waitable_ptr);
// Prune entries that no longer refer to a live waitable.
208 waitable_ptrs_.erase(
210 waitable_ptrs_.begin(),
211 waitable_ptrs_.end(),
212 [](rclcpp::Waitable::WeakPtr x) {return x.expired();}),
213 waitable_ptrs_.end());
// Removes a waitable from this callback group.
// While holding mutex_, scans waitable_ptrs_ and erases the entry whose locked
// pointer refers to the same object as waitable_ptr (raw pointer comparison).
// NOTE(review): the function is declared noexcept, so an exception escaping from
// locking mutex_ would terminate the program.
217 CallbackGroup::remove_waitable(
const rclcpp::Waitable::SharedPtr waitable_ptr) noexcept
219 std::lock_guard<std::mutex> lock(mutex_);
220 for (
auto iter = waitable_ptrs_.begin(); iter != waitable_ptrs_.end(); ++iter) {
// An expired entry locks to a null pointer, which only matches a null waitable_ptr.
221 const auto shared_ptr = iter->lock();
222 if (shared_ptr.get() == waitable_ptr.get()) {
// NOTE(review): erase() invalidates `iter` for standard containers, so the loop
// must not advance it afterwards. The statement after this line is not visible
// in this listing — confirm it leaves the loop.
223 waitable_ptrs_.erase(iter);
RCLCPP_PUBLIC ~CallbackGroup()
Default destructor.
RCLCPP_PUBLIC rclcpp::GuardCondition::SharedPtr get_notify_guard_condition()
Retrieve the guard condition used to signal changes to this callback group.
RCLCPP_PUBLIC std::atomic_bool & get_associated_with_executor_atomic()
Return a reference to the 'associated with executor' atomic boolean.
RCLCPP_PUBLIC void trigger_notify_guard_condition()
Trigger the notify guard condition.
RCLCPP_PUBLIC void collect_all_ptrs(std::function< void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func, std::function< void(const rclcpp::ServiceBase::SharedPtr &)> service_func, std::function< void(const rclcpp::ClientBase::SharedPtr &)> client_func, std::function< void(const rclcpp::TimerBase::SharedPtr &)> timer_func, std::function< void(const rclcpp::Waitable::SharedPtr &)> waitable_func) const
Collect all of the entity pointers contained in this callback group.
RCLCPP_PUBLIC bool automatically_add_to_executor_with_node() const
Return true if this callback group should be automatically added to an executor by the node.
RCLCPP_PUBLIC const CallbackGroupType & type() const
Get the group type.
RCLCPP_PUBLIC std::atomic_bool & can_be_taken_from()
Return a reference to the 'can be taken from' atomic boolean.
RCLCPP_PUBLIC size_t size() const
Get the total number of entities in this callback group.