ROS 2 rclcpp + rcl - kilted  kilted
ROS 2 C++ Client Library with ROS Client Library
executor_notify_waitable.cpp
1 // Copyright 2023 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/exceptions.hpp"
16 #include "rclcpp/executors/executor_notify_waitable.hpp"
17 
18 namespace rclcpp
19 {
20 namespace executors
21 {
22 
24  std::function<void(void)> on_execute_callback,
25  const rclcpp::Context::SharedPtr & context)
26 : execute_callback_(on_execute_callback),
27  guard_condition_(std::make_shared<rclcpp::GuardCondition>(context))
28 {
29 }
30 
32 {
33  std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
34  this->execute_callback_ = other.execute_callback_;
35  this->notify_guard_conditions_ = other.notify_guard_conditions_;
36  this->guard_condition_ = other.guard_condition_;
37  this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
38  this->needs_processing = other.needs_processing;
39 }
40 
41 ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitable & other)
42 {
43  if (this != &other) {
44  std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
45  this->execute_callback_ = other.execute_callback_;
46  this->notify_guard_conditions_ = other.notify_guard_conditions_;
47  this->guard_condition_ = other.guard_condition_;
48  this->idxs_of_added_guard_condition_ = other.idxs_of_added_guard_condition_;
49  this->needs_processing = other.needs_processing;
50  }
51  return *this;
52 }
53 
54 void
56 {
57  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
58 
59  idxs_of_added_guard_condition_.clear();
60  idxs_of_added_guard_condition_.reserve(notify_guard_conditions_.size());
61 
62  if(needs_processing) {
63  rcl_guard_condition_t * cond = &guard_condition_->get_rcl_guard_condition();
64  size_t rcl_index;
65  rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index);
66 
67  if (RCL_RET_OK != ret) {
68  rclcpp::exceptions::throw_from_rcl_error(
69  ret, "failed to add guard condition to wait set");
70  }
71 
72  idxs_of_added_guard_condition_.push_back(rcl_index);
73 
74  // we want to directly wake up any way, not need to add the other guard conditions
75  guard_condition_->trigger();
76 
77  return;
78  }
79 
80  // Note: no guard conditions need to be re-triggered, since the guard
81  // conditions in this class are not tracking a stateful condition, but instead
82  // only serve to interrupt the wait set when new information is available to
83  // consider.
84  for (const auto & guard_condition : this->notify_guard_conditions_) {
85  rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition();
86  size_t rcl_index;
87  rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, &rcl_index);
88 
89  if (RCL_RET_OK != ret) {
90  rclcpp::exceptions::throw_from_rcl_error(
91  ret, "failed to add guard condition to wait set");
92  }
93 
94  idxs_of_added_guard_condition_.push_back(rcl_index);
95  }
96 }
97 
98 bool
100 {
101  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
102 
103  bool any_ready = false;
104  for (size_t rcl_index : idxs_of_added_guard_condition_) {
105  if(rcl_index >= wait_set.size_of_guard_conditions) {
106  throw std::runtime_error(
107  "ExecutorNotifyWaitable::is_ready: Internal error, got index out of range");
108  }
109 
110  const auto * rcl_guard_condition = wait_set.guard_conditions[rcl_index];
111 
112  if (nullptr == rcl_guard_condition) {
113  continue;
114  }
115 
116  any_ready = true;
117  needs_processing = true;
118  break;
119  }
120 
121  return any_ready;
122 }
123 
124 void
125 ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & /*data*/)
126 {
127  std::lock_guard<std::mutex> lock(execute_mutex_);
128 
129  needs_processing = false;
130 
131  this->execute_callback_();
132 }
133 
134 std::shared_ptr<void>
136 {
137  return nullptr;
138 }
139 
140 std::shared_ptr<void>
142 {
143  return nullptr;
144 }
145 
146 void
147 ExecutorNotifyWaitable::set_on_ready_callback(std::function<void(size_t, int)> callback)
148 {
149  // The second argument of the callback could be used to identify which guard condition
150  // triggered the event.
151  // We could indicate which of the guard conditions was triggered, but the executor
152  // is already going to check that.
153  auto gc_callback = [callback](size_t count) {
154  callback(count, 0);
155  };
156 
157  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
158 
159  on_ready_callback_ = gc_callback;
160  for (const auto & gc : notify_guard_conditions_) {
161  gc->set_on_trigger_callback(on_ready_callback_);
162  }
163 }
164 
165 RCLCPP_PUBLIC
166 void
168 {
169  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
170 
171  on_ready_callback_ = nullptr;
172  for (const auto & gc : notify_guard_conditions_) {
173  gc->set_on_trigger_callback(nullptr);
174  }
175 }
176 
177 RCLCPP_PUBLIC
178 void
179 ExecutorNotifyWaitable::set_execute_callback(std::function<void(void)> on_execute_callback)
180 {
181  std::lock_guard<std::mutex> lock(execute_mutex_);
182  execute_callback_ = on_execute_callback;
183 }
184 
185 void
186 ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
187 {
188  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
189  const auto & guard_condition = weak_guard_condition.lock();
190  if (guard_condition && notify_guard_conditions_.count(guard_condition) == 0) {
191  notify_guard_conditions_.insert(guard_condition);
192  if (on_ready_callback_) {
193  guard_condition->set_on_trigger_callback(on_ready_callback_);
194  }
195  }
196 }
197 
198 void
199 ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
200 {
201  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
202  const auto & guard_condition = weak_guard_condition.lock();
203  if (!guard_condition) {
204  // If the lock did not work, the guard condition can't be
205  // saved in the sets, therefore we don't need to remove it
206  return;
207  }
208  auto it = notify_guard_conditions_.find(guard_condition);
209  if (it != notify_guard_conditions_.end()) {
210  notify_guard_conditions_.erase(it);
211  // If this notify waitable doesn't have an on_ready_callback, then there's nothing to unset
212  if (on_ready_callback_) {
213  guard_condition->set_on_trigger_callback(nullptr);
214  }
215  }
216 }
217 
218 size_t
220 {
221  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
222  return notify_guard_conditions_.size();
223 }
224 
225 std::vector<std::shared_ptr<rclcpp::TimerBase>>
227 {
228  return {};
229 }
230 
231 
232 } // namespace executors
233 } // namespace rclcpp
A condition that can be waited on in a single wait set and asynchronously triggered.
RCLCPP_PUBLIC ExecutorNotifyWaitable(std::function< void(void)> on_execute_callback={}, const rclcpp::Context::SharedPtr &context=rclcpp::contexts::get_global_default_context())
RCLCPP_PUBLIC std::shared_ptr< void > take_data_by_entity_id(size_t id) override
Take the data from an entity ID so that it can be consumed with execute.
RCLCPP_PUBLIC void set_execute_callback(std::function< void(void)> on_execute_callback)
Set a new callback to be called whenever this waitable is executed.
RCLCPP_PUBLIC void execute(const std::shared_ptr< void > &data) override
Perform work associated with the waitable.
RCLCPP_PUBLIC void add_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition)
Add a guard condition to be waited on.
RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override
Get the number of ready guard_conditions.
RCLCPP_PUBLIC std::shared_ptr< void > take_data() override
Retrieve data to be used in the next execute call.
RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add conditions to the wait set.
RCLCPP_PUBLIC void set_on_ready_callback(std::function< void(size_t, int)> callback) override
Set a callback to be called whenever the waitable becomes ready.
RCLCPP_PUBLIC void clear_on_ready_callback() override
Unset any callback registered via set_on_ready_callback.
RCLCPP_PUBLIC bool is_ready(const rcl_wait_set_t &wait_set) override
Check conditions against the wait set.
RCLCPP_PUBLIC std::vector< std::shared_ptr< rclcpp::TimerBase > > get_timers() const override
Returns the number of used Timers.
RCLCPP_PUBLIC void remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
Remove a guard condition from being waited on.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Handle for a rcl guard condition.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42
size_t size_of_guard_conditions
Number of guard_conditions.
Definition: wait.h:50
const rcl_guard_condition_t ** guard_conditions
Storage for guard condition pointers.
Definition: wait.h:48
#define RCL_RET_OK
Success return code.
Definition: types.h:27
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:24
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_guard_condition(rcl_wait_set_t *wait_set, const rcl_guard_condition_t *guard_condition, size_t *index)
Store a pointer to the guard condition in the next empty spot in the set.
Definition: wait.c:441