ROS 2 rclcpp + rcl - rolling  rolling-a919a6e5
ROS 2 C++ Client Library with ROS Client Library
wait_set_template.hpp
1 // Copyright 2020 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__WAIT_SET_TEMPLATE_HPP_
16 #define RCLCPP__WAIT_SET_TEMPLATE_HPP_
17 
18 #include <chrono>
19 #include <memory>
20 #include <utility>
21 
22 #include "rcl/wait.h"
23 #include "rcpputils/scope_exit.hpp"
24 
25 #include "rclcpp/client.hpp"
26 #include "rclcpp/context.hpp"
27 #include "rclcpp/contexts/default_context.hpp"
28 #include "rclcpp/guard_condition.hpp"
29 #include "rclcpp/macros.hpp"
30 #include "rclcpp/service.hpp"
31 #include "rclcpp/subscription_base.hpp"
32 #include "rclcpp/subscription_wait_set_mask.hpp"
33 #include "rclcpp/timer.hpp"
34 #include "rclcpp/visibility_control.hpp"
35 #include "rclcpp/wait_result.hpp"
36 #include "rclcpp/waitable.hpp"
37 
38 namespace rclcpp
39 {
40 
42 
46 template<class SynchronizationPolicy, class StoragePolicy>
47 class WaitSetTemplate final : private SynchronizationPolicy, private StoragePolicy
48 {
49 public:
50  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(WaitSetTemplate)
51 
52  using typename StoragePolicy::SubscriptionEntry;
53  using typename StoragePolicy::WaitableEntry;
54 
56 
70  explicit
72  const typename StoragePolicy::SubscriptionsIterable & subscriptions = {},
73  const typename StoragePolicy::GuardConditionsIterable & guard_conditions = {},
74  const typename StoragePolicy::TimersIterable & timers = {},
75  const typename StoragePolicy::ClientsIterable & clients = {},
76  const typename StoragePolicy::ServicesIterable & services = {},
77  const typename StoragePolicy::WaitablesIterable & waitables = {},
78  rclcpp::Context::SharedPtr context =
79  rclcpp::contexts::get_global_default_context())
80  : SynchronizationPolicy(context),
81  StoragePolicy(
82  subscriptions,
83  guard_conditions,
84  // this method comes from the SynchronizationPolicy
85  this->get_extra_guard_conditions(),
86  timers,
87  clients,
88  services,
89  waitables,
90  context)
91  {}
92 
94 
99  const rcl_wait_set_t &
101  {
102  // this method comes from the StoragePolicy
103  return this->storage_get_rcl_wait_set();
104  }
105 
107 
129  void
131  std::shared_ptr<rclcpp::SubscriptionBase> subscription,
133  {
134  if (nullptr == subscription) {
135  throw std::invalid_argument("subscription is nullptr");
136  }
137  // this method comes from the SynchronizationPolicy
138  this->sync_add_subscription(
139  std::move(subscription),
140  mask,
141  [this](
142  std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
143  const rclcpp::SubscriptionWaitSetMask & mask)
144  {
145  // These methods comes from the StoragePolicy, and may not exist for
146  // fixed sized storage policies.
147  // It will throw if the subscription has already been added.
148  if (mask.include_subscription) {
149  auto local_subscription = inner_subscription;
150  bool already_in_use =
151  local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), true);
152  if (already_in_use) {
153  throw std::runtime_error("subscription already associated with a wait set");
154  }
155  this->storage_add_subscription(std::move(local_subscription));
156  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
157  }
158  if (mask.include_events) {
159  for (auto key_event_pair : inner_subscription->get_event_handlers()) {
160  auto event = key_event_pair.second;
161  auto local_subscription = inner_subscription;
162  bool already_in_use =
163  local_subscription->exchange_in_use_by_wait_set_state(event.get(), true);
164  if (already_in_use) {
165  throw std::runtime_error("subscription event already associated with a wait set");
166  }
167  this->storage_add_waitable(std::move(event), std::move(local_subscription));
168  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
169  }
170  }
172  auto local_subscription = inner_subscription;
173  auto waitable = inner_subscription->get_intra_process_waitable();
174  if (nullptr != waitable) {
175  bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
176  waitable.get(),
177  true);
178  if (already_in_use) {
179  throw std::runtime_error(
180  "subscription intra-process waitable already associated with a wait set");
181  }
182  this->storage_add_waitable(
183  std::move(inner_subscription->get_intra_process_waitable()),
184  std::move(local_subscription));
185  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
186  }
187  }
188  });
189  }
190 
192 
207  void
209  std::shared_ptr<rclcpp::SubscriptionBase> subscription,
211  {
212  if (nullptr == subscription) {
213  throw std::invalid_argument("subscription is nullptr");
214  }
215  // this method comes from the SynchronizationPolicy
216  this->sync_remove_subscription(
217  std::move(subscription),
218  mask,
219  [this](
220  std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
221  const rclcpp::SubscriptionWaitSetMask & mask)
222  {
223  // This method comes from the StoragePolicy, and it may not exist for
224  // fixed sized storage policies.
225  // It will throw if the subscription is not in the wait set.
226  if (mask.include_subscription) {
227  auto local_subscription = inner_subscription;
228  local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), false);
229  this->storage_remove_subscription(std::move(local_subscription));
230  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
231  }
232  if (mask.include_events) {
233  for (auto key_event_pair : inner_subscription->get_event_handlers()) {
234  auto event = key_event_pair.second;
235  auto local_subscription = inner_subscription;
236  local_subscription->exchange_in_use_by_wait_set_state(event.get(), false);
237  this->storage_remove_waitable(std::move(event));
238  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
239  }
240  }
242  auto local_waitable = inner_subscription->get_intra_process_waitable();
243  if (nullptr != local_waitable) {
244  // This is the case when intra process is enabled for the subscription.
245  inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
246  this->storage_remove_waitable(std::move(local_waitable));
247  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
248  }
249  }
250  });
251  }
252 
254 
280  void
281  add_guard_condition(std::shared_ptr<rclcpp::GuardCondition> guard_condition)
282  {
283  if (nullptr == guard_condition) {
284  throw std::invalid_argument("guard_condition is nullptr");
285  }
286  // this method comes from the SynchronizationPolicy
287  this->sync_add_guard_condition(
288  std::move(guard_condition),
289  [this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
290  bool already_in_use = inner_guard_condition->exchange_in_use_by_wait_set_state(true);
291  if (already_in_use) {
292  throw std::runtime_error("guard condition already in use by another wait set");
293  }
294  // This method comes from the StoragePolicy, and it may not exist for
295  // fixed sized storage policies.
296  // It will throw if the guard condition has already been added.
297  this->storage_add_guard_condition(std::move(inner_guard_condition));
298  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
299  });
300  }
301 
303 
321  void
322  remove_guard_condition(std::shared_ptr<rclcpp::GuardCondition> guard_condition)
323  {
324  if (nullptr == guard_condition) {
325  throw std::invalid_argument("guard_condition is nullptr");
326  }
327  // this method comes from the SynchronizationPolicy
328  this->sync_remove_guard_condition(
329  std::move(guard_condition),
330  [this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
331  inner_guard_condition->exchange_in_use_by_wait_set_state(false);
332  // This method comes from the StoragePolicy, and it may not exist for
333  // fixed sized storage policies.
334  // It will throw if the guard condition is not in the wait set.
335  this->storage_remove_guard_condition(std::move(inner_guard_condition));
336  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
337  });
338  }
339 
341 
350  void
351  add_timer(std::shared_ptr<rclcpp::TimerBase> timer)
352  {
353  if (nullptr == timer) {
354  throw std::invalid_argument("timer is nullptr");
355  }
356  // this method comes from the SynchronizationPolicy
357  this->sync_add_timer(
358  std::move(timer),
359  [this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
360  bool already_in_use = inner_timer->exchange_in_use_by_wait_set_state(true);
361  if (already_in_use) {
362  throw std::runtime_error("timer already in use by another wait set");
363  }
364  // This method comes from the StoragePolicy, and it may not exist for
365  // fixed sized storage policies.
366  // It will throw if the timer has already been added.
367  this->storage_add_timer(std::move(inner_timer));
368  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
369  });
370  }
371 
373 
381  void
382  remove_timer(std::shared_ptr<rclcpp::TimerBase> timer)
383  {
384  if (nullptr == timer) {
385  throw std::invalid_argument("timer is nullptr");
386  }
387  // this method comes from the SynchronizationPolicy
388  this->sync_remove_timer(
389  std::move(timer),
390  [this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
391  inner_timer->exchange_in_use_by_wait_set_state(false);
392  // This method comes from the StoragePolicy, and it may not exist for
393  // fixed sized storage policies.
394  // It will throw if the timer is not in the wait set.
395  this->storage_remove_timer(std::move(inner_timer));
396  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
397  });
398  }
399 
401 
410  void
411  add_client(std::shared_ptr<rclcpp::ClientBase> client)
412  {
413  if (nullptr == client) {
414  throw std::invalid_argument("client is nullptr");
415  }
416  // this method comes from the SynchronizationPolicy
417  this->sync_add_client(
418  std::move(client),
419  [this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
420  bool already_in_use = inner_client->exchange_in_use_by_wait_set_state(true);
421  if (already_in_use) {
422  throw std::runtime_error("client already in use by another wait set");
423  }
424  // This method comes from the StoragePolicy, and it may not exist for
425  // fixed sized storage policies.
426  // It will throw if the client has already been added.
427  this->storage_add_client(std::move(inner_client));
428  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
429  });
430  }
431 
433 
441  void
442  remove_client(std::shared_ptr<rclcpp::ClientBase> client)
443  {
444  if (nullptr == client) {
445  throw std::invalid_argument("client is nullptr");
446  }
447  // this method comes from the SynchronizationPolicy
448  this->sync_remove_client(
449  std::move(client),
450  [this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
451  inner_client->exchange_in_use_by_wait_set_state(false);
452  // This method comes from the StoragePolicy, and it may not exist for
453  // fixed sized storage policies.
454  // It will throw if the client is not in the wait set.
455  this->storage_remove_client(std::move(inner_client));
456  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
457  });
458  }
459 
461 
470  void
471  add_service(std::shared_ptr<rclcpp::ServiceBase> service)
472  {
473  if (nullptr == service) {
474  throw std::invalid_argument("service is nullptr");
475  }
476  // this method comes from the SynchronizationPolicy
477  this->sync_add_service(
478  std::move(service),
479  [this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
480  bool already_in_use = inner_service->exchange_in_use_by_wait_set_state(true);
481  if (already_in_use) {
482  throw std::runtime_error("service already in use by another wait set");
483  }
484  // This method comes from the StoragePolicy, and it may not exist for
485  // fixed sized storage policies.
486  // It will throw if the service has already been added.
487  this->storage_add_service(std::move(inner_service));
488  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
489  });
490  }
491 
493 
501  void
502  remove_service(std::shared_ptr<rclcpp::ServiceBase> service)
503  {
504  if (nullptr == service) {
505  throw std::invalid_argument("service is nullptr");
506  }
507  // this method comes from the SynchronizationPolicy
508  this->sync_remove_service(
509  std::move(service),
510  [this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
511  inner_service->exchange_in_use_by_wait_set_state(false);
512  // This method comes from the StoragePolicy, and it may not exist for
513  // fixed sized storage policies.
514  // It will throw if the service is not in the wait set.
515  this->storage_remove_service(std::move(inner_service));
516  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
517  });
518  }
519 
521 
544  void
546  std::shared_ptr<rclcpp::Waitable> waitable,
547  std::shared_ptr<void> associated_entity = nullptr)
548  {
549  if (nullptr == waitable) {
550  throw std::invalid_argument("waitable is nullptr");
551  }
552  // this method comes from the SynchronizationPolicy
553  this->sync_add_waitable(
554  std::move(waitable),
555  std::move(associated_entity),
556  [this](
557  std::shared_ptr<rclcpp::Waitable> && inner_waitable,
558  std::shared_ptr<void> && associated_entity)
559  {
560  bool already_in_use = inner_waitable->exchange_in_use_by_wait_set_state(true);
561  if (already_in_use) {
562  throw std::runtime_error("waitable already in use by another wait set");
563  }
564  // This method comes from the StoragePolicy, and it may not exist for
565  // fixed sized storage policies.
566  // It will throw if the waitable has already been added.
567  this->storage_add_waitable(std::move(inner_waitable), std::move(associated_entity));
568  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
569  });
570  }
571 
573 
581  void
582  remove_waitable(std::shared_ptr<rclcpp::Waitable> waitable)
583  {
584  if (nullptr == waitable) {
585  throw std::invalid_argument("waitable is nullptr");
586  }
587  // this method comes from the SynchronizationPolicy
588  this->sync_remove_waitable(
589  std::move(waitable),
590  [this](std::shared_ptr<rclcpp::Waitable> && inner_waitable) {
591  inner_waitable->exchange_in_use_by_wait_set_state(false);
592  // This method comes from the StoragePolicy, and it may not exist for
593  // fixed sized storage policies.
594  // It will throw if the waitable is not in the wait set.
595  this->storage_remove_waitable(std::move(inner_waitable));
596  if (this->wait_result_holding_) {this->wait_result_dirty_ = true;}
597  });
598  }
599 
601 
616  void
618  {
619  // this method comes from the SynchronizationPolicy
620  this->sync_prune_deleted_entities(
621  [this]() {
622  // This method comes from the StoragePolicy, and it may not exist for
623  // fixed sized storage policies.
624  this->storage_prune_deleted_entities();
625  });
626  }
627 
629 
669  template<class Rep = int64_t, class Period = std::milli>
670  RCUTILS_WARN_UNUSED
672  wait(std::chrono::duration<Rep, Period> time_to_wait = std::chrono::duration<Rep, Period>(-1))
673  {
674  auto time_to_wait_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time_to_wait);
675 
676  // ensure the ownership of the entities in the wait set is shared for the duration of wait
677  this->storage_acquire_ownerships();
678  RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();});
679 
680  // this method comes from the SynchronizationPolicy
681  return this->template sync_wait<WaitResult<WaitSetTemplate>>(
682  // pass along the time_to_wait duration as nanoseconds
683  time_to_wait_ns,
684  // this method provides the ability to rebuild the wait set, if needed
685  [this]() {
686  // This method comes from the StoragePolicy
687  this->storage_rebuild_rcl_wait_set(
688  // This method comes from the SynchronizationPolicy
689  this->get_extra_guard_conditions()
690  );
691  },
692  // this method provides access to the rcl wait set
693  [this]() -> rcl_wait_set_t & {
694  // This method comes from the StoragePolicy
695  return this->storage_get_rcl_wait_set();
696  },
697  // this method provides a way to create the WaitResult
698  [this](WaitResultKind wait_result_kind) -> WaitResult<WaitSetTemplate> {
699  // convert the result into a WaitResult
700  switch (wait_result_kind) {
701  case WaitResultKind::Ready:
703  case WaitResultKind::Timeout:
705  case WaitResultKind::Empty:
707  default:
708  auto msg = "unknown WaitResultKind with value: " + std::to_string(wait_result_kind);
709  throw std::runtime_error(msg);
710  }
711  }
712  );
713  }
714 
715 private:
716  // Add WaitResult type as a friend so it can call private methods for
717  // acquiring and releasing resources as the WaitResult is initialized and
718  // destructed, respectively.
720 
722 
727  void
728  wait_result_acquire()
729  {
730  if (wait_result_holding_) {
731  throw std::runtime_error("wait_result_acquire() called while already holding");
732  }
733  wait_result_holding_ = true;
734  wait_result_dirty_ = false;
735  // this method comes from the SynchronizationPolicy
736  this->sync_wait_result_acquire();
737  }
738 
740 
745  void
746  wait_result_release()
747  {
748  if (!wait_result_holding_) {
749  throw std::runtime_error("wait_result_release() called while not holding");
750  }
751  wait_result_holding_ = false;
752  wait_result_dirty_ = false;
753  // this method comes from the SynchronizationPolicy
754  this->sync_wait_result_release();
755  }
756 
757  bool wait_result_holding_ = false;
758  bool wait_result_dirty_ = false;
759 };
760 
761 } // namespace rclcpp
762 
763 #endif // RCLCPP__WAIT_SET_TEMPLATE_HPP_
Options used to determine what parts of a subscription get added to or removed from a wait set.
bool include_events
If true, include any events attached to the subscription.
bool include_subscription
If true, include the actual subscription.
bool include_intra_process_waitable
If true, include the waitable used to handle intra process communication.
Interface for introspecting a wait set after waiting on it.
Definition: wait_result.hpp:63
Encapsulates sets of waitable items which can be waited on as a group.
void prune_deleted_entities()
Remove any destroyed entities from the wait set.
void add_guard_condition(std::shared_ptr< rclcpp::GuardCondition > guard_condition)
Add a guard condition to this wait set.
void remove_waitable(std::shared_ptr< rclcpp::Waitable > waitable)
Remove a waitable from this wait set.
void add_service(std::shared_ptr< rclcpp::ServiceBase > service)
Add a service to this wait set.
void add_subscription(std::shared_ptr< rclcpp::SubscriptionBase > subscription, rclcpp::SubscriptionWaitSetMask mask={})
Add a subscription to this wait set.
void remove_client(std::shared_ptr< rclcpp::ClientBase > client)
Remove a client from this wait set.
WaitSetTemplate(const typename StoragePolicy::SubscriptionsIterable &subscriptions={}, const typename StoragePolicy::GuardConditionsIterable &guard_conditions={}, const typename StoragePolicy::TimersIterable &timers={}, const typename StoragePolicy::ClientsIterable &clients={}, const typename StoragePolicy::ServicesIterable &services={}, const typename StoragePolicy::WaitablesIterable &waitables={}, rclcpp::Context::SharedPtr context=rclcpp::contexts::get_global_default_context())
Construct a wait set with optional initial waitable entities and optional custom context.
void add_waitable(std::shared_ptr< rclcpp::Waitable > waitable, std::shared_ptr< void > associated_entity=nullptr)
Add a waitable to this wait set.
void remove_subscription(std::shared_ptr< rclcpp::SubscriptionBase > subscription, rclcpp::SubscriptionWaitSetMask mask={})
Remove a subscription from this wait set.
void add_client(std::shared_ptr< rclcpp::ClientBase > client)
Add a client to this wait set.
void remove_guard_condition(std::shared_ptr< rclcpp::GuardCondition > guard_condition)
Remove a guard condition from this wait set.
void remove_timer(std::shared_ptr< rclcpp::TimerBase > timer)
Remove a timer from this wait set.
RCUTILS_WARN_UNUSED WaitResult< WaitSetTemplate > wait(std::chrono::duration< Rep, Period > time_to_wait=std::chrono::duration< Rep, Period >(-1))
Wait for any of the entities in the wait set to be ready, or a period of time to pass.
void add_timer(std::shared_ptr< rclcpp::TimerBase > timer)
Add a timer to this wait set.
void remove_service(std::shared_ptr< rclcpp::ServiceBase > service)
Remove a service from this wait set.
const rcl_wait_set_t & get_rcl_wait_set() const
Return the internal rcl wait set object.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42