ROS 2 rclcpp + rcl - kilted  kilted
ROS 2 C++ Client Library with ROS Client Library
allocator_memory_strategy.hpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
16 #define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
17 
18 #include <memory>
19 #include <vector>
20 
21 #include "rcl/allocator.h"
22 
23 #include "rclcpp/allocator/allocator_common.hpp"
24 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
25 #include "rclcpp/memory_strategy.hpp"
26 #include "rclcpp/node.hpp"
27 #include "rclcpp/visibility_control.hpp"
28 
29 #include "rcutils/logging_macros.h"
30 
31 #include "rmw/types.h"
32 
33 namespace rclcpp
34 {
35 namespace memory_strategies
36 {
37 namespace allocator_memory_strategy
38 {
39 
41 
46 template<typename Alloc = std::allocator<void>>
48 {
49 public:
50  RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy<Alloc>)
51 
52  using VoidAllocTraits = typename allocator::AllocRebind<void *, Alloc>;
53  using VoidAlloc = typename VoidAllocTraits::allocator_type;
54 
55  explicit AllocatorMemoryStrategy(std::shared_ptr<Alloc> allocator)
56  {
57  allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
58  }
59 
61  {
62  allocator_ = std::make_shared<VoidAlloc>();
63  }
64 
65  void add_guard_condition(const rclcpp::GuardCondition & guard_condition) override
66  {
67  for (const auto & existing_guard_condition : guard_conditions_) {
68  if (existing_guard_condition == &guard_condition) {
69  return;
70  }
71  }
72  guard_conditions_.push_back(&guard_condition);
73  }
74 
75  void remove_guard_condition(const rclcpp::GuardCondition * guard_condition) override
76  {
77  for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
78  if (*it == guard_condition) {
79  guard_conditions_.erase(it);
80  break;
81  }
82  }
83  }
84 
85  void clear_handles() override
86  {
87  subscription_handles_.clear();
88  service_handles_.clear();
89  client_handles_.clear();
90  timer_handles_.clear();
91  waitable_handles_.clear();
92  }
93 
94  void remove_null_handles(rcl_wait_set_t * wait_set) override
95  {
96  // TODO(jacobperron): Check if wait set sizes are what we expect them to be?
97  // e.g. wait_set->size_of_clients == client_handles_.size()
98 
99  // Important to use subscription_handles_.size() instead of wait set's size since
100  // there may be more subscriptions in the wait set due to Waitables added to the end.
101  // The same logic applies for other entities.
102  for (size_t i = 0; i < subscription_handles_.size(); ++i) {
103  if (!wait_set->subscriptions[i]) {
104  subscription_handles_[i].reset();
105  }
106  }
107  for (size_t i = 0; i < service_handles_.size(); ++i) {
108  if (!wait_set->services[i]) {
109  service_handles_[i].reset();
110  }
111  }
112  for (size_t i = 0; i < client_handles_.size(); ++i) {
113  if (!wait_set->clients[i]) {
114  client_handles_[i].reset();
115  }
116  }
117  for (size_t i = 0; i < timer_handles_.size(); ++i) {
118  if (!wait_set->timers[i]) {
119  timer_handles_[i].reset();
120  }
121  }
122  for (size_t i = 0; i < waitable_handles_.size(); ++i) {
123  if (!waitable_handles_[i]->is_ready(*wait_set)) {
124  waitable_handles_[i].reset();
125  }
126  }
127 
128  subscription_handles_.erase(
129  std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr),
130  subscription_handles_.end()
131  );
132 
133  service_handles_.erase(
134  std::remove(service_handles_.begin(), service_handles_.end(), nullptr),
135  service_handles_.end()
136  );
137 
138  client_handles_.erase(
139  std::remove(client_handles_.begin(), client_handles_.end(), nullptr),
140  client_handles_.end()
141  );
142 
143  timer_handles_.erase(
144  std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr),
145  timer_handles_.end()
146  );
147 
148  waitable_handles_.erase(
149  std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr),
150  waitable_handles_.end()
151  );
152  }
153 
154  bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
155  {
156  bool has_invalid_weak_groups_or_nodes = false;
157  for (const auto & pair : weak_groups_to_nodes) {
158  auto group = pair.first.lock();
159  auto node = pair.second.lock();
160  if (group == nullptr || node == nullptr) {
161  has_invalid_weak_groups_or_nodes = true;
162  continue;
163  }
164  if (!group || !group->can_be_taken_from().load()) {
165  continue;
166  }
167 
168  group->collect_all_ptrs(
169  [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
170  subscription_handles_.push_back(subscription->get_subscription_handle());
171  },
172  [this](const rclcpp::ServiceBase::SharedPtr & service) {
173  service_handles_.push_back(service->get_service_handle());
174  },
175  [this](const rclcpp::ClientBase::SharedPtr & client) {
176  client_handles_.push_back(client->get_client_handle());
177  },
178  [this](const rclcpp::TimerBase::SharedPtr & timer) {
179  timer_handles_.push_back(timer->get_timer_handle());
180  },
181  [this](const rclcpp::Waitable::SharedPtr & waitable) {
182  waitable_handles_.push_back(waitable);
183  });
184  }
185 
186  return has_invalid_weak_groups_or_nodes;
187  }
188 
189  void add_waitable_handle(const rclcpp::Waitable::SharedPtr & waitable) override
190  {
191  if (nullptr == waitable) {
192  throw std::runtime_error("waitable object unexpectedly nullptr");
193  }
194  waitable_handles_.push_back(waitable);
195  }
196 
197  bool add_handles_to_wait_set(rcl_wait_set_t * wait_set) override
198  {
199  for (const std::shared_ptr<const rcl_subscription_t> & subscription : subscription_handles_) {
200  if (rcl_wait_set_add_subscription(wait_set, subscription.get(), NULL) != RCL_RET_OK) {
201  RCUTILS_LOG_ERROR_NAMED(
202  "rclcpp",
203  "Couldn't add subscription to wait set: %s", rcl_get_error_string().str);
204  rcl_reset_error();
205  return false;
206  }
207  }
208 
209  for (const std::shared_ptr<const rcl_client_t> & client : client_handles_) {
210  if (rcl_wait_set_add_client(wait_set, client.get(), NULL) != RCL_RET_OK) {
211  RCUTILS_LOG_ERROR_NAMED(
212  "rclcpp",
213  "Couldn't add client to wait set: %s", rcl_get_error_string().str);
214  rcl_reset_error();
215  return false;
216  }
217  }
218 
219  for (const std::shared_ptr<const rcl_service_t> & service : service_handles_) {
220  if (rcl_wait_set_add_service(wait_set, service.get(), NULL) != RCL_RET_OK) {
221  RCUTILS_LOG_ERROR_NAMED(
222  "rclcpp",
223  "Couldn't add service to wait set: %s", rcl_get_error_string().str);
224  rcl_reset_error();
225  return false;
226  }
227  }
228 
229  for (const std::shared_ptr<const rcl_timer_t> & timer : timer_handles_) {
230  if (rcl_wait_set_add_timer(wait_set, timer.get(), NULL) != RCL_RET_OK) {
231  RCUTILS_LOG_ERROR_NAMED(
232  "rclcpp",
233  "Couldn't add timer to wait set: %s", rcl_get_error_string().str);
234  rcl_reset_error();
235  return false;
236  }
237  }
238 
239  for (auto guard_condition : guard_conditions_) {
240  detail::add_guard_condition_to_rcl_wait_set(*wait_set, *guard_condition);
241  }
242 
243  for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
244  waitable->add_to_wait_set(*wait_set);
245  }
246  return true;
247  }
248 
249  void
250  get_next_subscription(
251  rclcpp::AnyExecutable & any_exec,
252  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
253  {
254  auto it = subscription_handles_.begin();
255  while (it != subscription_handles_.end()) {
256  auto subscription = get_subscription_by_handle(*it, weak_groups_to_nodes);
257  if (subscription) {
258  // Find the group for this handle and see if it can be serviced
259  auto group = get_group_by_subscription(subscription, weak_groups_to_nodes);
260  if (!group) {
261  // Group was not found, meaning the subscription is not valid...
262  // Remove it from the ready list and continue looking
263  it = subscription_handles_.erase(it);
264  continue;
265  }
266  if (!group->can_be_taken_from().load()) {
267  // Group is mutually exclusive and is being used, so skip it for now
268  // Leave it to be checked next time, but continue searching
269  ++it;
270  continue;
271  }
272  // Otherwise it is safe to set and return the any_exec
273  any_exec.subscription = subscription;
274  any_exec.callback_group = group;
275  any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
276  subscription_handles_.erase(it);
277  return;
278  }
279  // Else, the subscription is no longer valid, remove it and continue
280  it = subscription_handles_.erase(it);
281  }
282  }
283 
284  void
285  get_next_service(
286  rclcpp::AnyExecutable & any_exec,
287  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
288  {
289  auto it = service_handles_.begin();
290  while (it != service_handles_.end()) {
291  auto service = get_service_by_handle(*it, weak_groups_to_nodes);
292  if (service) {
293  // Find the group for this handle and see if it can be serviced
294  auto group = get_group_by_service(service, weak_groups_to_nodes);
295  if (!group) {
296  // Group was not found, meaning the service is not valid...
297  // Remove it from the ready list and continue looking
298  it = service_handles_.erase(it);
299  continue;
300  }
301  if (!group->can_be_taken_from().load()) {
302  // Group is mutually exclusive and is being used, so skip it for now
303  // Leave it to be checked next time, but continue searching
304  ++it;
305  continue;
306  }
307  // Otherwise it is safe to set and return the any_exec
308  any_exec.service = service;
309  any_exec.callback_group = group;
310  any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
311  service_handles_.erase(it);
312  return;
313  }
314  // Else, the service is no longer valid, remove it and continue
315  it = service_handles_.erase(it);
316  }
317  }
318 
319  void
320  get_next_client(
321  rclcpp::AnyExecutable & any_exec,
322  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
323  {
324  auto it = client_handles_.begin();
325  while (it != client_handles_.end()) {
326  auto client = get_client_by_handle(*it, weak_groups_to_nodes);
327  if (client) {
328  // Find the group for this handle and see if it can be serviced
329  auto group = get_group_by_client(client, weak_groups_to_nodes);
330  if (!group) {
331  // Group was not found, meaning the service is not valid...
332  // Remove it from the ready list and continue looking
333  it = client_handles_.erase(it);
334  continue;
335  }
336  if (!group->can_be_taken_from().load()) {
337  // Group is mutually exclusive and is being used, so skip it for now
338  // Leave it to be checked next time, but continue searching
339  ++it;
340  continue;
341  }
342  // Otherwise it is safe to set and return the any_exec
343  any_exec.client = client;
344  any_exec.callback_group = group;
345  any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
346  client_handles_.erase(it);
347  return;
348  }
349  // Else, the service is no longer valid, remove it and continue
350  it = client_handles_.erase(it);
351  }
352  }
353 
354  void
355  get_next_timer(
356  rclcpp::AnyExecutable & any_exec,
357  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
358  {
359  auto it = timer_handles_.begin();
360  while (it != timer_handles_.end()) {
361  auto timer = get_timer_by_handle(*it, weak_groups_to_nodes);
362  if (timer) {
363  // Find the group for this handle and see if it can be serviced
364  auto group = get_group_by_timer(timer, weak_groups_to_nodes);
365  if (!group) {
366  // Group was not found, meaning the timer is not valid...
367  // Remove it from the ready list and continue looking
368  it = timer_handles_.erase(it);
369  continue;
370  }
371  if (!group->can_be_taken_from().load()) {
372  // Group is mutually exclusive and is being used, so skip it for now
373  // Leave it to be checked next time, but continue searching
374  ++it;
375  continue;
376  }
377  auto data = timer->call();
378  if (!data) {
379  // timer was cancelled, skip it.
380  ++it;
381  continue;
382  }
383  // Otherwise it is safe to set and return the any_exec
384  any_exec.timer = timer;
385  any_exec.callback_group = group;
386  any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
387  any_exec.data = data;
388  timer_handles_.erase(it);
389  return;
390  }
391  // Else, the timer is no longer valid, remove it and continue
392  it = timer_handles_.erase(it);
393  }
394  }
395 
396  void
397  get_next_waitable(
398  rclcpp::AnyExecutable & any_exec,
399  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
400  {
401  auto it = waitable_handles_.begin();
402  while (it != waitable_handles_.end()) {
403  std::shared_ptr<Waitable> & waitable = *it;
404  if (waitable) {
405  // Find the group for this handle and see if it can be serviced
406  auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
407  if (!group) {
408  // Group was not found, meaning the waitable is not valid...
409  // Remove it from the ready list and continue looking
410  it = waitable_handles_.erase(it);
411  continue;
412  }
413  if (!group->can_be_taken_from().load()) {
414  // Group is mutually exclusive and is being used, so skip it for now
415  // Leave it to be checked next time, but continue searching
416  ++it;
417  continue;
418  }
419  // Otherwise it is safe to set and return the any_exec
420  any_exec.waitable = waitable;
421  any_exec.callback_group = group;
422  any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
423  waitable_handles_.erase(it);
424  return;
425  }
426  // Else, the waitable is no longer valid, remove it and continue
427  it = waitable_handles_.erase(it);
428  }
429  }
430 
431  rcl_allocator_t get_allocator() override
432  {
433  return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(*allocator_.get());
434  }
435 
436  size_t number_of_ready_subscriptions() const override
437  {
438  size_t number_of_subscriptions = subscription_handles_.size();
439  for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
440  number_of_subscriptions += waitable->get_number_of_ready_subscriptions();
441  }
442  return number_of_subscriptions;
443  }
444 
445  size_t number_of_ready_services() const override
446  {
447  size_t number_of_services = service_handles_.size();
448  for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
449  number_of_services += waitable->get_number_of_ready_services();
450  }
451  return number_of_services;
452  }
453 
454  size_t number_of_ready_events() const override
455  {
456  size_t number_of_events = 0;
457  for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
458  number_of_events += waitable->get_number_of_ready_events();
459  }
460  return number_of_events;
461  }
462 
463  size_t number_of_ready_clients() const override
464  {
465  size_t number_of_clients = client_handles_.size();
466  for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
467  number_of_clients += waitable->get_number_of_ready_clients();
468  }
469  return number_of_clients;
470  }
471 
472  size_t number_of_guard_conditions() const override
473  {
474  size_t number_of_guard_conditions = guard_conditions_.size();
475  for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
476  number_of_guard_conditions += waitable->get_number_of_ready_guard_conditions();
477  }
478  return number_of_guard_conditions;
479  }
480 
481  size_t number_of_ready_timers() const override
482  {
483  size_t number_of_timers = timer_handles_.size();
484  for (const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
485  number_of_timers += waitable->get_number_of_ready_timers();
486  }
487  return number_of_timers;
488  }
489 
490  size_t number_of_waitables() const override
491  {
492  return waitable_handles_.size();
493  }
494 
495 private:
496  template<typename T>
497  using VectorRebind =
498  std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;
499 
500  VectorRebind<const rclcpp::GuardCondition *> guard_conditions_;
501 
502  VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
503  VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
504  VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
505  VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
506  VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;
507 
508  std::shared_ptr<VoidAlloc> allocator_;
509 };
510 
511 } // namespace allocator_memory_strategy
512 } // namespace memory_strategies
513 } // namespace rclcpp
514 
515 #endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
rcutils_allocator_t rcl_allocator_t
Encapsulation of an allocator.
Definition: allocator.h:31
A condition that can be waited on in a single wait set and asynchronously triggered.
Delegate for handling memory allocations while the Executor is executing.
Delegate for handling memory allocations while the Executor is executing.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42
const rcl_timer_t ** timers
Storage for timer pointers.
Definition: wait.h:52
const rcl_service_t ** services
Storage for service pointers.
Definition: wait.h:60
const rcl_client_t ** clients
Storage for client pointers.
Definition: wait.h:56
const rcl_subscription_t ** subscriptions
Storage for subscription pointers.
Definition: wait.h:44
#define RCL_RET_OK
Success return code.
Definition: types.h:27
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_subscription(rcl_wait_set_t *wait_set, const rcl_subscription_t *subscription, size_t *index)
Store a pointer to the given subscription in the next empty spot in the set.
Definition: wait.c:305
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_service(rcl_wait_set_t *wait_set, const rcl_service_t *service, size_t *index)
Store a pointer to the service in the next empty spot in the set.
Definition: wait.c:486
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_timer(rcl_wait_set_t *wait_set, const rcl_timer_t *timer, size_t *index)
Store a pointer to the timer in the next empty spot in the set.
Definition: wait.c:455
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_client(rcl_wait_set_t *wait_set, const rcl_client_t *client, size_t *index)
Store a pointer to the client in the next empty spot in the set.
Definition: wait.c:475