15 #ifndef RCLCPP__WAIT_SET_TEMPLATE_HPP_
16 #define RCLCPP__WAIT_SET_TEMPLATE_HPP_
23 #include "rcpputils/scope_exit.hpp"
25 #include "rclcpp/client.hpp"
26 #include "rclcpp/context.hpp"
27 #include "rclcpp/contexts/default_context.hpp"
28 #include "rclcpp/guard_condition.hpp"
29 #include "rclcpp/macros.hpp"
30 #include "rclcpp/service.hpp"
31 #include "rclcpp/subscription_base.hpp"
32 #include "rclcpp/subscription_wait_set_mask.hpp"
33 #include "rclcpp/timer.hpp"
34 #include "rclcpp/visibility_control.hpp"
35 #include "rclcpp/wait_result.hpp"
36 #include "rclcpp/waitable.hpp"
46 template<
class SynchronizationPolicy,
class StoragePolicy>
47 class WaitSetTemplate final :
private SynchronizationPolicy,
private StoragePolicy
52 using typename StoragePolicy::SubscriptionEntry;
53 using typename StoragePolicy::WaitableEntry;
72 const typename StoragePolicy::SubscriptionsIterable & subscriptions = {},
73 const typename StoragePolicy::GuardConditionsIterable & guard_conditions = {},
74 const typename StoragePolicy::TimersIterable & timers = {},
75 const typename StoragePolicy::ClientsIterable & clients = {},
76 const typename StoragePolicy::ServicesIterable & services = {},
77 const typename StoragePolicy::WaitablesIterable & waitables = {},
78 rclcpp::Context::SharedPtr context =
79 rclcpp::contexts::get_global_default_context())
80 : SynchronizationPolicy(context),
85 this->get_extra_guard_conditions(),
103 return this->storage_get_rcl_wait_set();
131 std::shared_ptr<rclcpp::SubscriptionBase> subscription,
134 if (
nullptr == subscription) {
135 throw std::invalid_argument(
"subscription is nullptr");
138 this->sync_add_subscription(
139 std::move(subscription),
142 std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
149 auto local_subscription = inner_subscription;
150 bool already_in_use =
151 local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), true);
152 if (already_in_use) {
153 throw std::runtime_error(
"subscription already associated with a wait set");
155 this->storage_add_subscription(std::move(local_subscription));
156 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
159 for (auto key_event_pair : inner_subscription->get_event_handlers()) {
160 auto event = key_event_pair.second;
161 auto local_subscription = inner_subscription;
162 bool already_in_use =
163 local_subscription->exchange_in_use_by_wait_set_state(event.get(), true);
164 if (already_in_use) {
165 throw std::runtime_error(
"subscription event already associated with a wait set");
167 this->storage_add_waitable(std::move(event), std::move(local_subscription));
168 if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
172 auto local_subscription = inner_subscription;
173 auto waitable = inner_subscription->get_intra_process_waitable();
174 if (nullptr != waitable) {
175 bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
178 if (already_in_use) {
179 throw std::runtime_error(
180 "subscription intra-process waitable already associated with a wait set");
182 this->storage_add_waitable(
183 std::move(inner_subscription->get_intra_process_waitable()),
184 std::move(local_subscription));
185 if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
209 std::shared_ptr<rclcpp::SubscriptionBase> subscription,
212 if (
nullptr == subscription) {
213 throw std::invalid_argument(
"subscription is nullptr");
216 this->sync_remove_subscription(
217 std::move(subscription),
220 std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
227 auto local_subscription = inner_subscription;
228 local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), false);
229 this->storage_remove_subscription(std::move(local_subscription));
230 if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
233 for (auto key_event_pair : inner_subscription->get_event_handlers()) {
234 auto event = key_event_pair.second;
235 auto local_subscription = inner_subscription;
236 local_subscription->exchange_in_use_by_wait_set_state(event.get(), false);
237 this->storage_remove_waitable(std::move(event));
238 if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
242 auto local_waitable = inner_subscription->get_intra_process_waitable();
243 if (nullptr != local_waitable) {
245 inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
246 this->storage_remove_waitable(std::move(local_waitable));
247 if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
283 if (
nullptr == guard_condition) {
284 throw std::invalid_argument(
"guard_condition is nullptr");
287 this->sync_add_guard_condition(
288 std::move(guard_condition),
289 [
this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
290 bool already_in_use = inner_guard_condition->exchange_in_use_by_wait_set_state(
true);
291 if (already_in_use) {
292 throw std::runtime_error(
"guard condition already in use by another wait set");
297 this->storage_add_guard_condition(std::move(inner_guard_condition));
298 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
324 if (
nullptr == guard_condition) {
325 throw std::invalid_argument(
"guard_condition is nullptr");
328 this->sync_remove_guard_condition(
329 std::move(guard_condition),
330 [
this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
331 inner_guard_condition->exchange_in_use_by_wait_set_state(
false);
335 this->storage_remove_guard_condition(std::move(inner_guard_condition));
336 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
353 if (
nullptr == timer) {
354 throw std::invalid_argument(
"timer is nullptr");
357 this->sync_add_timer(
359 [
this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
360 bool already_in_use = inner_timer->exchange_in_use_by_wait_set_state(
true);
361 if (already_in_use) {
362 throw std::runtime_error(
"timer already in use by another wait set");
367 this->storage_add_timer(std::move(inner_timer));
368 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
384 if (
nullptr == timer) {
385 throw std::invalid_argument(
"timer is nullptr");
388 this->sync_remove_timer(
390 [
this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
391 inner_timer->exchange_in_use_by_wait_set_state(
false);
395 this->storage_remove_timer(std::move(inner_timer));
396 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
413 if (
nullptr == client) {
414 throw std::invalid_argument(
"client is nullptr");
417 this->sync_add_client(
419 [
this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
420 bool already_in_use = inner_client->exchange_in_use_by_wait_set_state(
true);
421 if (already_in_use) {
422 throw std::runtime_error(
"client already in use by another wait set");
427 this->storage_add_client(std::move(inner_client));
428 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
444 if (
nullptr == client) {
445 throw std::invalid_argument(
"client is nullptr");
448 this->sync_remove_client(
450 [
this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
451 inner_client->exchange_in_use_by_wait_set_state(
false);
455 this->storage_remove_client(std::move(inner_client));
456 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
473 if (
nullptr == service) {
474 throw std::invalid_argument(
"service is nullptr");
477 this->sync_add_service(
479 [
this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
480 bool already_in_use = inner_service->exchange_in_use_by_wait_set_state(
true);
481 if (already_in_use) {
482 throw std::runtime_error(
"service already in use by another wait set");
487 this->storage_add_service(std::move(inner_service));
488 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
504 if (
nullptr == service) {
505 throw std::invalid_argument(
"service is nullptr");
508 this->sync_remove_service(
510 [
this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
511 inner_service->exchange_in_use_by_wait_set_state(
false);
515 this->storage_remove_service(std::move(inner_service));
516 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
546 std::shared_ptr<rclcpp::Waitable> waitable,
547 std::shared_ptr<void> associated_entity =
nullptr)
549 if (
nullptr == waitable) {
550 throw std::invalid_argument(
"waitable is nullptr");
553 this->sync_add_waitable(
555 std::move(associated_entity),
557 std::shared_ptr<rclcpp::Waitable> && inner_waitable,
558 std::shared_ptr<void> && associated_entity)
560 bool already_in_use = inner_waitable->exchange_in_use_by_wait_set_state(
true);
561 if (already_in_use) {
562 throw std::runtime_error(
"waitable already in use by another wait set");
567 this->storage_add_waitable(std::move(inner_waitable), std::move(associated_entity));
568 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
584 if (
nullptr == waitable) {
585 throw std::invalid_argument(
"waitable is nullptr");
588 this->sync_remove_waitable(
590 [
this](std::shared_ptr<rclcpp::Waitable> && inner_waitable) {
591 inner_waitable->exchange_in_use_by_wait_set_state(
false);
595 this->storage_remove_waitable(std::move(inner_waitable));
596 if (this->wait_result_holding_) {this->wait_result_dirty_ =
true;}
620 this->sync_prune_deleted_entities(
624 this->storage_prune_deleted_entities();
669 template<
class Rep =
int64_t,
class Period = std::milli>
672 wait(std::chrono::duration<Rep, Period> time_to_wait = std::chrono::duration<Rep, Period>(-1))
674 auto time_to_wait_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time_to_wait);
677 this->storage_acquire_ownerships();
678 RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();});
681 return this->
template sync_wait<WaitResult<WaitSetTemplate>>(
687 this->storage_rebuild_rcl_wait_set(
689 this->get_extra_guard_conditions()
695 return this->storage_get_rcl_wait_set();
700 switch (wait_result_kind) {
701 case WaitResultKind::Ready:
703 case WaitResultKind::Timeout:
705 case WaitResultKind::Empty:
708 auto msg =
"unknown WaitResultKind with value: " + std::to_string(wait_result_kind);
709 throw std::runtime_error(msg);
728 wait_result_acquire()
730 if (wait_result_holding_) {
731 throw std::runtime_error(
"wait_result_acquire() called while already holding");
733 wait_result_holding_ =
true;
734 wait_result_dirty_ =
false;
736 this->sync_wait_result_acquire();
746 wait_result_release()
748 if (!wait_result_holding_) {
749 throw std::runtime_error(
"wait_result_release() called while not holding");
751 wait_result_holding_ =
false;
752 wait_result_dirty_ =
false;
754 this->sync_wait_result_release();
757 bool wait_result_holding_ =
false;
758 bool wait_result_dirty_ =
false;
Options used to determine what parts of a subscription get added to or removed from a wait set.
bool include_events
If true, include any events attached to the subscription.
bool include_subscription
If true, include the actual subscription.
bool include_intra_process_waitable
If true, include the waitable used to handle intra process communication.
Interface for introspecting a wait set after waiting on it.
Encapsulates sets of waitable items which can be waited on as a group.
void prune_deleted_entities()
Remove any destroyed entities from the wait set.
void add_guard_condition(std::shared_ptr< rclcpp::GuardCondition > guard_condition)
Add a guard condition to this wait set.
void remove_waitable(std::shared_ptr< rclcpp::Waitable > waitable)
Remove a waitable from this wait set.
void add_service(std::shared_ptr< rclcpp::ServiceBase > service)
Add a service to this wait set.
void add_subscription(std::shared_ptr< rclcpp::SubscriptionBase > subscription, rclcpp::SubscriptionWaitSetMask mask={})
Add a subscription to this wait set.
void remove_client(std::shared_ptr< rclcpp::ClientBase > client)
Remove a client from this wait set.
WaitSetTemplate(const typename StoragePolicy::SubscriptionsIterable &subscriptions={}, const typename StoragePolicy::GuardConditionsIterable &guard_conditions={}, const typename StoragePolicy::TimersIterable &timers={}, const typename StoragePolicy::ClientsIterable &clients={}, const typename StoragePolicy::ServicesIterable &services={}, const typename StoragePolicy::WaitablesIterable &waitables={}, rclcpp::Context::SharedPtr context=rclcpp::contexts::get_global_default_context())
Construct a wait set with optional initial waitable entities and optional custom context.
void add_waitable(std::shared_ptr< rclcpp::Waitable > waitable, std::shared_ptr< void > associated_entity=nullptr)
Add a waitable to this wait set.
void remove_subscription(std::shared_ptr< rclcpp::SubscriptionBase > subscription, rclcpp::SubscriptionWaitSetMask mask={})
Remove a subscription from this wait set.
void add_client(std::shared_ptr< rclcpp::ClientBase > client)
Add a client to this wait set.
void remove_guard_condition(std::shared_ptr< rclcpp::GuardCondition > guard_condition)
Remove a guard condition from this wait set.
void remove_timer(std::shared_ptr< rclcpp::TimerBase > timer)
Remove a timer from this wait set.
RCUTILS_WARN_UNUSED WaitResult< WaitSetTemplate > wait(std::chrono::duration< Rep, Period > time_to_wait=std::chrono::duration< Rep, Period >(-1))
Wait for any of the entities in the wait set to be ready, or a period of time to pass.
void add_timer(std::shared_ptr< rclcpp::TimerBase > timer)
Add a timer to this wait set.
void remove_service(std::shared_ptr< rclcpp::ServiceBase > service)
Remove a service from this wait set.
const rcl_wait_set_t & get_rcl_wait_set() const
Return the internal rcl wait set object.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.