15 #include "rclcpp/executors/executor_entities_collection.hpp"
44 auto remove_entities = [](
auto & collection) ->
size_t {
46 for (
auto it = collection.begin(); it != collection.end(); ) {
47 if (it->second.entity.expired()) {
49 it = collection.erase(it);
67 build_entities_collection(
68 const std::vector<rclcpp::CallbackGroup::WeakPtr> & callback_groups,
73 for (
auto weak_group_ptr : callback_groups) {
74 auto group_ptr = weak_group_ptr.lock();
79 if (group_ptr->can_be_taken_from().load()) {
80 group_ptr->collect_all_ptrs(
81 [&collection, weak_group_ptr](
const rclcpp::SubscriptionBase::SharedPtr & subscription) {
84 subscription->get_subscription_handle().get(),
85 {subscription, weak_group_ptr}
88 [&collection, weak_group_ptr](
const rclcpp::ServiceBase::SharedPtr & service) {
91 service->get_service_handle().get(),
92 {service, weak_group_ptr}
95 [&collection, weak_group_ptr](
const rclcpp::ClientBase::SharedPtr & client) {
98 client->get_client_handle().get(),
99 {client, weak_group_ptr}
102 [&collection, weak_group_ptr](
const rclcpp::TimerBase::SharedPtr & timer) {
105 timer->get_timer_handle().get(),
106 {timer, weak_group_ptr}
109 [&collection, weak_group_ptr](
const rclcpp::Waitable::SharedPtr & waitable) {
113 {waitable, weak_group_ptr}
123 const ExecutorEntitiesCollection & collection,
125 std::deque<rclcpp::AnyExecutable> & executables
129 if (wait_result.
kind() != rclcpp::WaitResultKind::Ready) {
132 auto rcl_wait_set = wait_result.
get_wait_set().get_rcl_wait_set();
135 std::map<rclcpp::CallbackGroup::WeakPtr,
136 rclcpp::CallbackGroup::SharedPtr,
137 std::owner_less<rclcpp::CallbackGroup::WeakPtr>> group_map;
139 auto group_cache = [&group_map](
const rclcpp::CallbackGroup::WeakPtr & weak_cbg_ptr)
141 if (group_map.count(weak_cbg_ptr) == 0) {
142 group_map.insert({weak_cbg_ptr, weak_cbg_ptr.lock()});
144 return group_map.find(weak_cbg_ptr)->second;
147 for (
size_t ii = 0; ii < rcl_wait_set.size_of_timers; ++ii) {
148 if (
nullptr == rcl_wait_set.timers[ii]) {
continue;}
149 auto entity_iter = collection.timers.find(rcl_wait_set.timers[ii]);
150 if (entity_iter != collection.timers.end()) {
151 auto entity = entity_iter->second.entity.lock();
155 auto group_info = group_cache(entity_iter->second.callback_group);
156 if (!group_info || !group_info->can_be_taken_from().load()) {
159 if (!entity->call()) {
164 exec.callback_group = group_info;
165 executables.push_back(exec);
170 for (
size_t ii = 0; ii < rcl_wait_set.size_of_subscriptions; ++ii) {
171 if (
nullptr == rcl_wait_set.subscriptions[ii]) {
continue;}
172 auto entity_iter = collection.subscriptions.find(rcl_wait_set.subscriptions[ii]);
173 if (entity_iter != collection.subscriptions.end()) {
174 auto entity = entity_iter->second.entity.lock();
178 auto group_info = group_cache(entity_iter->second.callback_group);
179 if (!group_info || !group_info->can_be_taken_from().load()) {
183 exec.subscription = entity;
184 exec.callback_group = group_info;
185 executables.push_back(exec);
190 for (
size_t ii = 0; ii < rcl_wait_set.size_of_services; ++ii) {
191 if (
nullptr == rcl_wait_set.services[ii]) {
continue;}
192 auto entity_iter = collection.services.find(rcl_wait_set.services[ii]);
193 if (entity_iter != collection.services.end()) {
194 auto entity = entity_iter->second.entity.lock();
198 auto group_info = group_cache(entity_iter->second.callback_group);
199 if (!group_info || !group_info->can_be_taken_from().load()) {
203 exec.service = entity;
204 exec.callback_group = group_info;
205 executables.push_back(exec);
210 for (
size_t ii = 0; ii < rcl_wait_set.size_of_clients; ++ii) {
211 if (
nullptr == rcl_wait_set.clients[ii]) {
continue;}
212 auto entity_iter = collection.clients.find(rcl_wait_set.clients[ii]);
213 if (entity_iter != collection.clients.end()) {
214 auto entity = entity_iter->second.entity.lock();
218 auto group_info = group_cache(entity_iter->second.callback_group);
219 if (!group_info || !group_info->can_be_taken_from().load()) {
223 exec.client = entity;
224 exec.callback_group = group_info;
225 executables.push_back(exec);
230 for (
const auto & [handle, entry] : collection.waitables) {
231 auto waitable = entry.entity.lock();
235 if (!waitable->is_ready(rcl_wait_set)) {
238 auto group_info = group_cache(entry.callback_group);
239 if (!group_info || !group_info->can_be_taken_from().load()) {
243 exec.waitable = waitable;
244 exec.callback_group = group_info;
245 executables.push_back(exec);
Interface for introspecting a wait set after waiting on it.
const WaitSetT & get_wait_set() const
Return the rcl wait set.
WaitResultKind kind() const
Return the kind of the WaitResult.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
size_t remove_expired_entities()
Remove entities that have expired weak ownership.
GuardConditionCollection guard_conditions
Collection of guard conditions currently in use by the executor.
void clear()
Clear the entities collection.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
bool empty() const
Check if the entities collection is empty.
ClientCollection clients
Collection of clients currently in use by the executor.