ROS 2 rclcpp + rcl - rolling  rolling-a919a6e5
ROS 2 C++ Client Library with ROS Client Library
timers_manager.cpp
1 // Copyright 2023 iRobot Corporation.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/experimental/timers_manager.hpp"
16 
17 #include <inttypes.h>
18 
19 #include <ctime>
20 #include <iostream>
21 #include <memory>
22 #include <stdexcept>
23 
24 #include "rcpputils/scope_exit.hpp"
25 
27 
28 TimersManager::TimersManager(
29  std::shared_ptr<rclcpp::Context> context,
30  std::function<void(const rclcpp::TimerBase *, const std::shared_ptr<void> &)> on_ready_callback)
31 : on_ready_callback_(on_ready_callback),
32  context_(context)
33 {
34 }
35 
37 {
38  // Remove all timers
39  this->clear();
40 
41  // Make sure timers thread is stopped before destroying this object
42  this->stop();
43 }
44 
45 void TimersManager::add_timer(rclcpp::TimerBase::SharedPtr timer)
46 {
47  if (!timer) {
48  throw std::invalid_argument("TimersManager::add_timer() trying to add nullptr timer");
49  }
50 
51  bool added = false;
52  {
53  std::unique_lock<std::mutex> lock(timers_mutex_);
54  added = weak_timers_heap_.add_timer(timer);
55  timers_updated_ = timers_updated_ || added;
56  }
57 
58  timer->set_on_reset_callback(
59  [this](size_t arg) {
60  {
61  (void)arg;
62  std::unique_lock<std::mutex> lock(timers_mutex_);
63  timers_updated_ = true;
64  }
65  timers_cv_.notify_one();
66  });
67 
68  if (added) {
69  // Notify that a timer has been added
70  timers_cv_.notify_one();
71  }
72 }
73 
75 {
76  // Make sure that the thread is not already running
77  if (running_.exchange(true)) {
78  throw std::runtime_error("TimersManager::start() can't start timers thread as already running");
79  }
80 
81  timers_thread_ = std::thread(&TimersManager::run_timers, this);
82 }
83 
85 {
86  // Lock stop() function to prevent race condition in destructor
87  std::unique_lock<std::mutex> lock(stop_mutex_);
88  running_ = false;
89 
90  // Notify the timers manager thread to wake up
91  {
92  std::unique_lock<std::mutex> lock(timers_mutex_);
93  timers_updated_ = true;
94  }
95  timers_cv_.notify_one();
96 
97  // Join timers thread if it's running
98  if (timers_thread_.joinable()) {
99  timers_thread_.join();
100  }
101 }
102 
103 std::optional<std::chrono::nanoseconds> TimersManager::get_head_timeout()
104 {
105  // Do not allow to interfere with the thread running
106  if (running_) {
107  throw std::runtime_error(
108  "get_head_timeout() can't be used while timers thread is running");
109  }
110 
111  std::unique_lock<std::mutex> lock(timers_mutex_);
112  return this->get_head_timeout_unsafe();
113 }
114 
116 {
117  // Do not allow to interfere with the thread running
118  if (running_) {
119  throw std::runtime_error(
120  "get_number_ready_timers() can't be used while timers thread is running");
121  }
122 
123  std::unique_lock<std::mutex> lock(timers_mutex_);
124  TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
125  return locked_heap.get_number_ready_timers();
126 }
127 
129 {
130  // Do not allow to interfere with the thread running
131  if (running_) {
132  throw std::runtime_error(
133  "execute_head_timer() can't be used while timers thread is running");
134  }
135 
136  std::unique_lock<std::mutex> lock(timers_mutex_);
137 
138  TimersHeap timers_heap = weak_timers_heap_.validate_and_lock();
139 
140  // Nothing to do if we don't have any timer
141  if (timers_heap.empty()) {
142  return false;
143  }
144 
145  TimerPtr head_timer = timers_heap.front();
146 
147  const bool timer_ready = head_timer->is_ready();
148  if (timer_ready) {
149  // NOTE: here we always execute the timer, regardless of whether the
150  // on_ready_callback is set or not.
151  auto data = head_timer->call();
152  if (!data) {
153  // someone canceled the timer between is_ready and call
154  return false;
155  }
156  head_timer->execute_callback(data);
157  timers_heap.heapify_root();
158  weak_timers_heap_.store(timers_heap);
159  }
160 
161  return timer_ready;
162 }
163 
165  const rclcpp::TimerBase * timer_id,
166  const std::shared_ptr<void> & data)
167 {
168  TimerPtr ready_timer;
169  {
170  std::unique_lock<std::mutex> lock(timers_mutex_);
171  ready_timer = weak_timers_heap_.get_timer(timer_id);
172  }
173  if (ready_timer) {
174  ready_timer->execute_callback(data);
175  }
176 }
177 
178 std::optional<std::chrono::nanoseconds> TimersManager::get_head_timeout_unsafe()
179 {
180  // If we don't have any weak pointer, then we just return maximum timeout
181  if (weak_timers_heap_.empty()) {
182  return std::chrono::nanoseconds::max();
183  }
184  // Weak heap is not empty, so try to lock the first element.
185  // If it is still a valid pointer, it is guaranteed to be the correct head
186  TimerPtr head_timer = weak_timers_heap_.front().lock();
187 
188  if (!head_timer) {
189  // The first element has expired, we can't make other assumptions on the heap
190  // and we need to entirely validate it.
191  TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
192  // NOTE: the following operations will not modify any element in the heap, so we
193  // don't have to call `weak_timers_heap_.store(locked_heap)` at the end.
194 
195  if (locked_heap.empty()) {
196  return std::chrono::nanoseconds::max();
197  }
198  head_timer = locked_heap.front();
199  }
200  if (head_timer->is_canceled()) {
201  return std::nullopt;
202  }
203  return head_timer->time_until_trigger();
204 }
205 
206 void TimersManager::execute_ready_timers_unsafe()
207 {
208  // We start by locking the timers
209  TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
210 
211  // Nothing to do if we don't have any timer
212  if (locked_heap.empty()) {
213  return;
214  }
215 
216  // Keep executing timers until they are ready and they were already ready when we started.
217  // The two checks prevent this function from blocking indefinitely if the
218  // time required for executing the timers is longer than their period.
219 
220  TimerPtr head_timer = locked_heap.front();
221  const size_t number_ready_timers = locked_heap.get_number_ready_timers();
222  size_t executed_timers = 0;
223  while (executed_timers < number_ready_timers && head_timer->is_ready()) {
224  auto data = head_timer->call();
225  if (data) {
226  if (on_ready_callback_) {
227  on_ready_callback_(head_timer.get(), data);
228  } else {
229  head_timer->execute_callback(data);
230  }
231  } else {
232  // someone canceled the timer between is_ready and call
233  // we don't do anything, as the timer is now 'processed'
234  }
235 
236  executed_timers++;
237  // Executing a timer will result in updating its time_until_trigger, so re-heapify
238  locked_heap.heapify_root();
239  // Get new head timer
240  head_timer = locked_heap.front();
241  }
242 
243  // After having performed work on the locked heap we reflect the changes to weak one.
244  // Timers will be already sorted the next time we need them if none went out of scope.
245  weak_timers_heap_.store(locked_heap);
246 }
247 
248 void TimersManager::run_timers()
249 {
250  // Make sure the running flag is set to false when we exit from this function
251  // to allow restarting the timers thread.
252  RCPPUTILS_SCOPE_EXIT(this->running_.store(false); );
253 
254  while (rclcpp::ok(context_) && running_) {
255  // Lock mutex
256  std::unique_lock<std::mutex> lock(timers_mutex_);
257 
258  std::optional<std::chrono::nanoseconds> time_to_sleep = get_head_timeout_unsafe();
259 
260  // If head timer was cancelled, try to reheap and get a new head.
261  // This avoids an edge condition where head timer is cancelled, but other
262  // valid timers remain in the heap.
263  if (!time_to_sleep.has_value()) {
264  // Re-heap to (possibly) move cancelled timer from head of heap. If
265  // entire heap is cancelled, this will still result in a nullopt.
266  TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
267  locked_heap.heapify();
268  weak_timers_heap_.store(locked_heap);
269  time_to_sleep = get_head_timeout_unsafe();
270  }
271 
272  // If no timers, or all timers cancelled, wait for an update.
273  if (!time_to_sleep.has_value() || (time_to_sleep.value() == std::chrono::nanoseconds::max()) ) {
274  // Wait until notification that timers have been updated
275  timers_cv_.wait(lock, [this]() {return timers_updated_;});
276 
277  // Re-heap in case ordering changed due to a cancelled timer
278  // re-activating.
279  TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
280  locked_heap.heapify();
281  weak_timers_heap_.store(locked_heap);
282  } else if (time_to_sleep.value() != std::chrono::nanoseconds::zero()) {
283  // If time_to_sleep is zero, we immediately execute. Otherwise, wait
284  // until timeout or notification that timers have been updated
285  timers_cv_.wait_for(lock, time_to_sleep.value(), [this]() {return timers_updated_;});
286  }
287 
288  // Reset timers updated flag
289  timers_updated_ = false;
290 
291  // Execute timers
292  this->execute_ready_timers_unsafe();
293  }
294 }
295 
297 {
298  {
299  // Lock mutex and then clear all data structures
300  std::unique_lock<std::mutex> lock(timers_mutex_);
301 
302  TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
303  locked_heap.clear_timers_on_reset_callbacks();
304 
305  weak_timers_heap_.clear();
306 
307  timers_updated_ = true;
308  }
309 
310  // Notify timers thread such that it can re-compute its timeout
311  timers_cv_.notify_one();
312 }
313 
314 void TimersManager::remove_timer(TimerPtr timer)
315 {
316  bool removed = false;
317  {
318  std::unique_lock<std::mutex> lock(timers_mutex_);
319  removed = weak_timers_heap_.remove_timer(timer);
320 
321  timers_updated_ = timers_updated_ || removed;
322  }
323 
324  if (removed) {
325  // Notify timers thread such that it can re-compute its timeout
326  timers_cv_.notify_one();
327  timer->clear_on_reset_callback();
328  }
329 }
This class provides a way for storing and executing timer objects. It provides APIs to suit the needs...
RCLCPP_PUBLIC void start()
Starts a thread that takes care of executing the timers stored in this object. Function will throw an...
RCLCPP_PUBLIC ~TimersManager()
Destruct the TimersManager object making sure to stop thread and release memory.
RCLCPP_PUBLIC void remove_timer(rclcpp::TimerBase::SharedPtr timer)
Remove a single timer from the object storage. Will do nothing if the timer was not being stored here...
RCLCPP_PUBLIC std::optional< std::chrono::nanoseconds > get_head_timeout()
Get the amount of time before the next timer triggers. This function is thread safe.
RCLCPP_PUBLIC bool execute_head_timer()
Executes head timer if ready. This function is thread safe. This function will try to execute the tim...
RCLCPP_PUBLIC void clear()
Remove all the timers stored in the object. Function is thread safe and it can be called regardless o...
RCLCPP_PUBLIC size_t get_number_ready_timers()
Get the number of timers that are currently ready. This function is thread safe.
RCLCPP_PUBLIC void add_timer(rclcpp::TimerBase::SharedPtr timer)
Adds a new timer to the storage, maintaining weak ownership of it. Function is thread safe and it can...
RCLCPP_PUBLIC void stop()
Stops the timers thread. Will do nothing if the timer thread was not running.
RCLCPP_PUBLIC void execute_ready_timer(const rclcpp::TimerBase *timer_id, const std::shared_ptr< void > &data)
Executes timer identified by its ID. This function is thread safe. This function will try to execute ...
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.