15 #include "rclcpp/experimental/timers_manager.hpp"
24 #include "rcpputils/scope_exit.hpp"
28 TimersManager::TimersManager(
29 std::shared_ptr<rclcpp::Context> context,
30 std::function<
void(
const rclcpp::TimerBase *,
const std::shared_ptr<void> &)> on_ready_callback)
31 : on_ready_callback_(on_ready_callback),
48 throw std::invalid_argument(
"TimersManager::add_timer() trying to add nullptr timer");
53 std::unique_lock<std::mutex> lock(timers_mutex_);
54 added = weak_timers_heap_.add_timer(timer);
55 timers_updated_ = timers_updated_ || added;
58 timer->set_on_reset_callback(
62 std::unique_lock<std::mutex> lock(timers_mutex_);
63 timers_updated_ =
true;
65 timers_cv_.notify_one();
70 timers_cv_.notify_one();
77 if (running_.exchange(
true)) {
78 throw std::runtime_error(
"TimersManager::start() can't start timers thread as already running");
81 timers_thread_ = std::thread(&TimersManager::run_timers,
this);
87 std::unique_lock<std::mutex> lock(stop_mutex_);
92 std::unique_lock<std::mutex> lock(timers_mutex_);
93 timers_updated_ =
true;
95 timers_cv_.notify_one();
98 if (timers_thread_.joinable()) {
99 timers_thread_.join();
107 throw std::runtime_error(
108 "get_head_timeout() can't be used while timers thread is running");
111 std::unique_lock<std::mutex> lock(timers_mutex_);
112 return this->get_head_timeout_unsafe();
119 throw std::runtime_error(
120 "get_number_ready_timers() can't be used while timers thread is running");
123 std::unique_lock<std::mutex> lock(timers_mutex_);
124 TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
125 return locked_heap.get_number_ready_timers();
132 throw std::runtime_error(
133 "execute_head_timer() can't be used while timers thread is running");
136 std::unique_lock<std::mutex> lock(timers_mutex_);
138 TimersHeap timers_heap = weak_timers_heap_.validate_and_lock();
141 if (timers_heap.empty()) {
145 TimerPtr head_timer = timers_heap.front();
147 const bool timer_ready = head_timer->is_ready();
151 auto data = head_timer->call();
156 head_timer->execute_callback(data);
157 timers_heap.heapify_root();
158 weak_timers_heap_.store(timers_heap);
166 const std::shared_ptr<void> & data)
168 TimerPtr ready_timer;
170 std::unique_lock<std::mutex> lock(timers_mutex_);
171 ready_timer = weak_timers_heap_.get_timer(timer_id);
174 ready_timer->execute_callback(data);
178 std::optional<std::chrono::nanoseconds> TimersManager::get_head_timeout_unsafe()
181 if (weak_timers_heap_.empty()) {
182 return std::chrono::nanoseconds::max();
186 TimerPtr head_timer = weak_timers_heap_.front().lock();
191 TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
195 if (locked_heap.empty()) {
196 return std::chrono::nanoseconds::max();
198 head_timer = locked_heap.front();
200 if (head_timer->is_canceled()) {
203 return head_timer->time_until_trigger();
206 void TimersManager::execute_ready_timers_unsafe()
209 TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
212 if (locked_heap.empty()) {
220 TimerPtr head_timer = locked_heap.front();
221 const size_t number_ready_timers = locked_heap.get_number_ready_timers();
222 size_t executed_timers = 0;
223 while (executed_timers < number_ready_timers && head_timer->is_ready()) {
224 auto data = head_timer->call();
226 if (on_ready_callback_) {
227 on_ready_callback_(head_timer.get(), data);
229 head_timer->execute_callback(data);
238 locked_heap.heapify_root();
240 head_timer = locked_heap.front();
245 weak_timers_heap_.store(locked_heap);
248 void TimersManager::run_timers()
252 RCPPUTILS_SCOPE_EXIT(this->running_.store(
false); );
256 std::unique_lock<std::mutex> lock(timers_mutex_);
258 std::optional<std::chrono::nanoseconds> time_to_sleep = get_head_timeout_unsafe();
263 if (!time_to_sleep.has_value()) {
266 TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
267 locked_heap.heapify();
268 weak_timers_heap_.store(locked_heap);
269 time_to_sleep = get_head_timeout_unsafe();
273 if (!time_to_sleep.has_value() || (time_to_sleep.value() == std::chrono::nanoseconds::max()) ) {
275 timers_cv_.wait(lock, [
this]() {
return timers_updated_;});
279 TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
280 locked_heap.heapify();
281 weak_timers_heap_.store(locked_heap);
282 }
else if (time_to_sleep.value() != std::chrono::nanoseconds::zero()) {
285 timers_cv_.wait_for(lock, time_to_sleep.value(), [
this]() {return timers_updated_;});
289 timers_updated_ =
false;
292 this->execute_ready_timers_unsafe();
300 std::unique_lock<std::mutex> lock(timers_mutex_);
302 TimersHeap locked_heap = weak_timers_heap_.validate_and_lock();
303 locked_heap.clear_timers_on_reset_callbacks();
305 weak_timers_heap_.clear();
307 timers_updated_ =
true;
311 timers_cv_.notify_one();
316 bool removed =
false;
318 std::unique_lock<std::mutex> lock(timers_mutex_);
319 removed = weak_timers_heap_.remove_timer(timer);
321 timers_updated_ = timers_updated_ || removed;
326 timers_cv_.notify_one();
327 timer->clear_on_reset_callback();
This class provides a way for storing and executing timer objects. It provides APIs to suit the needs...
RCLCPP_PUBLIC void start()
Starts a thread that takes care of executing the timers stored in this object. Function will throw an...
RCLCPP_PUBLIC ~TimersManager()
Destruct the TimersManager object making sure to stop thread and release memory.
RCLCPP_PUBLIC void remove_timer(rclcpp::TimerBase::SharedPtr timer)
Remove a single timer from the object storage. Will do nothing if the timer was not being stored here...
RCLCPP_PUBLIC std::optional< std::chrono::nanoseconds > get_head_timeout()
Get the amount of time before the next timer triggers. This function is thread safe.
RCLCPP_PUBLIC bool execute_head_timer()
Executes head timer if ready. This function is thread safe. This function will try to execute the tim...
RCLCPP_PUBLIC void clear()
Remove all the timers stored in the object. Function is thread safe and it can be called regardless o...
RCLCPP_PUBLIC size_t get_number_ready_timers()
Get the number of timers that are currently ready. This function is thread safe.
RCLCPP_PUBLIC void add_timer(rclcpp::TimerBase::SharedPtr timer)
Adds a new timer to the storage, maintaining weak ownership of it. Function is thread safe and it can...
RCLCPP_PUBLIC void stop()
Stops the timers thread. Will do nothing if the timer thread was not running.
RCLCPP_PUBLIC void execute_ready_timer(const rclcpp::TimerBase *timer_id, const std::shared_ptr< void > &data)
Executes timer identified by its ID. This function is thread safe. This function will try to execute ...
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.