15 #ifndef RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_
16 #define RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_
23 #include "rclcpp/client.hpp"
24 #include "rclcpp/exceptions.hpp"
25 #include "rclcpp/guard_condition.hpp"
26 #include "rclcpp/macros.hpp"
27 #include "rclcpp/service.hpp"
28 #include "rclcpp/subscription_base.hpp"
29 #include "rclcpp/subscription_wait_set_mask.hpp"
30 #include "rclcpp/timer.hpp"
31 #include "rclcpp/visibility_control.hpp"
32 #include "rclcpp/wait_result.hpp"
33 #include "rclcpp/wait_result_kind.hpp"
34 #include "rclcpp/wait_set_policies/detail/synchronization_policy_common.hpp"
35 #include "rclcpp/wait_set_policies/detail/write_preferring_read_write_lock.hpp"
36 #include "rclcpp/waitable.hpp"
40 namespace wait_set_policies
71 : extra_guard_conditions_{{std::make_shared<rclcpp::GuardCondition>(context)}},
81 const std::array<std::shared_ptr<rclcpp::GuardCondition>, 1> &
84 return extra_guard_conditions_;
94 extra_guard_conditions_[0]->trigger();
100 std::shared_ptr<rclcpp::SubscriptionBase> && subscription,
104 > add_subscription_function)
107 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
108 add_subscription_function(std::move(subscription), mask);
114 std::shared_ptr<rclcpp::SubscriptionBase> && subscription,
118 > remove_subscription_function)
121 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
122 remove_subscription_function(std::move(subscription), mask);
128 std::shared_ptr<rclcpp::GuardCondition> && guard_condition,
129 std::function<
void(std::shared_ptr<rclcpp::GuardCondition>&&)> add_guard_condition_function)
132 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
133 add_guard_condition_function(std::move(guard_condition));
139 std::shared_ptr<rclcpp::GuardCondition> && guard_condition,
140 std::function<
void(std::shared_ptr<rclcpp::GuardCondition>&&)> remove_guard_condition_function)
143 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
144 remove_guard_condition_function(std::move(guard_condition));
150 std::shared_ptr<rclcpp::TimerBase> && timer,
151 std::function<
void(std::shared_ptr<rclcpp::TimerBase>&&)> add_timer_function)
154 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
155 add_timer_function(std::move(timer));
161 std::shared_ptr<rclcpp::TimerBase> && timer,
162 std::function<
void(std::shared_ptr<rclcpp::TimerBase>&&)> remove_timer_function)
165 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
166 remove_timer_function(std::move(timer));
172 std::shared_ptr<rclcpp::ClientBase> && client,
173 std::function<
void(std::shared_ptr<rclcpp::ClientBase>&&)> add_client_function)
176 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
177 add_client_function(std::move(client));
183 std::shared_ptr<rclcpp::ClientBase> && client,
184 std::function<
void(std::shared_ptr<rclcpp::ClientBase>&&)> remove_client_function)
187 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
188 remove_client_function(std::move(client));
194 std::shared_ptr<rclcpp::ServiceBase> && service,
195 std::function<
void(std::shared_ptr<rclcpp::ServiceBase>&&)> add_service_function)
198 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
199 add_service_function(std::move(service));
205 std::shared_ptr<rclcpp::ServiceBase> && service,
206 std::function<
void(std::shared_ptr<rclcpp::ServiceBase>&&)> remove_service_function)
209 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
210 remove_service_function(std::move(service));
216 std::shared_ptr<rclcpp::Waitable> && waitable,
217 std::shared_ptr<void> && associated_entity,
219 void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void> &&)
220 > add_waitable_function)
223 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
224 add_waitable_function(std::move(waitable), std::move(associated_entity));
230 std::shared_ptr<rclcpp::Waitable> && waitable,
231 std::function<
void(std::shared_ptr<rclcpp::Waitable>&&)> remove_waitable_function)
234 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
235 remove_waitable_function(std::move(waitable));
243 std::lock_guard<WritePreferringReadWriteLock::WriteMutex> lock(wprw_lock_.
get_write_mutex());
244 prune_deleted_entities_function();
248 template<
class WaitResultT>
251 std::chrono::nanoseconds time_to_wait_ns,
252 std::function<
void()> rebuild_rcl_wait_set,
254 std::function<WaitResultT(WaitResultKind wait_result_kind)> create_wait_result)
266 auto start = std::chrono::steady_clock::now();
267 std::function<bool()> should_loop = this->create_loop_predicate(time_to_wait_ns, start);
275 std::lock_guard<WritePreferringReadWriteLock::ReadMutex> lock(wprw_lock_.
get_read_mutex());
282 rebuild_rcl_wait_set();
293 auto time_left_to_wait_ns = this->calculate_time_left_to_wait(time_to_wait_ns, start);
305 &(extra_guard_conditions_[0]->get_rcl_guard_condition());
306 bool was_interrupted_by_this_class =
false;
307 bool any_user_guard_conditions_triggered =
false;
310 if (
nullptr != current) {
312 if (rcl_wait_set.
guard_conditions[index] == interrupt_guard_condition_ptr) {
314 was_interrupted_by_this_class =
true;
317 any_user_guard_conditions_triggered =
true;
322 if (!was_interrupted_by_this_class || any_user_guard_conditions_triggered) {
337 return create_wait_result(WaitResultKind::Ready);
348 return create_wait_result(WaitResultKind::Empty);
351 rclcpp::exceptions::throw_from_rcl_error(ret);
353 }
while (should_loop());
356 return create_wait_result(WaitResultKind::Timeout);
360 sync_wait_result_acquire()
366 sync_wait_result_release()
372 std::array<std::shared_ptr<rclcpp::GuardCondition>, 1> extra_guard_conditions_;
Options used to determine what parts of a subscription get added to or removed from a wait set.
WaitSet policy that provides thread-safe synchronization for the wait set.
const std::array< std::shared_ptr< rclcpp::GuardCondition >, 1 > & get_extra_guard_conditions()
Return any "extra" guard conditions needed to implement the synchronization policy.
void sync_add_service(std::shared_ptr< rclcpp::ServiceBase > &&service, std::function< void(std::shared_ptr< rclcpp::ServiceBase > &&)> add_service_function)
Add service.
void sync_remove_guard_condition(std::shared_ptr< rclcpp::GuardCondition > &&guard_condition, std::function< void(std::shared_ptr< rclcpp::GuardCondition > &&)> remove_guard_condition_function)
Remove guard condition.
void sync_remove_timer(std::shared_ptr< rclcpp::TimerBase > &&timer, std::function< void(std::shared_ptr< rclcpp::TimerBase > &&)> remove_timer_function)
Remove timer.
void sync_remove_waitable(std::shared_ptr< rclcpp::Waitable > &&waitable, std::function< void(std::shared_ptr< rclcpp::Waitable > &&)> remove_waitable_function)
Remove waitable.
void sync_add_timer(std::shared_ptr< rclcpp::TimerBase > &&timer, std::function< void(std::shared_ptr< rclcpp::TimerBase > &&)> add_timer_function)
Add timer.
void sync_remove_subscription(std::shared_ptr< rclcpp::SubscriptionBase > &&subscription, const rclcpp::SubscriptionWaitSetMask &mask, std::function< void(std::shared_ptr< rclcpp::SubscriptionBase > &&, const rclcpp::SubscriptionWaitSetMask &) > remove_subscription_function)
Remove subscription.
void sync_remove_service(std::shared_ptr< rclcpp::ServiceBase > &&service, std::function< void(std::shared_ptr< rclcpp::ServiceBase > &&)> remove_service_function)
Remove service.
void sync_remove_client(std::shared_ptr< rclcpp::ClientBase > &&client, std::function< void(std::shared_ptr< rclcpp::ClientBase > &&)> remove_client_function)
Remove client.
void sync_prune_deleted_entities(std::function< void()> prune_deleted_entities_function)
Prune deleted entities.
void sync_add_waitable(std::shared_ptr< rclcpp::Waitable > &&waitable, std::shared_ptr< void > &&associated_entity, std::function< void(std::shared_ptr< rclcpp::Waitable > &&, std::shared_ptr< void > &&) > add_waitable_function)
Add waitable.
void sync_add_client(std::shared_ptr< rclcpp::ClientBase > &&client, std::function< void(std::shared_ptr< rclcpp::ClientBase > &&)> add_client_function)
Add client.
void sync_add_guard_condition(std::shared_ptr< rclcpp::GuardCondition > &&guard_condition, std::function< void(std::shared_ptr< rclcpp::GuardCondition > &&)> add_guard_condition_function)
Add guard condition.
WaitResultT sync_wait(std::chrono::nanoseconds time_to_wait_ns, std::function< void()> rebuild_rcl_wait_set, std::function< rcl_wait_set_t &()> get_rcl_wait_set, std::function< WaitResultT(WaitResultKind wait_result_kind)> create_wait_result)
Implements wait.
void sync_add_subscription(std::shared_ptr< rclcpp::SubscriptionBase > &&subscription, const rclcpp::SubscriptionWaitSetMask &mask, std::function< void(std::shared_ptr< rclcpp::SubscriptionBase > &&, const rclcpp::SubscriptionWaitSetMask &) > add_subscription_function)
Add subscription.
void interrupt_waiting_wait_set()
Interrupt any waiting wait set.
Common structure for synchronization policies.
Writer-preferring read-write lock.
RCLCPP_PUBLIC WriteMutex & get_write_mutex()
Return write mutex which can be used with standard constructs like std::lock_guard.
RCLCPP_PUBLIC ReadMutex & get_read_mutex()
Return read mutex which can be used with standard constructs like std::lock_guard.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Handle for a rcl guard condition.
Container for subscriptions, guard conditions, etc. to be waited on.
size_t size_of_guard_conditions
Number of guard_conditions.
const rcl_guard_condition_t ** guard_conditions
Storage for guard condition pointers.
#define RCL_RET_WAIT_SET_EMPTY
Given rcl_wait_set_t is empty return code.
#define RCL_RET_OK
Success return code.
#define RCL_RET_TIMEOUT
Timeout occurred return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait(rcl_wait_set_t *wait_set, int64_t timeout)
Block until the wait set is ready or until the timeout has been exceeded.