15 #ifndef RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
16 #define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
23 #include "rclcpp/allocator/allocator_common.hpp"
24 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
25 #include "rclcpp/memory_strategy.hpp"
26 #include "rclcpp/node.hpp"
27 #include "rclcpp/visibility_control.hpp"
29 #include "rcutils/logging_macros.h"
31 #include "rmw/types.h"
35 namespace memory_strategies
37 namespace allocator_memory_strategy
46 template<
typename Alloc = std::allocator<
void>>
52 using VoidAllocTraits =
typename allocator::AllocRebind<void *, Alloc>;
53 using VoidAlloc =
typename VoidAllocTraits::allocator_type;
57 allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
62 allocator_ = std::make_shared<VoidAlloc>();
67 for (
const auto & existing_guard_condition : guard_conditions_) {
68 if (existing_guard_condition == &guard_condition) {
72 guard_conditions_.push_back(&guard_condition);
77 for (
auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
78 if (*it == guard_condition) {
79 guard_conditions_.erase(it);
85 void clear_handles()
override
87 subscription_handles_.clear();
88 service_handles_.clear();
89 client_handles_.clear();
90 timer_handles_.clear();
91 waitable_handles_.clear();
102 for (
size_t i = 0; i < subscription_handles_.size(); ++i) {
104 subscription_handles_[i].reset();
107 for (
size_t i = 0; i < service_handles_.size(); ++i) {
109 service_handles_[i].reset();
112 for (
size_t i = 0; i < client_handles_.size(); ++i) {
114 client_handles_[i].reset();
117 for (
size_t i = 0; i < timer_handles_.size(); ++i) {
118 if (!wait_set->
timers[i]) {
119 timer_handles_[i].reset();
122 for (
size_t i = 0; i < waitable_handles_.size(); ++i) {
123 if (!waitable_handles_[i]->is_ready(*wait_set)) {
124 waitable_handles_[i].reset();
128 subscription_handles_.erase(
129 std::remove(subscription_handles_.begin(), subscription_handles_.end(),
nullptr),
130 subscription_handles_.end()
133 service_handles_.erase(
134 std::remove(service_handles_.begin(), service_handles_.end(),
nullptr),
135 service_handles_.end()
138 client_handles_.erase(
139 std::remove(client_handles_.begin(), client_handles_.end(),
nullptr),
140 client_handles_.end()
143 timer_handles_.erase(
144 std::remove(timer_handles_.begin(), timer_handles_.end(),
nullptr),
148 waitable_handles_.erase(
149 std::remove(waitable_handles_.begin(), waitable_handles_.end(),
nullptr),
150 waitable_handles_.end()
154 bool collect_entities(
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
156 bool has_invalid_weak_groups_or_nodes =
false;
157 for (
const auto & pair : weak_groups_to_nodes) {
158 auto group = pair.first.lock();
159 auto node = pair.second.lock();
160 if (group ==
nullptr || node ==
nullptr) {
161 has_invalid_weak_groups_or_nodes =
true;
164 if (!group || !group->can_be_taken_from().load()) {
168 group->collect_all_ptrs(
169 [
this](
const rclcpp::SubscriptionBase::SharedPtr & subscription) {
170 subscription_handles_.push_back(subscription->get_subscription_handle());
172 [
this](
const rclcpp::ServiceBase::SharedPtr & service) {
173 service_handles_.push_back(service->get_service_handle());
175 [
this](
const rclcpp::ClientBase::SharedPtr & client) {
176 client_handles_.push_back(client->get_client_handle());
178 [
this](
const rclcpp::TimerBase::SharedPtr & timer) {
179 timer_handles_.push_back(timer->get_timer_handle());
181 [
this](
const rclcpp::Waitable::SharedPtr & waitable) {
182 waitable_handles_.push_back(waitable);
186 return has_invalid_weak_groups_or_nodes;
189 void add_waitable_handle(
const rclcpp::Waitable::SharedPtr & waitable)
override
191 if (
nullptr == waitable) {
192 throw std::runtime_error(
"waitable object unexpectedly nullptr");
194 waitable_handles_.push_back(waitable);
199 for (
const std::shared_ptr<const rcl_subscription_t> & subscription : subscription_handles_) {
201 RCUTILS_LOG_ERROR_NAMED(
203 "Couldn't add subscription to wait set: %s", rcl_get_error_string().str);
209 for (
const std::shared_ptr<const rcl_client_t> & client : client_handles_) {
211 RCUTILS_LOG_ERROR_NAMED(
213 "Couldn't add client to wait set: %s", rcl_get_error_string().str);
219 for (
const std::shared_ptr<const rcl_service_t> & service : service_handles_) {
221 RCUTILS_LOG_ERROR_NAMED(
223 "Couldn't add service to wait set: %s", rcl_get_error_string().str);
229 for (
const std::shared_ptr<const rcl_timer_t> & timer : timer_handles_) {
231 RCUTILS_LOG_ERROR_NAMED(
233 "Couldn't add timer to wait set: %s", rcl_get_error_string().str);
239 for (
auto guard_condition : guard_conditions_) {
240 detail::add_guard_condition_to_rcl_wait_set(*wait_set, *guard_condition);
243 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
244 waitable->add_to_wait_set(*wait_set);
250 get_next_subscription(
252 const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
254 auto it = subscription_handles_.begin();
255 while (it != subscription_handles_.end()) {
256 auto subscription = get_subscription_by_handle(*it, weak_groups_to_nodes);
259 auto group = get_group_by_subscription(subscription, weak_groups_to_nodes);
263 it = subscription_handles_.erase(it);
266 if (!group->can_be_taken_from().load()) {
273 any_exec.subscription = subscription;
274 any_exec.callback_group = group;
275 any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
276 subscription_handles_.erase(it);
280 it = subscription_handles_.erase(it);
287 const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
289 auto it = service_handles_.begin();
290 while (it != service_handles_.end()) {
291 auto service = get_service_by_handle(*it, weak_groups_to_nodes);
294 auto group = get_group_by_service(service, weak_groups_to_nodes);
298 it = service_handles_.erase(it);
301 if (!group->can_be_taken_from().load()) {
308 any_exec.service = service;
309 any_exec.callback_group = group;
310 any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
311 service_handles_.erase(it);
315 it = service_handles_.erase(it);
322 const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
324 auto it = client_handles_.begin();
325 while (it != client_handles_.end()) {
326 auto client = get_client_by_handle(*it, weak_groups_to_nodes);
329 auto group = get_group_by_client(client, weak_groups_to_nodes);
333 it = client_handles_.erase(it);
336 if (!group->can_be_taken_from().load()) {
343 any_exec.client = client;
344 any_exec.callback_group = group;
345 any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
346 client_handles_.erase(it);
350 it = client_handles_.erase(it);
357 const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
359 auto it = timer_handles_.begin();
360 while (it != timer_handles_.end()) {
361 auto timer = get_timer_by_handle(*it, weak_groups_to_nodes);
364 auto group = get_group_by_timer(timer, weak_groups_to_nodes);
368 it = timer_handles_.erase(it);
371 if (!group->can_be_taken_from().load()) {
377 auto data = timer->call();
384 any_exec.timer = timer;
385 any_exec.callback_group = group;
386 any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
387 any_exec.data = data;
388 timer_handles_.erase(it);
392 it = timer_handles_.erase(it);
399 const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
401 auto it = waitable_handles_.begin();
402 while (it != waitable_handles_.end()) {
403 std::shared_ptr<Waitable> & waitable = *it;
406 auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
410 it = waitable_handles_.erase(it);
413 if (!group->can_be_taken_from().load()) {
420 any_exec.waitable = waitable;
421 any_exec.callback_group = group;
422 any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
423 waitable_handles_.erase(it);
427 it = waitable_handles_.erase(it);
433 return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(*allocator_.get());
436 size_t number_of_ready_subscriptions()
const override
438 size_t number_of_subscriptions = subscription_handles_.size();
439 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
440 number_of_subscriptions += waitable->get_number_of_ready_subscriptions();
442 return number_of_subscriptions;
445 size_t number_of_ready_services()
const override
447 size_t number_of_services = service_handles_.size();
448 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
449 number_of_services += waitable->get_number_of_ready_services();
451 return number_of_services;
454 size_t number_of_ready_events()
const override
456 size_t number_of_events = 0;
457 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
458 number_of_events += waitable->get_number_of_ready_events();
460 return number_of_events;
463 size_t number_of_ready_clients()
const override
465 size_t number_of_clients = client_handles_.size();
466 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
467 number_of_clients += waitable->get_number_of_ready_clients();
469 return number_of_clients;
472 size_t number_of_guard_conditions()
const override
474 size_t number_of_guard_conditions = guard_conditions_.size();
475 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
476 number_of_guard_conditions += waitable->get_number_of_ready_guard_conditions();
478 return number_of_guard_conditions;
481 size_t number_of_ready_timers()
const override
483 size_t number_of_timers = timer_handles_.size();
484 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
485 number_of_timers += waitable->get_number_of_ready_timers();
487 return number_of_timers;
490 size_t number_of_waitables()
const override
492 return waitable_handles_.size();
498 std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;
500 VectorRebind<const rclcpp::GuardCondition *> guard_conditions_;
502 VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
503 VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
504 VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
505 VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
506 VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;
508 std::shared_ptr<VoidAlloc> allocator_;
rcutils_allocator_t rcl_allocator_t
Encapsulation of an allocator.
A condition that can be waited on in a single wait set and asynchronously triggered.
Delegate for handling memory allocations while the Executor is executing.
Delegate for handling memory allocations while the Executor is executing.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
const rcl_timer_t ** timers
Storage for timer pointers.
const rcl_service_t ** services
Storage for service pointers.
const rcl_client_t ** clients
Storage for client pointers.
const rcl_subscription_t ** subscriptions
Storage for subscription pointers.
#define RCL_RET_OK
Success return code.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_subscription(rcl_wait_set_t *wait_set, const rcl_subscription_t *subscription, size_t *index)
Store a pointer to the given subscription in the next empty spot in the set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_service(rcl_wait_set_t *wait_set, const rcl_service_t *service, size_t *index)
Store a pointer to the service in the next empty spot in the set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_timer(rcl_wait_set_t *wait_set, const rcl_timer_t *timer, size_t *index)
Store a pointer to the timer in the next empty spot in the set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_client(rcl_wait_set_t *wait_set, const rcl_client_t *client, size_t *index)
Store a pointer to the client in the next empty spot in the set.