ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
timers_manager.hpp
1 // Copyright 2023 iRobot Corporation.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_
16 #define RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_
17 
18 #include <algorithm>
19 #include <atomic>
20 #include <chrono>
21 #include <condition_variable>
22 #include <functional>
23 #include <memory>
24 #include <mutex>
25 #include <optional>
26 #include <thread>
27 #include <utility>
28 #include <vector>
29 #include "rclcpp/context.hpp"
30 #include "rclcpp/timer.hpp"
31 
32 namespace rclcpp
33 {
34 namespace experimental
35 {
36 
66 {
67 public:
68  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(TimersManager)
69 
70 
86  RCLCPP_PUBLIC
88  std::shared_ptr<rclcpp::Context> context,
89  std::function<void(const rclcpp::TimerBase *,
90  const std::shared_ptr<void> &)> on_ready_callback = nullptr);
91 
95  RCLCPP_PUBLIC
97 
105  RCLCPP_PUBLIC
106  void add_timer(rclcpp::TimerBase::SharedPtr timer);
107 
115  RCLCPP_PUBLIC
116  void remove_timer(rclcpp::TimerBase::SharedPtr timer);
117 
122  RCLCPP_PUBLIC
123  void clear();
124 
129  RCLCPP_PUBLIC
130  void start();
131 
136  RCLCPP_PUBLIC
137  void stop();
138 
146  RCLCPP_PUBLIC
147  size_t get_number_ready_timers();
148 
158  RCLCPP_PUBLIC
159  bool execute_head_timer();
160 
170  RCLCPP_PUBLIC
171  void execute_ready_timer(const rclcpp::TimerBase * timer_id, const std::shared_ptr<void> & data);
172 
183  RCLCPP_PUBLIC
184  std::optional<std::chrono::nanoseconds> get_head_timeout();
185 
186 private:
187  RCLCPP_DISABLE_COPY(TimersManager)
188 
189  using TimerPtr = rclcpp::TimerBase::SharedPtr;
190  using WeakTimerPtr = rclcpp::TimerBase::WeakPtr;
191 
192  // Forward declaration
193  class TimersHeap;
194 
204  class WeakTimersHeap
205  {
206 public:
213  bool add_timer(TimerPtr timer)
214  {
215  TimersHeap locked_heap = this->validate_and_lock();
216  bool added = locked_heap.add_timer(std::move(timer));
217 
218  if (added) {
219  // Re-create the weak heap with the new timer added
220  this->store(locked_heap);
221  }
222 
223  return added;
224  }
225 
232  bool remove_timer(TimerPtr timer)
233  {
234  TimersHeap locked_heap = this->validate_and_lock();
235  bool removed = locked_heap.remove_timer(std::move(timer));
236 
237  if (removed) {
238  // Re-create the weak heap with the timer removed
239  this->store(locked_heap);
240  }
241 
242  return removed;
243  }
244 
250  TimerPtr get_timer(const rclcpp::TimerBase * timer_id)
251  {
252  for (auto & weak_timer : weak_heap_) {
253  auto timer = weak_timer.lock();
254  if (timer.get() == timer_id) {
255  return timer;
256  }
257  }
258  return nullptr;
259  }
260 
264  const WeakTimerPtr & front() const
265  {
266  return weak_heap_.front();
267  }
268 
272  bool empty() const
273  {
274  return weak_heap_.empty();
275  }
276 
285  TimersHeap validate_and_lock()
286  {
287  TimersHeap locked_heap;
288  bool any_timer_destroyed = false;
289 
290  for (auto weak_timer : weak_heap_) {
291  auto timer = weak_timer.lock();
292  if (timer) {
293  // This timer is valid, so add it to the locked heap
294  // Note: we access friend private `owned_heap_` member field.
295  locked_heap.owned_heap_.push_back(std::move(timer));
296  } else {
297  // This timer went out of scope, so we don't add it to locked heap
298  // and we mark the corresponding flag.
299  // It's not needed to erase it from weak heap, as we are going to re-heapify.
300  // Note: we can't exit from the loop here, as we need to find all valid timers.
301  any_timer_destroyed = true;
302  }
303  }
304 
305  // If a timer has gone out of scope, then the remaining elements do not represent
306  // a valid heap anymore. We need to re-heapify the timers heap.
307  if (any_timer_destroyed) {
308  locked_heap.heapify();
309  // Re-create the weak heap now that elements have been heapified again
310  this->store(locked_heap);
311  }
312 
313  return locked_heap;
314  }
315 
324  void store(const TimersHeap & heap)
325  {
326  weak_heap_.clear();
327  // Note: we access friend private `owned_heap_` member field.
328  for (auto t : heap.owned_heap_) {
329  weak_heap_.push_back(t);
330  }
331  }
332 
336  void clear()
337  {
338  weak_heap_.clear();
339  }
340 
341 private:
342  std::vector<WeakTimerPtr> weak_heap_;
343  };
344 
351  class TimersHeap
352  {
353 public:
360  bool add_timer(TimerPtr timer)
361  {
362  // Nothing to do if the timer is already stored here
363  auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer);
364  if (it != owned_heap_.end()) {
365  return false;
366  }
367 
368  owned_heap_.push_back(std::move(timer));
369  std::push_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater);
370 
371  return true;
372  }
373 
380  bool remove_timer(TimerPtr timer)
381  {
382  // Nothing to do if the timer is not stored here
383  auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer);
384  if (it == owned_heap_.end()) {
385  return false;
386  }
387 
388  owned_heap_.erase(it);
389  this->heapify();
390 
391  return true;
392  }
393 
398  TimerPtr & front()
399  {
400  return owned_heap_.front();
401  }
402 
407  const TimerPtr & front() const
408  {
409  return owned_heap_.front();
410  }
411 
416  bool empty() const
417  {
418  return owned_heap_.empty();
419  }
420 
425  size_t size() const
426  {
427  return owned_heap_.size();
428  }
429 
434  size_t get_number_ready_timers() const
435  {
436  size_t ready_timers = 0;
437 
438  for (TimerPtr t : owned_heap_) {
439  if (t->is_ready()) {
440  ready_timers++;
441  }
442  }
443 
444  return ready_timers;
445  }
446 
450  void heapify_root()
451  {
452  // The following code is a more efficient version than doing
453  // pop_heap, pop_back, push_back, push_heap
454  // as it removes the need for the last push_heap
455 
456  // Push the modified element (i.e. the current root) at the bottom of the heap
457  owned_heap_.push_back(owned_heap_[0]);
458  // Exchange first and last-1 elements and reheapify
459  std::pop_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater);
460  // Remove last element
461  owned_heap_.pop_back();
462  }
463 
467  void heapify()
468  {
469  std::make_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater);
470  }
471 
475  void clear_timers_on_reset_callbacks()
476  {
477  for (TimerPtr & t : owned_heap_) {
478  t->clear_on_reset_callback();
479  }
480  }
481 
486  friend TimersHeap WeakTimersHeap::validate_and_lock();
487 
492  friend void WeakTimersHeap::store(const TimersHeap & heap);
493 
494 private:
499  static bool timer_greater(TimerPtr a, TimerPtr b)
500  {
501  // TODO(alsora): this can cause an error if timers are using different clocks
502  return a->time_until_trigger() > b->time_until_trigger();
503  }
504 
505  std::vector<TimerPtr> owned_heap_;
506  };
507 
512  void run_timers();
513 
524  std::optional<std::chrono::nanoseconds> get_head_timeout_unsafe();
525 
531  void execute_ready_timers_unsafe();
532 
533  // Callback to be called when timer is ready
534  std::function<void(const rclcpp::TimerBase *,
535  const std::shared_ptr<void> &)> on_ready_callback_ = nullptr;
536 
537  // Thread used to run the timers execution task
538  std::thread timers_thread_;
539  // Protects access to timers
540  std::mutex timers_mutex_;
541  // Protects access to stop()
542  std::mutex stop_mutex_;
543  // Notifies the timers thread whenever timers are added/removed
544  std::condition_variable timers_cv_;
545  // Flag used as predicate by timers_cv_ that denotes one or more timers being added/removed
546  bool timers_updated_ {false};
547  // Indicates whether the timers thread is currently running or not
548  std::atomic<bool> running_ {false};
549  // Parent context used to understand if ROS is still active
550  std::shared_ptr<rclcpp::Context> context_;
551  // Timers heap storage with weak ownership
552  WeakTimersHeap weak_timers_heap_;
553 };
554 
555 } // namespace experimental
556 } // namespace rclcpp
557 
558 #endif // RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_
This class provides a way for storing and executing timer objects. It provides APIs to suit the needs...
RCLCPP_PUBLIC TimersManager(std::shared_ptr< rclcpp::Context > context, std::function< void(const rclcpp::TimerBase *, const std::shared_ptr< void > &)> on_ready_callback=nullptr)
Construct a new TimersManager object.
RCLCPP_PUBLIC void start()
Starts a thread that takes care of executing the timers stored in this object. Function will throw an...
RCLCPP_PUBLIC ~TimersManager()
Destruct the TimersManager object making sure to stop thread and release memory.
RCLCPP_PUBLIC void remove_timer(rclcpp::TimerBase::SharedPtr timer)
Remove a single timer from the object storage. Will do nothing if the timer was not being stored here...
RCLCPP_PUBLIC std::optional< std::chrono::nanoseconds > get_head_timeout()
Get the amount of time before the next timer triggers. This function is thread safe.
RCLCPP_PUBLIC bool execute_head_timer()
Executes head timer if ready. This function is thread safe. This function will try to execute the tim...
RCLCPP_PUBLIC void clear()
Remove all the timers stored in the object. Function is thread safe and it can be called regardless o...
RCLCPP_PUBLIC size_t get_number_ready_timers()
Get the number of timers that are currently ready. This function is thread safe.
RCLCPP_PUBLIC void add_timer(rclcpp::TimerBase::SharedPtr timer)
Adds a new timer to the storage, maintaining weak ownership of it. Function is thread safe and it can...
RCLCPP_PUBLIC void stop()
Stops the timers thread. Will do nothing if the timer thread was not running.
RCLCPP_PUBLIC void execute_ready_timer(const rclcpp::TimerBase *timer_id, const std::shared_ptr< void > &data)
Executes timer identified by its ID. This function is thread safe. This function will try to execute ...
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.