15 #include "rclcpp/clock.hpp"
17 #include <condition_variable>
21 #include "rclcpp/exceptions.hpp"
22 #include "rclcpp/utilities.hpp"
24 #include "rcpputils/scope_exit.hpp"
25 #include "rcutils/logging_macros.h"
38 exceptions::throw_from_rcl_error(ret,
"failed to initialize rcl clock");
46 RCUTILS_LOG_ERROR(
"Failed to fini rcl clock.");
52 bool stop_sleeping_ =
false;
53 bool shutdown_ =
false;
54 std::condition_variable cv_;
55 std::mutex wait_mutex_;
56 std::mutex clock_mutex_;
59 JumpHandler::JumpHandler(
60 pre_callback_t pre_callback,
61 post_callback_t post_callback,
63 : pre_callback(pre_callback),
64 post_callback(post_callback),
65 notice_threshold(threshold)
76 Time now(0, 0, impl_->rcl_clock_.type);
80 exceptions::throw_from_rcl_error(ret,
"could not get current time stamp");
90 std::unique_lock lock(impl_->wait_mutex_);
91 impl_->stop_sleeping_ =
true;
93 impl_->cv_.notify_one();
99 Context::SharedPtr context)
101 if (!context || !context->is_valid()) {
102 throw std::runtime_error(
"context cannot be slept with because it's invalid");
104 const auto this_clock_type = get_clock_type();
106 throw std::runtime_error(
"until's clock type does not match this clock's type");
108 bool time_source_changed =
false;
114 std::unique_lock lock(impl_->wait_mutex_);
115 impl_->shutdown_ =
true;
117 impl_->cv_.notify_one();
120 auto callback_remover = rcpputils::scope_exit(
121 [&context, &shutdown_cb_handle]() {
122 context->remove_on_shutdown_callback(shutdown_cb_handle);
128 const std::chrono::steady_clock::time_point chrono_entry = std::chrono::steady_clock::now();
129 const Duration delta_t = until - rcl_entry;
130 const std::chrono::steady_clock::time_point chrono_until =
131 chrono_entry + std::chrono::nanoseconds(delta_t.
nanoseconds());
134 std::unique_lock lock(impl_->wait_mutex_);
135 while (
now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
136 impl_->cv_.wait_until(lock, chrono_until);
138 impl_->stop_sleeping_ =
false;
140 auto system_time = std::chrono::system_clock::time_point(
142 std::chrono::duration_cast<std::chrono::system_clock::duration>(
146 std::unique_lock lock(impl_->wait_mutex_);
147 while (
now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
148 impl_->cv_.wait_until(lock, system_time);
150 impl_->stop_sleeping_ =
false;
164 std::lock_guard<std::mutex> lk(impl_->wait_mutex_);
165 time_source_changed = true;
167 impl_->cv_.notify_one();
172 auto system_time = std::chrono::system_clock::time_point(
174 std::chrono::duration_cast<std::chrono::system_clock::duration>(
178 std::unique_lock lock(impl_->wait_mutex_);
179 while (
now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
180 !time_source_changed)
182 impl_->cv_.wait_until(lock, system_time);
184 impl_->stop_sleeping_ =
false;
189 std::unique_lock lock(impl_->wait_mutex_);
190 while (
now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
191 !time_source_changed)
193 impl_->cv_.wait(lock);
195 impl_->stop_sleeping_ =
false;
199 if (!context->is_valid() || time_source_changed) {
203 return now() >= until;
216 throw std::runtime_error(
"clock is not rcl_clock_valid");
224 if (!context || !context->is_valid()) {
225 throw std::runtime_error(
"context cannot be slept with because it's invalid");
228 throw std::runtime_error(
"clock cannot be waited on as it is not rcl_clock_valid");
242 Context::SharedPtr context,
245 if (!context || !context->is_valid()) {
246 throw std::runtime_error(
"context cannot be slept with because it's invalid");
249 throw std::runtime_error(
"clock cannot be waited on as it is not rcl_clock_valid");
253 Time start = timeout_clock.
now();
257 while (!
started() && context->is_valid()) {
258 if (timeout < wait_tick_ns) {
261 Duration time_left = start + timeout - timeout_clock.
now();
262 if (time_left > wait_tick_ns) {
269 if (timeout_clock.
now() - start > timeout) {
281 RCUTILS_LOG_ERROR(
"ROS time not valid!");
285 bool is_enabled =
false;
288 exceptions::throw_from_rcl_error(
289 ret,
"Failed to check ros_time_override_status");
297 return &impl_->rcl_clock_;
301 Clock::get_clock_type() const noexcept
303 return impl_->rcl_clock_.type;
309 return impl_->clock_mutex_;
318 const auto * handler =
static_cast<JumpHandler *
>(user_data);
319 if (
nullptr == handler) {
322 if (before_jump && handler->pre_callback) {
323 handler->pre_callback();
324 }
else if (!before_jump && handler->post_callback) {
325 handler->post_callback(*time_jump);
329 JumpHandler::SharedPtr
331 JumpHandler::pre_callback_t pre_callback,
332 JumpHandler::post_callback_t post_callback,
336 JumpHandler::UniquePtr handler(
new JumpHandler(pre_callback, post_callback, threshold));
337 if (
nullptr == handler) {
338 throw std::bad_alloc{};
342 std::lock_guard<std::mutex> clock_guard(impl_->clock_mutex_);
345 &impl_->rcl_clock_, threshold, Clock::on_time_jump,
348 exceptions::throw_from_rcl_error(ret,
"Failed to add time jump callback");
352 std::weak_ptr<Clock::Impl> weak_impl = impl_;
355 return JumpHandler::SharedPtr(handler.release(), [weak_impl](
JumpHandler * handler) noexcept {
356 auto shared_impl = weak_impl.lock();
358 std::lock_guard<std::mutex> clock_guard(shared_impl->clock_mutex_);
359 rcl_ret_t ret = rcl_clock_remove_jump_callback(&shared_impl->rcl_clock_,
360 Clock::on_time_jump, handler);
361 if (RCL_RET_OK != ret) {
362 RCUTILS_LOG_ERROR(
"Failed to remove time jump callback");
373 std::condition_variable cv_;
375 rclcpp::Clock::SharedPtr clock_;
376 bool time_source_changed_ =
false;
380 wait_until_system_time(
381 std::unique_lock<std::mutex> & lock,
382 const rclcpp::Time & abs_time,
const std::function<
bool ()> & pred)
384 auto system_time = std::chrono::system_clock::time_point(
386 std::chrono::duration_cast<std::chrono::system_clock::duration>(
387 std::chrono::nanoseconds(abs_time.
nanoseconds())));
389 return cv_.wait_until(lock, system_time, pred);
393 wait_until_steady_time(
394 std::unique_lock<std::mutex> & lock,
395 const rclcpp::Time & abs_time,
const std::function<
bool ()> & pred)
399 const std::chrono::steady_clock::time_point chrono_entry = std::chrono::steady_clock::now();
401 const std::chrono::steady_clock::time_point chrono_until =
402 chrono_entry + std::chrono::nanoseconds(delta_t.
nanoseconds());
404 return cv_.wait_until(lock, chrono_until, pred);
410 std::unique_lock<std::mutex> & lock,
411 const rclcpp::Time & abs_time,
const std::function<
bool ()> & pred)
422 time_source_changed_ =
false;
427 std::lock_guard<std::mutex> lk(*lock.mutex());
428 time_source_changed_ =
true;
438 auto clock_handler = clock_->create_jump_callback(
440 post_time_jump_callback,
443 if (!clock_->ros_time_is_active()) {
444 auto system_time = std::chrono::system_clock::time_point(
446 std::chrono::duration_cast<std::chrono::system_clock::duration>(
447 std::chrono::nanoseconds(abs_time.
nanoseconds())));
449 return cv_.wait_until(lock, system_time, [
this, &pred] () {
450 return time_source_changed_ || pred();
457 cv_.wait(lock, [
this, &pred, &abs_time] () {
458 return clock_->now() >= abs_time || time_source_changed_ || pred();
461 return clock_->now() < abs_time;
472 std::unique_lock<std::mutex> & lock,
473 const rclcpp::Time & abs_time,
const std::function<
bool ()> & pred)
475 switch(clock_->get_clock_type()) {
477 throw std::runtime_error(
"Error, wait on uninitialized clock called");
479 return wait_until_ros_time(lock, abs_time, pred);
482 return wait_until_steady_time(lock, abs_time, pred);
485 return wait_until_system_time(lock, abs_time, pred);
499 ClockWaiter::ClockWaiter(
const rclcpp::Clock::SharedPtr & clock)
504 ClockWaiter::~ClockWaiter() =
default;
508 std::unique_lock<std::mutex> & lock,
509 const rclcpp::Time & abs_time,
const std::function<
bool ()> & pred)
511 return impl_->wait_until(lock, abs_time, pred);
522 std::mutex pred_mutex_;
523 bool shutdown_ =
false;
524 rclcpp::Context::SharedPtr context_;
526 ClockWaiter::UniquePtr clock_;
529 Impl(
const rclcpp::Clock::SharedPtr & clock, rclcpp::Context::SharedPtr context)
531 clock_(std::make_unique<ClockWaiter>(clock))
533 if (!context_ || !context_->is_valid()) {
534 throw std::runtime_error(
"context cannot be slept with because it's invalid");
537 shutdown_cb_handle_ = context_->add_on_shutdown_callback(
540 std::unique_lock lock(pred_mutex_);
543 clock_->notify_one();
549 context_->remove_on_shutdown_callback(shutdown_cb_handle_);
554 std::unique_lock<std::mutex> & lock,
rclcpp::Time until,
555 const std::function<
bool ()> & pred)
557 if(lock.mutex() != &pred_mutex_) {
558 throw std::runtime_error(
559 "ClockConditionalVariable::wait_until: Internal error, given lock does not use"
560 " mutex returned by this->mutex()");
563 clock_->wait_until(lock, until, [
this, &pred] () ->
bool {
564 return shutdown_ || pred();
572 clock_->notify_one();
582 ClockConditionalVariable::ClockConditionalVariable(
583 const rclcpp::Clock::SharedPtr & clock,
584 rclcpp::Context::SharedPtr context)
585 :impl_(std::make_unique<
Impl>(clock, context))
589 ClockConditionalVariable::~ClockConditionalVariable() =
default;
599 std::unique_lock<std::mutex> & lock,
rclcpp::Time until,
600 const std::function<
bool ()> & pred)
602 return impl_->wait_until(lock, until, pred);
608 return impl_->mutex();
#define rcl_get_default_allocator
Return a properly initialized rcl_allocator_t with default values.
rcutils_allocator_t rcl_allocator_t
Encapsulation of an allocator.
RCLCPP_PUBLIC void notify_one()
RCLCPP_PUBLIC std::mutex & mutex()
RCLCPP_PUBLIC bool wait_until(std::unique_lock< std::mutex > &lock, rclcpp::Time until, const std::function< bool()> &pred)
RCLCPP_PUBLIC bool wait_until(std::unique_lock< std::mutex > &lock, const rclcpp::Time &abs_time, const std::function< bool()> &pred)
RCLCPP_PUBLIC void notify_one()
RCLCPP_PUBLIC rcl_clock_t * get_clock_handle() noexcept
Return the rcl_clock_t clock handle.
RCLCPP_PUBLIC bool ros_time_is_active()
RCLCPP_PUBLIC bool wait_until_started(Context::SharedPtr context=contexts::get_global_default_context())
RCLCPP_PUBLIC Time now() const
RCLCPP_PUBLIC bool started()
RCLCPP_PUBLIC JumpHandler::SharedPtr create_jump_callback(JumpHandler::pre_callback_t pre_callback, JumpHandler::post_callback_t post_callback, const rcl_jump_threshold_t &threshold)
RCLCPP_PUBLIC Clock(rcl_clock_type_t clock_type=RCL_SYSTEM_TIME)
Default c'tor.
RCLCPP_PUBLIC bool sleep_until(Time until, Context::SharedPtr context=contexts::get_global_default_context())
RCLCPP_PUBLIC void cancel_sleep_or_wait()
RCLCPP_PUBLIC std::mutex & get_clock_mutex() noexcept
Get the clock's mutex.
RCLCPP_PUBLIC bool sleep_for(Duration rel_time, Context::SharedPtr context=contexts::get_global_default_context())
rcl_duration_value_t nanoseconds() const
Get duration in nanosecods.
RCLCPP_PUBLIC rcl_time_point_value_t nanoseconds() const
Get the nanoseconds since epoch.
RCLCPP_PUBLIC rcl_clock_type_t get_clock_type() const
Get the clock type.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Encapsulation of a time source.
rcl_duration_value_t nanoseconds
Duration in nanoseconds and its source.
Describe the prerequisites for calling a time jump callback.
rcl_duration_t min_forward
bool on_clock_change
True to call callback when the clock type changes.
rcl_duration_t min_backward
Struct to describe a jump in time.
rcl_clock_change_t clock_change
Indicate whether or not the source of time changed.
rcl_time_point_value_t nanoseconds
Nanoseconds of the point in time.
enum rcl_clock_type_e rcl_clock_type_t
Time source type, used to indicate the source of a time measurement.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_clock_get_now(rcl_clock_t *clock, rcl_time_point_value_t *time_point_value)
Fill the time point value with the current value of the associated clock.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_clock_fini(rcl_clock_t *clock)
Finalize a clock.
@ RCL_ROS_TIME_NO_CHANGE
The source before and after the jump is ROS_TIME.
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_clock_time_started(rcl_clock_t *clock)
Check if the clock has started.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_clock_add_jump_callback(rcl_clock_t *clock, rcl_jump_threshold_t threshold, rcl_jump_callback_t callback, void *user_data)
Add a callback to be called when a time jump exceeds a threshold.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_is_enabled_ros_time_override(rcl_clock_t *clock, bool *is_enabled)
Check if the RCL_ROS_TIME time source has the override enabled.
@ RCL_ROS_TIME
Use ROS time.
@ RCL_SYSTEM_TIME
Use system time.
@ RCL_CLOCK_UNINITIALIZED
Clock uninitialized.
@ RCL_STEADY_TIME
Use a steady clock time.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_clock_init(rcl_clock_type_t clock_type, rcl_clock_t *clock, rcl_allocator_t *allocator)
Initialize a clock based on the passed type.
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_clock_valid(rcl_clock_t *clock)
Check if the clock has valid values.
#define RCL_RET_OK
Success return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.