ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
wait_set_template.hpp
1 // Copyright 2020 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__WAIT_SET_TEMPLATE_HPP_
16 #define RCLCPP__WAIT_SET_TEMPLATE_HPP_
17 
18 #include <chrono>
19 #include <memory>
20 #include <utility>
21 
22 #include "rcl/wait.h"
23 #include "rcpputils/scope_exit.hpp"
24 
25 #include "rclcpp/client.hpp"
26 #include "rclcpp/context.hpp"
27 #include "rclcpp/contexts/default_context.hpp"
28 #include "rclcpp/guard_condition.hpp"
29 #include "rclcpp/macros.hpp"
30 #include "rclcpp/service.hpp"
31 #include "rclcpp/subscription_base.hpp"
32 #include "rclcpp/subscription_wait_set_mask.hpp"
33 #include "rclcpp/timer.hpp"
34 #include "rclcpp/visibility_control.hpp"
35 #include "rclcpp/wait_result.hpp"
36 #include "rclcpp/waitable.hpp"
37 
38 namespace rclcpp
39 {
40 
42 
46 template<class SynchronizationPolicy, class StoragePolicy>
47 class WaitSetTemplate final : private SynchronizationPolicy, private StoragePolicy
48 {
49 public:
50  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(WaitSetTemplate)
51 
52  using typename StoragePolicy::SubscriptionEntry;
53  using typename StoragePolicy::WaitableEntry;
54 
56 
70  explicit
72  const typename StoragePolicy::SubscriptionsIterable & subscriptions = {},
73  const typename StoragePolicy::GuardConditionsIterable & guard_conditions = {},
74  const typename StoragePolicy::TimersIterable & timers = {},
75  const typename StoragePolicy::ClientsIterable & clients = {},
76  const typename StoragePolicy::ServicesIterable & services = {},
77  const typename StoragePolicy::WaitablesIterable & waitables = {},
78  rclcpp::Context::SharedPtr context =
79  rclcpp::contexts::get_global_default_context())
80  : SynchronizationPolicy(context),
81  StoragePolicy(
82  subscriptions,
83  guard_conditions,
84  // this method comes from the SynchronizationPolicy
85  this->get_extra_guard_conditions(),
86  timers,
87  clients,
88  services,
89  waitables,
90  context)
91  {}
92 
94 
99  const rcl_wait_set_t &
101  {
102  // this method comes from the StoragePolicy
103  return this->storage_get_rcl_wait_set();
104  }
105 
107 
129  void
131  std::shared_ptr<rclcpp::SubscriptionBase> subscription,
133  {
134  if (nullptr == subscription) {
135  throw std::invalid_argument("subscription is nullptr");
136  }
137  // this method comes from the SynchronizationPolicy
138  this->sync_add_subscription(
139  std::move(subscription),
140  mask,
141  [this](
142  std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
143  const rclcpp::SubscriptionWaitSetMask & mask)
144  {
145  // These methods comes from the StoragePolicy, and may not exist for
146  // fixed sized storage policies.
147  // It will throw if the subscription has already been added.
148  if (mask.include_subscription) {
149  auto local_subscription = inner_subscription;
150  bool already_in_use =
151  local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), true);
152  if (already_in_use) {
153  throw std::runtime_error("subscription already associated with a wait set");
154  }
155  this->storage_add_subscription(std::move(local_subscription));
156  }
157  if (mask.include_events) {
158  for (auto key_event_pair : inner_subscription->get_event_handlers()) {
159  auto event = key_event_pair.second;
160  auto local_subscription = inner_subscription;
161  bool already_in_use =
162  local_subscription->exchange_in_use_by_wait_set_state(event.get(), true);
163  if (already_in_use) {
164  throw std::runtime_error("subscription event already associated with a wait set");
165  }
166  this->storage_add_waitable(std::move(event), std::move(local_subscription));
167  }
168  }
170  auto local_subscription = inner_subscription;
171  auto waitable = inner_subscription->get_intra_process_waitable();
172  if (nullptr != waitable) {
173  bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
174  waitable.get(),
175  true);
176  if (already_in_use) {
177  throw std::runtime_error(
178  "subscription intra-process waitable already associated with a wait set");
179  }
180  this->storage_add_waitable(
181  std::move(inner_subscription->get_intra_process_waitable()),
182  std::move(local_subscription));
183  }
184  }
185  });
186  }
187 
189 
204  void
206  std::shared_ptr<rclcpp::SubscriptionBase> subscription,
208  {
209  if (nullptr == subscription) {
210  throw std::invalid_argument("subscription is nullptr");
211  }
212  // this method comes from the SynchronizationPolicy
213  this->sync_remove_subscription(
214  std::move(subscription),
215  mask,
216  [this](
217  std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
218  const rclcpp::SubscriptionWaitSetMask & mask)
219  {
220  // This method comes from the StoragePolicy, and it may not exist for
221  // fixed sized storage policies.
222  // It will throw if the subscription is not in the wait set.
223  if (mask.include_subscription) {
224  auto local_subscription = inner_subscription;
225  local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), false);
226  this->storage_remove_subscription(std::move(local_subscription));
227  }
228  if (mask.include_events) {
229  for (auto key_event_pair : inner_subscription->get_event_handlers()) {
230  auto event = key_event_pair.second;
231  auto local_subscription = inner_subscription;
232  local_subscription->exchange_in_use_by_wait_set_state(event.get(), false);
233  this->storage_remove_waitable(std::move(event));
234  }
235  }
237  auto local_waitable = inner_subscription->get_intra_process_waitable();
238  if (nullptr != local_waitable) {
239  // This is the case when intra process is enabled for the subscription.
240  inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
241  this->storage_remove_waitable(std::move(local_waitable));
242  }
243  }
244  });
245  }
246 
248 
274  void
275  add_guard_condition(std::shared_ptr<rclcpp::GuardCondition> guard_condition)
276  {
277  if (nullptr == guard_condition) {
278  throw std::invalid_argument("guard_condition is nullptr");
279  }
280  // this method comes from the SynchronizationPolicy
281  this->sync_add_guard_condition(
282  std::move(guard_condition),
283  [this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
284  bool already_in_use = inner_guard_condition->exchange_in_use_by_wait_set_state(true);
285  if (already_in_use) {
286  throw std::runtime_error("guard condition already in use by another wait set");
287  }
288  // This method comes from the StoragePolicy, and it may not exist for
289  // fixed sized storage policies.
290  // It will throw if the guard condition has already been added.
291  this->storage_add_guard_condition(std::move(inner_guard_condition));
292  });
293  }
294 
296 
314  void
315  remove_guard_condition(std::shared_ptr<rclcpp::GuardCondition> guard_condition)
316  {
317  if (nullptr == guard_condition) {
318  throw std::invalid_argument("guard_condition is nullptr");
319  }
320  // this method comes from the SynchronizationPolicy
321  this->sync_remove_guard_condition(
322  std::move(guard_condition),
323  [this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
324  inner_guard_condition->exchange_in_use_by_wait_set_state(false);
325  // This method comes from the StoragePolicy, and it may not exist for
326  // fixed sized storage policies.
327  // It will throw if the guard condition is not in the wait set.
328  this->storage_remove_guard_condition(std::move(inner_guard_condition));
329  });
330  }
331 
333 
342  void
343  add_timer(std::shared_ptr<rclcpp::TimerBase> timer)
344  {
345  if (nullptr == timer) {
346  throw std::invalid_argument("timer is nullptr");
347  }
348  // this method comes from the SynchronizationPolicy
349  this->sync_add_timer(
350  std::move(timer),
351  [this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
352  bool already_in_use = inner_timer->exchange_in_use_by_wait_set_state(true);
353  if (already_in_use) {
354  throw std::runtime_error("timer already in use by another wait set");
355  }
356  // This method comes from the StoragePolicy, and it may not exist for
357  // fixed sized storage policies.
358  // It will throw if the timer has already been added.
359  this->storage_add_timer(std::move(inner_timer));
360  });
361  }
362 
364 
372  void
373  remove_timer(std::shared_ptr<rclcpp::TimerBase> timer)
374  {
375  if (nullptr == timer) {
376  throw std::invalid_argument("timer is nullptr");
377  }
378  // this method comes from the SynchronizationPolicy
379  this->sync_remove_timer(
380  std::move(timer),
381  [this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
382  inner_timer->exchange_in_use_by_wait_set_state(false);
383  // This method comes from the StoragePolicy, and it may not exist for
384  // fixed sized storage policies.
385  // It will throw if the timer is not in the wait set.
386  this->storage_remove_timer(std::move(inner_timer));
387  });
388  }
389 
391 
400  void
401  add_client(std::shared_ptr<rclcpp::ClientBase> client)
402  {
403  if (nullptr == client) {
404  throw std::invalid_argument("client is nullptr");
405  }
406  // this method comes from the SynchronizationPolicy
407  this->sync_add_client(
408  std::move(client),
409  [this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
410  bool already_in_use = inner_client->exchange_in_use_by_wait_set_state(true);
411  if (already_in_use) {
412  throw std::runtime_error("client already in use by another wait set");
413  }
414  // This method comes from the StoragePolicy, and it may not exist for
415  // fixed sized storage policies.
416  // It will throw if the client has already been added.
417  this->storage_add_client(std::move(inner_client));
418  });
419  }
420 
422 
430  void
431  remove_client(std::shared_ptr<rclcpp::ClientBase> client)
432  {
433  if (nullptr == client) {
434  throw std::invalid_argument("client is nullptr");
435  }
436  // this method comes from the SynchronizationPolicy
437  this->sync_remove_client(
438  std::move(client),
439  [this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
440  inner_client->exchange_in_use_by_wait_set_state(false);
441  // This method comes from the StoragePolicy, and it may not exist for
442  // fixed sized storage policies.
443  // It will throw if the client is not in the wait set.
444  this->storage_remove_client(std::move(inner_client));
445  });
446  }
447 
449 
458  void
459  add_service(std::shared_ptr<rclcpp::ServiceBase> service)
460  {
461  if (nullptr == service) {
462  throw std::invalid_argument("service is nullptr");
463  }
464  // this method comes from the SynchronizationPolicy
465  this->sync_add_service(
466  std::move(service),
467  [this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
468  bool already_in_use = inner_service->exchange_in_use_by_wait_set_state(true);
469  if (already_in_use) {
470  throw std::runtime_error("service already in use by another wait set");
471  }
472  // This method comes from the StoragePolicy, and it may not exist for
473  // fixed sized storage policies.
474  // It will throw if the service has already been added.
475  this->storage_add_service(std::move(inner_service));
476  });
477  }
478 
480 
488  void
489  remove_service(std::shared_ptr<rclcpp::ServiceBase> service)
490  {
491  if (nullptr == service) {
492  throw std::invalid_argument("service is nullptr");
493  }
494  // this method comes from the SynchronizationPolicy
495  this->sync_remove_service(
496  std::move(service),
497  [this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
498  inner_service->exchange_in_use_by_wait_set_state(false);
499  // This method comes from the StoragePolicy, and it may not exist for
500  // fixed sized storage policies.
501  // It will throw if the service is not in the wait set.
502  this->storage_remove_service(std::move(inner_service));
503  });
504  }
505 
507 
530  void
532  std::shared_ptr<rclcpp::Waitable> waitable,
533  std::shared_ptr<void> associated_entity = nullptr)
534  {
535  if (nullptr == waitable) {
536  throw std::invalid_argument("waitable is nullptr");
537  }
538  // this method comes from the SynchronizationPolicy
539  this->sync_add_waitable(
540  std::move(waitable),
541  std::move(associated_entity),
542  [this](
543  std::shared_ptr<rclcpp::Waitable> && inner_waitable,
544  std::shared_ptr<void> && associated_entity)
545  {
546  bool already_in_use = inner_waitable->exchange_in_use_by_wait_set_state(true);
547  if (already_in_use) {
548  throw std::runtime_error("waitable already in use by another wait set");
549  }
550  // This method comes from the StoragePolicy, and it may not exist for
551  // fixed sized storage policies.
552  // It will throw if the waitable has already been added.
553  this->storage_add_waitable(std::move(inner_waitable), std::move(associated_entity));
554  });
555  }
556 
558 
566  void
567  remove_waitable(std::shared_ptr<rclcpp::Waitable> waitable)
568  {
569  if (nullptr == waitable) {
570  throw std::invalid_argument("waitable is nullptr");
571  }
572  // this method comes from the SynchronizationPolicy
573  this->sync_remove_waitable(
574  std::move(waitable),
575  [this](std::shared_ptr<rclcpp::Waitable> && inner_waitable) {
576  inner_waitable->exchange_in_use_by_wait_set_state(false);
577  // This method comes from the StoragePolicy, and it may not exist for
578  // fixed sized storage policies.
579  // It will throw if the waitable is not in the wait set.
580  this->storage_remove_waitable(std::move(inner_waitable));
581  });
582  }
583 
585 
600  void
602  {
603  // this method comes from the SynchronizationPolicy
604  this->sync_prune_deleted_entities(
605  [this]() {
606  // This method comes from the StoragePolicy, and it may not exist for
607  // fixed sized storage policies.
608  this->storage_prune_deleted_entities();
609  });
610  }
611 
613 
653  template<class Rep = int64_t, class Period = std::milli>
654  RCUTILS_WARN_UNUSED
656  wait(std::chrono::duration<Rep, Period> time_to_wait = std::chrono::duration<Rep, Period>(-1))
657  {
658  auto time_to_wait_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time_to_wait);
659 
660  // ensure the ownership of the entities in the wait set is shared for the duration of wait
661  this->storage_acquire_ownerships();
662  RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();});
663 
664  // this method comes from the SynchronizationPolicy
665  return this->template sync_wait<WaitResult<WaitSetTemplate>>(
666  // pass along the time_to_wait duration as nanoseconds
667  time_to_wait_ns,
668  // this method provides the ability to rebuild the wait set, if needed
669  [this]() {
670  // This method comes from the StoragePolicy
671  this->storage_rebuild_rcl_wait_set(
672  // This method comes from the SynchronizationPolicy
673  this->get_extra_guard_conditions()
674  );
675  },
676  // this method provides access to the rcl wait set
677  [this]() -> rcl_wait_set_t & {
678  // This method comes from the StoragePolicy
679  return this->storage_get_rcl_wait_set();
680  },
681  // this method provides a way to create the WaitResult
682  [this](WaitResultKind wait_result_kind) -> WaitResult<WaitSetTemplate> {
683  // convert the result into a WaitResult
684  switch (wait_result_kind) {
685  case WaitResultKind::Ready:
687  case WaitResultKind::Timeout:
689  case WaitResultKind::Empty:
691  default:
692  auto msg = "unknown WaitResultKind with value: " + std::to_string(wait_result_kind);
693  throw std::runtime_error(msg);
694  }
695  }
696  );
697  }
698 
699 private:
700  // Add WaitResult type as a friend so it can call private methods for
701  // acquiring and releasing resources as the WaitResult is initialized and
702  // destructed, respectively.
704 
706 
711  void
712  wait_result_acquire()
713  {
714  if (wait_result_holding_) {
715  throw std::runtime_error("wait_result_acquire() called while already holding");
716  }
717  wait_result_holding_ = true;
718  // this method comes from the SynchronizationPolicy
719  this->sync_wait_result_acquire();
720  // this method comes from the StoragePolicy
721  this->storage_acquire_ownerships();
722  }
723 
725 
730  void
731  wait_result_release()
732  {
733  if (!wait_result_holding_) {
734  throw std::runtime_error("wait_result_release() called while not holding");
735  }
736  wait_result_holding_ = false;
737  // this method comes from the StoragePolicy
738  this->storage_release_ownerships();
739  // this method comes from the SynchronizationPolicy
740  this->sync_wait_result_release();
741  }
742 
743  bool wait_result_holding_ = false;
744 };
745 
746 } // namespace rclcpp
747 
748 #endif // RCLCPP__WAIT_SET_TEMPLATE_HPP_
Options used to determine what parts of a subscription get added to or removed from a wait set.
bool include_events
If true, include any events attached to the subscription.
bool include_subscription
If true, include the actual subscription.
bool include_intra_process_waitable
If true, include the waitable used to handle intra process communication.
Interface for introspecting a wait set after waiting on it.
Definition: wait_result.hpp:55
Encapsulates sets of waitable items which can be waited on as a group.
void prune_deleted_entities()
Remove any destroyed entities from the wait set.
void add_guard_condition(std::shared_ptr< rclcpp::GuardCondition > guard_condition)
Add a guard condition to this wait set.
void remove_waitable(std::shared_ptr< rclcpp::Waitable > waitable)
Remove a waitable from this wait set.
void add_service(std::shared_ptr< rclcpp::ServiceBase > service)
Add a service to this wait set.
void add_subscription(std::shared_ptr< rclcpp::SubscriptionBase > subscription, rclcpp::SubscriptionWaitSetMask mask={})
Add a subscription to this wait set.
void remove_client(std::shared_ptr< rclcpp::ClientBase > client)
Remove a client from this wait set.
WaitSetTemplate(const typename StoragePolicy::SubscriptionsIterable &subscriptions={}, const typename StoragePolicy::GuardConditionsIterable &guard_conditions={}, const typename StoragePolicy::TimersIterable &timers={}, const typename StoragePolicy::ClientsIterable &clients={}, const typename StoragePolicy::ServicesIterable &services={}, const typename StoragePolicy::WaitablesIterable &waitables={}, rclcpp::Context::SharedPtr context=rclcpp::contexts::get_global_default_context())
Construct a wait set with optional initial waitable entities and optional custom context.
void add_waitable(std::shared_ptr< rclcpp::Waitable > waitable, std::shared_ptr< void > associated_entity=nullptr)
Add a waitable to this wait set.
void remove_subscription(std::shared_ptr< rclcpp::SubscriptionBase > subscription, rclcpp::SubscriptionWaitSetMask mask={})
Remove a subscription from this wait set.
void add_client(std::shared_ptr< rclcpp::ClientBase > client)
Add a client to this wait set.
void remove_guard_condition(std::shared_ptr< rclcpp::GuardCondition > guard_condition)
Remove a guard condition from this wait set.
void remove_timer(std::shared_ptr< rclcpp::TimerBase > timer)
Remove a timer from this wait set.
RCUTILS_WARN_UNUSED WaitResult< WaitSetTemplate > wait(std::chrono::duration< Rep, Period > time_to_wait=std::chrono::duration< Rep, Period >(-1))
Wait for any of the entities in the wait set to be ready, or a period of time to pass.
void add_timer(std::shared_ptr< rclcpp::TimerBase > timer)
Add a timer to this wait set.
void remove_service(std::shared_ptr< rclcpp::ServiceBase > service)
Remove a service from this wait set.
const rcl_wait_set_t & get_rcl_wait_set() const
Return the internal rcl wait set object.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42