15 #include "rclcpp/exceptions.hpp"
16 #include "rclcpp/executors/executor_notify_waitable.hpp"
24 std::function<
void(
void)> on_execute_callback,
25 const rclcpp::Context::SharedPtr & context)
26 : execute_callback_(on_execute_callback),
33 std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
34 this->execute_callback_ = other.execute_callback_;
35 this->notify_guard_conditions_ = other.notify_guard_conditions_;
36 this->guard_condition_ = other.guard_condition_;
37 this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
38 this->needs_processing = other.needs_processing;
41 ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitable & other)
44 std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
45 this->execute_callback_ = other.execute_callback_;
46 this->notify_guard_conditions_ = other.notify_guard_conditions_;
47 this->guard_condition_ = other.guard_condition_;
48 this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
49 this->needs_processing = other.needs_processing;
57 std::lock_guard<std::mutex> lock(guard_condition_mutex_);
59 idxs_of_added_guard_condition_.clear();
60 idxs_of_added_guard_condition_.reserve(notify_guard_conditions_.size());
62 if(needs_processing) {
68 rclcpp::exceptions::throw_from_rcl_error(
69 ret,
"failed to add guard condition to wait set");
72 idxs_of_added_guard_condition_.push_back(rcl_index);
75 guard_condition_->trigger();
84 for (
const auto & guard_condition : this->notify_guard_conditions_) {
90 rclcpp::exceptions::throw_from_rcl_error(
91 ret,
"failed to add guard condition to wait set");
94 idxs_of_added_guard_condition_.push_back(rcl_index);
101 std::lock_guard<std::mutex> lock(guard_condition_mutex_);
103 bool any_ready =
false;
104 for (
size_t rcl_index : idxs_of_added_guard_condition_) {
106 throw std::runtime_error(
107 "ExecutorNotifyWaitable::is_ready: Internal error, got index out of range");
112 if (
nullptr == rcl_guard_condition) {
117 needs_processing =
true;
127 std::lock_guard<std::mutex> lock(execute_mutex_);
129 needs_processing =
false;
131 this->execute_callback_();
134 std::shared_ptr<void>
140 std::shared_ptr<void>
153 auto gc_callback = [callback](
size_t count) {
157 std::lock_guard<std::mutex> lock(guard_condition_mutex_);
159 on_ready_callback_ = gc_callback;
160 for (
const auto & gc : notify_guard_conditions_) {
161 gc->set_on_trigger_callback(on_ready_callback_);
169 std::lock_guard<std::mutex> lock(guard_condition_mutex_);
171 on_ready_callback_ =
nullptr;
172 for (
const auto & gc : notify_guard_conditions_) {
173 gc->set_on_trigger_callback(
nullptr);
181 std::lock_guard<std::mutex> lock(execute_mutex_);
182 execute_callback_ = on_execute_callback;
188 std::lock_guard<std::mutex> lock(guard_condition_mutex_);
189 const auto & guard_condition = weak_guard_condition.lock();
190 if (guard_condition && notify_guard_conditions_.count(guard_condition) == 0) {
191 notify_guard_conditions_.insert(guard_condition);
192 if (on_ready_callback_) {
193 guard_condition->set_on_trigger_callback(on_ready_callback_);
201 std::lock_guard<std::mutex> lock(guard_condition_mutex_);
202 const auto & guard_condition = weak_guard_condition.lock();
203 if (!guard_condition) {
208 auto it = notify_guard_conditions_.find(guard_condition);
209 if (it != notify_guard_conditions_.end()) {
210 notify_guard_conditions_.erase(it);
212 if (on_ready_callback_) {
213 guard_condition->set_on_trigger_callback(
nullptr);
221 std::lock_guard<std::mutex> lock(guard_condition_mutex_);
222 return notify_guard_conditions_.size();
225 std::vector<std::shared_ptr<rclcpp::TimerBase>>
A condition that can be waited on in a single wait set and asynchronously triggered.
RCLCPP_PUBLIC ExecutorNotifyWaitable(std::function< void(void)> on_execute_callback={}, const rclcpp::Context::SharedPtr &context=rclcpp::contexts::get_global_default_context())
RCLCPP_PUBLIC std::shared_ptr< void > take_data_by_entity_id(size_t id) override
Take the data from an entity ID so that it can be consumed with execute.
RCLCPP_PUBLIC void set_execute_callback(std::function< void(void)> on_execute_callback)
Set a new callback to be called whenever this waitable is executed.
RCLCPP_PUBLIC void execute(const std::shared_ptr< void > &data) override
Perform work associated with the waitable.
RCLCPP_PUBLIC void add_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition)
Add a guard condition to be waited on.
RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override
Get the number of ready guard_conditions.
RCLCPP_PUBLIC std::shared_ptr< void > take_data() override
Retrieve data to be used in the next execute call.
RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add conditions to the wait set.
RCLCPP_PUBLIC void set_on_ready_callback(std::function< void(size_t, int)> callback) override
Set a callback to be called whenever the waitable becomes ready.
RCLCPP_PUBLIC void clear_on_ready_callback() override
Unset any callback registered via set_on_ready_callback.
RCLCPP_PUBLIC bool is_ready(const rcl_wait_set_t &wait_set) override
Check conditions against the wait set.
RCLCPP_PUBLIC std::vector< std::shared_ptr< rclcpp::TimerBase > > get_timers() const override
Returns the number of used Timers.
RCLCPP_PUBLIC void remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
Remove a guard condition from being waited on.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Handle for a rcl guard condition.
Container for subscription's, guard condition's, etc to be waited on.
size_t size_of_guard_conditions
Number of guard_conditions.
const rcl_guard_condition_t ** guard_conditions
Storage for guard condition pointers.
#define RCL_RET_OK
Success return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_guard_condition(rcl_wait_set_t *wait_set, const rcl_guard_condition_t *guard_condition, size_t *index)
Store a pointer to the guard condition in the next empty spot in the set.