ROS 2 rclcpp + rcl - kilted  kilted
ROS 2 C++ Client Library with ROS Client Library
dynamic_storage.hpp
1 // Copyright 2020 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__WAIT_SET_POLICIES__DYNAMIC_STORAGE_HPP_
16 #define RCLCPP__WAIT_SET_POLICIES__DYNAMIC_STORAGE_HPP_
17 
18 #include <algorithm>
19 #include <memory>
20 #include <utility>
21 #include <vector>
22 
23 #include "rclcpp/client.hpp"
24 #include "rclcpp/guard_condition.hpp"
25 #include "rclcpp/macros.hpp"
26 #include "rclcpp/service.hpp"
27 #include "rclcpp/subscription_base.hpp"
28 #include "rclcpp/subscription_wait_set_mask.hpp"
29 #include "rclcpp/timer.hpp"
30 #include "rclcpp/visibility_control.hpp"
31 #include "rclcpp/wait_set_policies/detail/storage_policy_common.hpp"
32 #include "rclcpp/waitable.hpp"
33 
34 namespace rclcpp
35 {
36 namespace wait_set_policies
37 {
38 
41 {
42 protected:
43  using is_mutable = std::true_type;
44 
46  {
47  // (wjwwood): indent of 'public:' is weird, I know. uncrustify is dumb.
48 
49 public:
50  std::shared_ptr<rclcpp::SubscriptionBase> subscription;
52 
55  std::shared_ptr<rclcpp::SubscriptionBase> subscription_in = nullptr,
56  const rclcpp::SubscriptionWaitSetMask & mask_in = {})
57  : subscription(std::move(subscription_in)),
58  mask(mask_in)
59  {}
60 
61  void
62  reset() noexcept
63  {
64  subscription.reset();
65  }
66  };
68  {
69 public:
70  std::weak_ptr<rclcpp::SubscriptionBase> subscription;
72 
73  explicit WeakSubscriptionEntry(
74  const std::shared_ptr<rclcpp::SubscriptionBase> & subscription_in,
75  const rclcpp::SubscriptionWaitSetMask & mask_in) noexcept
76  : subscription(subscription_in),
77  mask(mask_in)
78  {}
79 
80  explicit WeakSubscriptionEntry(const SubscriptionEntry & other)
81  : subscription(other.subscription),
82  mask(other.mask)
83  {}
84 
85  std::shared_ptr<rclcpp::SubscriptionBase>
86  lock() const
87  {
88  return subscription.lock();
89  }
90 
91  bool
92  expired() const noexcept
93  {
94  return subscription.expired();
95  }
96  };
97  using SequenceOfWeakSubscriptions = std::vector<WeakSubscriptionEntry>;
98  using SubscriptionsIterable = std::vector<SubscriptionEntry>;
99 
100  using SequenceOfWeakGuardConditions = std::vector<std::weak_ptr<rclcpp::GuardCondition>>;
101  using GuardConditionsIterable = std::vector<std::shared_ptr<rclcpp::GuardCondition>>;
102 
103  using SequenceOfWeakTimers = std::vector<std::weak_ptr<rclcpp::TimerBase>>;
104  using TimersIterable = std::vector<std::shared_ptr<rclcpp::TimerBase>>;
105 
106  using SequenceOfWeakClients = std::vector<std::weak_ptr<rclcpp::ClientBase>>;
107  using ClientsIterable = std::vector<std::shared_ptr<rclcpp::ClientBase>>;
108 
109  using SequenceOfWeakServices = std::vector<std::weak_ptr<rclcpp::ServiceBase>>;
110  using ServicesIterable = std::vector<std::shared_ptr<rclcpp::ServiceBase>>;
111 
113  {
114 public:
115  std::shared_ptr<rclcpp::Waitable> waitable;
116  std::shared_ptr<void> associated_entity;
117 
120  std::shared_ptr<rclcpp::Waitable> waitable_in = nullptr,
121  std::shared_ptr<void> associated_entity_in = nullptr) noexcept
122  : waitable(std::move(waitable_in)),
123  associated_entity(std::move(associated_entity_in))
124  {}
125 
126  void
127  reset() noexcept
128  {
129  waitable.reset();
130  associated_entity.reset();
131  }
132  };
134  {
135 public:
136  std::weak_ptr<rclcpp::Waitable> waitable;
137  std::weak_ptr<void> associated_entity;
138 
139  explicit WeakWaitableEntry(
140  const std::shared_ptr<rclcpp::Waitable> & waitable_in,
141  const std::shared_ptr<void> & associated_entity_in) noexcept
142  : waitable(waitable_in),
143  associated_entity(associated_entity_in)
144  {}
145 
146  explicit WeakWaitableEntry(const WaitableEntry & other)
147  : waitable(other.waitable),
148  associated_entity(other.associated_entity)
149  {}
150 
151  std::shared_ptr<rclcpp::Waitable>
152  lock() const
153  {
154  return waitable.lock();
155  }
156 
157  bool
158  expired() const noexcept
159  {
160  return waitable.expired();
161  }
162  };
163  using SequenceOfWeakWaitables = std::vector<WeakWaitableEntry>;
164  using WaitablesIterable = std::vector<WaitableEntry>;
165 
166  template<class ArrayOfExtraGuardConditions>
167  explicit
169  const SubscriptionsIterable & subscriptions,
170  const GuardConditionsIterable & guard_conditions,
171  const ArrayOfExtraGuardConditions & extra_guard_conditions,
172  const TimersIterable & timers,
173  const ClientsIterable & clients,
174  const ServicesIterable & services,
175  const WaitablesIterable & waitables,
176  rclcpp::Context::SharedPtr context
177  )
178  : StoragePolicyCommon(
179  subscriptions,
180  guard_conditions,
181  extra_guard_conditions,
182  timers,
183  clients,
184  services,
185  waitables,
186  context),
187  subscriptions_(subscriptions.cbegin(), subscriptions.cend()),
188  shared_subscriptions_(subscriptions_.size()),
189  guard_conditions_(guard_conditions.cbegin(), guard_conditions.cend()),
190  shared_guard_conditions_(guard_conditions_.size()),
191  timers_(timers.cbegin(), timers.cend()),
192  shared_timers_(timers_.size()),
193  clients_(clients.cbegin(), clients.cend()),
194  shared_clients_(clients_.size()),
195  services_(services.cbegin(), services.cend()),
196  shared_services_(services_.size()),
197  waitables_(waitables.cbegin(), waitables.cend()),
198  shared_waitables_(waitables_.size())
199  {}
200 
201  ~DynamicStorage() = default;
202 
203  template<class ArrayOfExtraGuardConditions>
204  void
205  storage_rebuild_rcl_wait_set(const ArrayOfExtraGuardConditions & extra_guard_conditions)
206  {
207  this->storage_acquire_ownerships();
208 
210  shared_subscriptions_,
211  shared_guard_conditions_,
212  extra_guard_conditions,
213  shared_timers_,
214  shared_clients_,
215  shared_services_,
216  shared_waitables_
217  );
218 
219  if (this->needs_pruning_) {
220  this->storage_prune_deleted_entities();
221  this->needs_pruning_ = false;
222  }
223 
224  this->storage_release_ownerships();
225  }
226 
/// Check whether a sequence of weak entries refers to the given entity.
/**
 * Entities are matched by address: an entry matches when locking it yields a
 * pointer equal to the address of `entity`.
 *
 * \param[in] entity entity to look for
 * \param[in] entities sequence whose elements provide `lock()`
 * \return true if any entry refers to `entity`, otherwise false
 */
template<class EntityT, class SequenceOfEntitiesT>
static
bool
storage_has_entity(const EntityT & entity, const SequenceOfEntitiesT & entities)
{
  const auto * const wanted = &entity;
  for (const auto & candidate : entities) {
    if (candidate.lock().get() == wanted) {
      return true;
    }
  }
  return false;
}
237 
/// Locate the weak entry that refers to the given entity.
/**
 * Entities are matched by address: an entry matches when locking it yields a
 * pointer equal to the address of `entity`.
 *
 * \param[in] entity entity to look for
 * \param[in] entities sequence whose elements provide `lock()`
 * \return const iterator to the first matching entry, or `entities.cend()`
 *   if there is none
 */
template<class EntityT, class SequenceOfEntitiesT>
static
auto
storage_find_entity(const EntityT & entity, const SequenceOfEntitiesT & entities)
{
  const auto * const wanted = &entity;
  auto cursor = entities.cbegin();
  const auto last = entities.cend();
  while (cursor != last && cursor->lock().get() != wanted) {
    ++cursor;
  }
  return cursor;
}
248 
249  void
250  storage_add_subscription(std::shared_ptr<rclcpp::SubscriptionBase> && subscription)
251  {
252  if (this->storage_has_entity(*subscription, subscriptions_)) {
253  throw std::runtime_error("subscription already in wait set");
254  }
255  WeakSubscriptionEntry weak_entry{std::move(subscription), {}};
256  subscriptions_.push_back(std::move(weak_entry));
257  this->storage_flag_for_resize();
258  }
259 
260  void
261  storage_remove_subscription(std::shared_ptr<rclcpp::SubscriptionBase> && subscription)
262  {
263  auto it = this->storage_find_entity(*subscription, subscriptions_);
264  if (subscriptions_.cend() == it) {
265  throw std::runtime_error("subscription not in wait set");
266  }
267  subscriptions_.erase(it);
268  this->storage_flag_for_resize();
269  }
270 
271  void
272  storage_add_guard_condition(std::shared_ptr<rclcpp::GuardCondition> && guard_condition)
273  {
274  if (this->storage_has_entity(*guard_condition, guard_conditions_)) {
275  throw std::runtime_error("guard_condition already in wait set");
276  }
277  guard_conditions_.push_back(std::move(guard_condition));
278  this->storage_flag_for_resize();
279  }
280 
281  void
282  storage_remove_guard_condition(std::shared_ptr<rclcpp::GuardCondition> && guard_condition)
283  {
284  auto it = this->storage_find_entity(*guard_condition, guard_conditions_);
285  if (guard_conditions_.cend() == it) {
286  throw std::runtime_error("guard_condition not in wait set");
287  }
288  guard_conditions_.erase(it);
289  this->storage_flag_for_resize();
290  }
291 
292  void
293  storage_add_timer(std::shared_ptr<rclcpp::TimerBase> && timer)
294  {
295  if (this->storage_has_entity(*timer, timers_)) {
296  throw std::runtime_error("timer already in wait set");
297  }
298  timers_.push_back(std::move(timer));
299  this->storage_flag_for_resize();
300  }
301 
302  void
303  storage_remove_timer(std::shared_ptr<rclcpp::TimerBase> && timer)
304  {
305  auto it = this->storage_find_entity(*timer, timers_);
306  if (timers_.cend() == it) {
307  throw std::runtime_error("timer not in wait set");
308  }
309  timers_.erase(it);
310  this->storage_flag_for_resize();
311  }
312 
313  void
314  storage_add_client(std::shared_ptr<rclcpp::ClientBase> && client)
315  {
316  if (this->storage_has_entity(*client, clients_)) {
317  throw std::runtime_error("client already in wait set");
318  }
319  clients_.push_back(std::move(client));
320  this->storage_flag_for_resize();
321  }
322 
323  void
324  storage_remove_client(std::shared_ptr<rclcpp::ClientBase> && client)
325  {
326  auto it = this->storage_find_entity(*client, clients_);
327  if (clients_.cend() == it) {
328  throw std::runtime_error("client not in wait set");
329  }
330  clients_.erase(it);
331  this->storage_flag_for_resize();
332  }
333 
334  void
335  storage_add_service(std::shared_ptr<rclcpp::ServiceBase> && service)
336  {
337  if (this->storage_has_entity(*service, services_)) {
338  throw std::runtime_error("service already in wait set");
339  }
340  services_.push_back(std::move(service));
341  this->storage_flag_for_resize();
342  }
343 
344  void
345  storage_remove_service(std::shared_ptr<rclcpp::ServiceBase> && service)
346  {
347  auto it = this->storage_find_entity(*service, services_);
348  if (services_.cend() == it) {
349  throw std::runtime_error("service not in wait set");
350  }
351  services_.erase(it);
352  this->storage_flag_for_resize();
353  }
354 
355  void
356  storage_add_waitable(
357  std::shared_ptr<rclcpp::Waitable> && waitable,
358  std::shared_ptr<void> && associated_entity)
359  {
360  if (this->storage_has_entity(*waitable, waitables_)) {
361  throw std::runtime_error("waitable already in wait set");
362  }
363  WeakWaitableEntry weak_entry(std::move(waitable), std::move(associated_entity));
364  waitables_.push_back(std::move(weak_entry));
365  this->storage_flag_for_resize();
366  }
367 
368  void
369  storage_remove_waitable(std::shared_ptr<rclcpp::Waitable> && waitable)
370  {
371  auto it = this->storage_find_entity(*waitable, waitables_);
372  if (waitables_.cend() == it) {
373  throw std::runtime_error("waitable not in wait set");
374  }
375  waitables_.erase(it);
376  this->storage_flag_for_resize();
377  }
378 
379  // this is noexcept because:
380  // - std::weak_ptr::expired is noexcept
381  // - the erase-remove idiom is noexcept, since we're not using the ExecutionPolicy version
382  // - std::vector::erase is noexcept if the assignment operator of T is also
383  // - and, the operator= for std::weak_ptr is noexcept
384  void
385  storage_prune_deleted_entities() noexcept
386  {
387  // reusable (templated) lambda for removal predicate
388  auto p =
389  [](const auto & weak_ptr) {
390  // remove entries which have expired
391  return weak_ptr.expired();
392  };
393  // remove guard conditions which have been deleted
394  subscriptions_.erase(
395  std::remove_if(subscriptions_.begin(), subscriptions_.end(), p), subscriptions_.end());
396  guard_conditions_.erase(
397  std::remove_if(guard_conditions_.begin(), guard_conditions_.end(), p),
398  guard_conditions_.end());
399  timers_.erase(std::remove_if(timers_.begin(), timers_.end(), p), timers_.end());
400  clients_.erase(std::remove_if(clients_.begin(), clients_.end(), p), clients_.end());
401  services_.erase(std::remove_if(services_.begin(), services_.end(), p), services_.end());
402  waitables_.erase(std::remove_if(waitables_.begin(), waitables_.end(), p), waitables_.end());
403  }
404 
405  void
406  storage_acquire_ownerships()
407  {
408  if (++ownership_reference_counter_ > 1) {
409  // Avoid redundant locking.
410  return;
411  }
412  // Setup common locking function.
413  auto lock_all = [](const auto & weak_ptrs, auto & shared_ptrs) {
414  shared_ptrs.resize(weak_ptrs.size());
415  size_t index = 0;
416  for (const auto & weak_ptr : weak_ptrs) {
417  shared_ptrs[index++] = weak_ptr.lock();
418  }
419  };
420  // Lock all the weak pointers and hold them until released.
421  lock_all(subscriptions_, shared_subscriptions_);
422  lock_all(guard_conditions_, shared_guard_conditions_);
423  lock_all(timers_, shared_timers_);
424  lock_all(clients_, shared_clients_);
425  lock_all(services_, shared_services_);
426 
427  // We need a specialized version of this for waitables.
428  auto lock_all_waitables = [](const auto & weak_ptrs, auto & shared_ptrs) {
429  shared_ptrs.resize(weak_ptrs.size());
430  size_t index = 0;
431  for (const auto & weak_ptr : weak_ptrs) {
432  shared_ptrs[index++] = WaitableEntry{
433  weak_ptr.waitable.lock(),
434  weak_ptr.associated_entity.lock()};
435  }
436  };
437  lock_all_waitables(waitables_, shared_waitables_);
438  }
439 
440  void
441  storage_release_ownerships()
442  {
443  if (--ownership_reference_counter_ > 0) {
444  // Avoid releasing ownership until reference count is 0.
445  return;
446  }
447  // "Unlock" all shared pointers by resetting them.
448  auto reset_all = [](auto & shared_ptrs) {
449  for (auto & shared_ptr : shared_ptrs) {
450  shared_ptr.reset();
451  }
452  };
453  reset_all(shared_subscriptions_);
454  reset_all(shared_guard_conditions_);
455  reset_all(shared_timers_);
456  reset_all(shared_clients_);
457  reset_all(shared_services_);
458  reset_all(shared_waitables_);
459  }
460 
461  size_t size_of_subscriptions() const
462  {
463  return subscriptions_.size();
464  }
465 
466  size_t size_of_timers() const
467  {
468  return timers_.size();
469  }
470 
471  size_t size_of_clients() const
472  {
473  return clients_.size();
474  }
475 
476  size_t size_of_services() const
477  {
478  return services_.size();
479  }
480 
481  size_t size_of_waitables() const
482  {
483  return waitables_.size();
484  }
485 
486  std::shared_ptr<rclcpp::SubscriptionBase>
487  subscriptions(size_t ii) const
488  {
489  return subscriptions_[ii].lock();
490  }
491 
492  std::shared_ptr<rclcpp::TimerBase>
493  timers(size_t ii) const
494  {
495  return timers_[ii].lock();
496  }
497 
498  std::shared_ptr<rclcpp::ClientBase>
499  clients(size_t ii) const
500  {
501  return clients_[ii].lock();
502  }
503 
504  std::shared_ptr<rclcpp::ServiceBase>
505  services(size_t ii) const
506  {
507  return services_[ii].lock();
508  }
509 
510  std::shared_ptr<rclcpp::Waitable>
511  waitables(size_t ii) const
512  {
513  return waitables_[ii].lock();
514  }
515 
516 private:
517  size_t ownership_reference_counter_ = 0;
518 
519  SequenceOfWeakSubscriptions subscriptions_;
520  SubscriptionsIterable shared_subscriptions_;
521 
522  SequenceOfWeakGuardConditions guard_conditions_;
523  GuardConditionsIterable shared_guard_conditions_;
524 
525  SequenceOfWeakTimers timers_;
526  TimersIterable shared_timers_;
527 
528  SequenceOfWeakClients clients_;
529  ClientsIterable shared_clients_;
530 
531  SequenceOfWeakServices services_;
532  ServicesIterable shared_services_;
533 
534  SequenceOfWeakWaitables waitables_;
535  WaitablesIterable shared_waitables_;
536 };
537 
538 } // namespace wait_set_policies
539 } // namespace rclcpp
540 
541 #endif // RCLCPP__WAIT_SET_POLICIES__DYNAMIC_STORAGE_HPP_
Options used to determine what parts of a subscription get added to or removed from a wait set.
SubscriptionEntry(std::shared_ptr< rclcpp::SubscriptionBase > subscription_in=nullptr, const rclcpp::SubscriptionWaitSetMask &mask_in={})
Conversion constructor, which is intentionally not marked explicit.
WaitableEntry(std::shared_ptr< rclcpp::Waitable > waitable_in=nullptr, std::shared_ptr< void > associated_entity_in=nullptr) noexcept
Conversion constructor, which is intentionally not marked explicit.
WaitSet policy that provides dynamically sized storage.
Common structure for storage policies, which provides rcl wait set access.
void storage_rebuild_rcl_wait_set_with_sets(const SubscriptionsIterable &subscriptions, const GuardConditionsIterable &guard_conditions, const ExtraGuardConditionsIterable &extra_guard_conditions, const TimersIterable &timers, const ClientsIterable &clients, const ServicesIterable &services, const WaitablesIterable &waitables)
Rebuild the wait set, preparing it for the next wait call.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.