15 #ifndef RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
16 #define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
23 #include "rclcpp/allocator/allocator_common.hpp"
24 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
25 #include "rclcpp/memory_strategy.hpp"
26 #include "rclcpp/node.hpp"
27 #include "rclcpp/visibility_control.hpp"
29 #include "rcutils/logging_macros.h"
31 #include "rmw/types.h"
35 namespace memory_strategies
37 namespace allocator_memory_strategy
46 template<
typename Alloc = std::allocator<
void>>
52 using VoidAllocTraits =
typename allocator::AllocRebind<void *, Alloc>;
53 using VoidAlloc =
typename VoidAllocTraits::allocator_type;
57 allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
62 allocator_ = std::make_shared<VoidAlloc>();
67 for (
const auto & existing_guard_condition : guard_conditions_) {
68 if (existing_guard_condition == &guard_condition) {
72 guard_conditions_.push_back(&guard_condition);
77 for (
auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
78 if (*it == guard_condition) {
79 guard_conditions_.erase(it);
85 void clear_handles()
override
87 subscription_handles_.clear();
88 service_handles_.clear();
89 client_handles_.clear();
90 timer_handles_.clear();
91 waitable_handles_.clear();
102 for (
size_t i = 0; i < subscription_handles_.size(); ++i) {
104 subscription_handles_[i].reset();
107 for (
size_t i = 0; i < service_handles_.size(); ++i) {
109 service_handles_[i].reset();
112 for (
size_t i = 0; i < client_handles_.size(); ++i) {
114 client_handles_[i].reset();
117 for (
size_t i = 0; i < timer_handles_.size(); ++i) {
118 if (!wait_set->
timers[i]) {
119 timer_handles_[i].reset();
122 for (
size_t i = 0; i < waitable_handles_.size(); ++i) {
123 if (!waitable_handles_[i]->is_ready(*wait_set)) {
124 waitable_handles_[i].reset();
128 subscription_handles_.erase(
129 std::remove(subscription_handles_.begin(), subscription_handles_.end(),
nullptr),
130 subscription_handles_.end()
133 service_handles_.erase(
134 std::remove(service_handles_.begin(), service_handles_.end(),
nullptr),
135 service_handles_.end()
138 client_handles_.erase(
139 std::remove(client_handles_.begin(), client_handles_.end(),
nullptr),
140 client_handles_.end()
143 timer_handles_.erase(
144 std::remove(timer_handles_.begin(), timer_handles_.end(),
nullptr),
148 waitable_handles_.erase(
149 std::remove(waitable_handles_.begin(), waitable_handles_.end(),
nullptr),
150 waitable_handles_.end()
154 bool collect_entities(
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
156 bool has_invalid_weak_groups_or_nodes =
false;
157 for (
const auto & pair : weak_groups_to_nodes) {
158 auto group = pair.first.lock();
159 auto node = pair.second.lock();
160 if (group ==
nullptr || node ==
nullptr) {
161 has_invalid_weak_groups_or_nodes =
true;
164 if (!group || !group->can_be_taken_from().load()) {
168 group->collect_all_ptrs(
169 [
this](
const rclcpp::SubscriptionBase::SharedPtr & subscription) {
170 subscription_handles_.push_back(subscription->get_subscription_handle());
172 [
this](
const rclcpp::ServiceBase::SharedPtr & service) {
173 service_handles_.push_back(service->get_service_handle());
175 [
this](
const rclcpp::ClientBase::SharedPtr & client) {
176 client_handles_.push_back(client->get_client_handle());
178 [
this](
const rclcpp::TimerBase::SharedPtr & timer) {
179 timer_handles_.push_back(timer->get_timer_handle());
181 [
this](
const rclcpp::Waitable::SharedPtr & waitable) {
182 waitable_handles_.push_back(waitable);
186 return has_invalid_weak_groups_or_nodes;
189 void add_waitable_handle(
const rclcpp::Waitable::SharedPtr & waitable)
override
191 if (
nullptr == waitable) {
192 throw std::runtime_error(
"waitable object unexpectedly nullptr");
194 waitable_handles_.push_back(waitable);
199 for (
const std::shared_ptr<const rcl_subscription_t> & subscription : subscription_handles_) {
201 RCUTILS_LOG_ERROR_NAMED(
203 "Couldn't add subscription to wait set: %s", rcl_get_error_string().str);
208 for (
const std::shared_ptr<const rcl_client_t> & client : client_handles_) {
210 RCUTILS_LOG_ERROR_NAMED(
212 "Couldn't add client to wait set: %s", rcl_get_error_string().str);
217 for (
const std::shared_ptr<const rcl_service_t> & service : service_handles_) {
219 RCUTILS_LOG_ERROR_NAMED(
221 "Couldn't add service to wait set: %s", rcl_get_error_string().str);
226 for (
const std::shared_ptr<const rcl_timer_t> & timer : timer_handles_) {
228 RCUTILS_LOG_ERROR_NAMED(
230 "Couldn't add timer to wait set: %s", rcl_get_error_string().str);
235 for (
auto guard_condition : guard_conditions_) {
236 detail::add_guard_condition_to_rcl_wait_set(*wait_set, *guard_condition);
239 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
240 waitable->add_to_wait_set(*wait_set);
246 get_next_subscription(
248 const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
250 auto it = subscription_handles_.begin();
251 while (it != subscription_handles_.end()) {
252 auto subscription = get_subscription_by_handle(*it, weak_groups_to_nodes);
255 auto group = get_group_by_subscription(subscription, weak_groups_to_nodes);
259 it = subscription_handles_.erase(it);
262 if (!group->can_be_taken_from().load()) {
269 any_exec.subscription = subscription;
270 any_exec.callback_group = group;
271 any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
272 subscription_handles_.erase(it);
276 it = subscription_handles_.erase(it);
283 const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
285 auto it = service_handles_.begin();
286 while (it != service_handles_.end()) {
287 auto service = get_service_by_handle(*it, weak_groups_to_nodes);
290 auto group = get_group_by_service(service, weak_groups_to_nodes);
294 it = service_handles_.erase(it);
297 if (!group->can_be_taken_from().load()) {
304 any_exec.service = service;
305 any_exec.callback_group = group;
306 any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
307 service_handles_.erase(it);
311 it = service_handles_.erase(it);
318 const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
320 auto it = client_handles_.begin();
321 while (it != client_handles_.end()) {
322 auto client = get_client_by_handle(*it, weak_groups_to_nodes);
325 auto group = get_group_by_client(client, weak_groups_to_nodes);
329 it = client_handles_.erase(it);
332 if (!group->can_be_taken_from().load()) {
339 any_exec.client = client;
340 any_exec.callback_group = group;
341 any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
342 client_handles_.erase(it);
346 it = client_handles_.erase(it);
353 const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
355 auto it = timer_handles_.begin();
356 while (it != timer_handles_.end()) {
357 auto timer = get_timer_by_handle(*it, weak_groups_to_nodes);
360 auto group = get_group_by_timer(timer, weak_groups_to_nodes);
364 it = timer_handles_.erase(it);
367 if (!group->can_be_taken_from().load()) {
373 auto data = timer->call();
380 any_exec.timer = timer;
381 any_exec.callback_group = group;
382 any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
383 any_exec.data = data;
384 timer_handles_.erase(it);
388 it = timer_handles_.erase(it);
395 const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
override
397 auto it = waitable_handles_.begin();
398 while (it != waitable_handles_.end()) {
399 std::shared_ptr<Waitable> & waitable = *it;
402 auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
406 it = waitable_handles_.erase(it);
409 if (!group->can_be_taken_from().load()) {
416 any_exec.waitable = waitable;
417 any_exec.callback_group = group;
418 any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
419 waitable_handles_.erase(it);
423 it = waitable_handles_.erase(it);
429 return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(*allocator_.get());
432 size_t number_of_ready_subscriptions()
const override
434 size_t number_of_subscriptions = subscription_handles_.size();
435 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
436 number_of_subscriptions += waitable->get_number_of_ready_subscriptions();
438 return number_of_subscriptions;
441 size_t number_of_ready_services()
const override
443 size_t number_of_services = service_handles_.size();
444 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
445 number_of_services += waitable->get_number_of_ready_services();
447 return number_of_services;
450 size_t number_of_ready_events()
const override
452 size_t number_of_events = 0;
453 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
454 number_of_events += waitable->get_number_of_ready_events();
456 return number_of_events;
459 size_t number_of_ready_clients()
const override
461 size_t number_of_clients = client_handles_.size();
462 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
463 number_of_clients += waitable->get_number_of_ready_clients();
465 return number_of_clients;
468 size_t number_of_guard_conditions()
const override
470 size_t number_of_guard_conditions = guard_conditions_.size();
471 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
472 number_of_guard_conditions += waitable->get_number_of_ready_guard_conditions();
474 return number_of_guard_conditions;
477 size_t number_of_ready_timers()
const override
479 size_t number_of_timers = timer_handles_.size();
480 for (
const std::shared_ptr<Waitable> & waitable : waitable_handles_) {
481 number_of_timers += waitable->get_number_of_ready_timers();
483 return number_of_timers;
486 size_t number_of_waitables()
const override
488 return waitable_handles_.size();
494 std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;
496 VectorRebind<const rclcpp::GuardCondition *> guard_conditions_;
498 VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
499 VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
500 VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
501 VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
502 VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;
504 std::shared_ptr<VoidAlloc> allocator_;
rcutils_allocator_t rcl_allocator_t
Encapsulation of an allocator.
A condition that can be waited on in a single wait set and asynchronously triggered.
Delegate for handling memory allocations while the Executor is executing.
Delegate for handling memory allocations while the Executor is executing.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Container for subscription's, guard condition's, etc to be waited on.
const rcl_timer_t ** timers
Storage for timer pointers.
const rcl_service_t ** services
Storage for service pointers.
const rcl_client_t ** clients
Storage for client pointers.
const rcl_subscription_t ** subscriptions
Storage for subscription pointers.
#define RCL_RET_OK
Success return code.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_subscription(rcl_wait_set_t *wait_set, const rcl_subscription_t *subscription, size_t *index)
Store a pointer to the given subscription in the next empty spot in the set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_service(rcl_wait_set_t *wait_set, const rcl_service_t *service, size_t *index)
Store a pointer to the service in the next empty spot in the set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_timer(rcl_wait_set_t *wait_set, const rcl_timer_t *timer, size_t *index)
Store a pointer to the timer in the next empty spot in the set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_client(rcl_wait_set_t *wait_set, const rcl_client_t *client, size_t *index)
Store a pointer to the client in the next empty spot in the set.