ROS 2 rclcpp + rcl - kilted  kilted
ROS 2 C++ Client Library with ROS Client Library
executor_entities_collection.cpp
1 // Copyright 2023 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/executors/executor_entities_collection.hpp"
16 
17 namespace rclcpp
18 {
19 namespace executors
20 {
22 {
23  return
24  subscriptions.empty() &&
25  timers.empty() &&
26  guard_conditions.empty() &&
27  clients.empty() &&
28  services.empty() &&
29  waitables.empty();
30 }
31 
33 {
34  subscriptions.clear();
35  timers.clear();
36  guard_conditions.clear();
37  clients.clear();
38  services.clear();
39  waitables.clear();
40 }
41 
43 {
44  auto remove_entities = [](auto & collection) -> size_t {
45  size_t removed = 0;
46  for (auto it = collection.begin(); it != collection.end(); ) {
47  if (it->second.entity.expired()) {
48  ++removed;
49  it = collection.erase(it);
50  } else {
51  ++it;
52  }
53  }
54  return removed;
55  };
56 
57  return
58  remove_entities(subscriptions) +
59  remove_entities(timers) +
60  remove_entities(guard_conditions) +
61  remove_entities(clients) +
62  remove_entities(services) +
63  remove_entities(waitables);
64 }
65 
66 void
67 build_entities_collection(
68  const std::vector<rclcpp::CallbackGroup::WeakPtr> & callback_groups,
69  ExecutorEntitiesCollection & collection)
70 {
71  collection.clear();
72 
73  for (auto weak_group_ptr : callback_groups) {
74  auto group_ptr = weak_group_ptr.lock();
75  if (!group_ptr) {
76  continue;
77  }
78 
79  if (group_ptr->can_be_taken_from().load()) {
80  group_ptr->collect_all_ptrs(
81  [&collection, weak_group_ptr](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
82  collection.subscriptions.insert(
83  {
84  subscription->get_subscription_handle().get(),
85  {subscription, weak_group_ptr}
86  });
87  },
88  [&collection, weak_group_ptr](const rclcpp::ServiceBase::SharedPtr & service) {
89  collection.services.insert(
90  {
91  service->get_service_handle().get(),
92  {service, weak_group_ptr}
93  });
94  },
95  [&collection, weak_group_ptr](const rclcpp::ClientBase::SharedPtr & client) {
96  collection.clients.insert(
97  {
98  client->get_client_handle().get(),
99  {client, weak_group_ptr}
100  });
101  },
102  [&collection, weak_group_ptr](const rclcpp::TimerBase::SharedPtr & timer) {
103  collection.timers.insert(
104  {
105  timer->get_timer_handle().get(),
106  {timer, weak_group_ptr}
107  });
108  },
109  [&collection, weak_group_ptr](const rclcpp::Waitable::SharedPtr & waitable) {
110  collection.waitables.insert(
111  {
112  waitable.get(),
113  {waitable, weak_group_ptr}
114  });
115  }
116  );
117  }
118  }
119 }
120 
121 size_t
122 ready_executables(
123  const ExecutorEntitiesCollection & collection,
125  std::deque<rclcpp::AnyExecutable> & executables
126 )
127 {
128  size_t added = 0;
129  if (wait_result.kind() != rclcpp::WaitResultKind::Ready) {
130  return added;
131  }
132  auto rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set();
133 
134  // Cache shared pointers to groups to avoid extra work re-locking them
135  std::map<rclcpp::CallbackGroup::WeakPtr,
136  rclcpp::CallbackGroup::SharedPtr,
137  std::owner_less<rclcpp::CallbackGroup::WeakPtr>> group_map;
138 
139  auto group_cache = [&group_map](const rclcpp::CallbackGroup::WeakPtr & weak_cbg_ptr)
140  {
141  if (group_map.count(weak_cbg_ptr) == 0) {
142  group_map.insert({weak_cbg_ptr, weak_cbg_ptr.lock()});
143  }
144  return group_map.find(weak_cbg_ptr)->second;
145  };
146 
147  for (size_t ii = 0; ii < rcl_wait_set.size_of_timers; ++ii) {
148  if (nullptr == rcl_wait_set.timers[ii]) {continue;}
149  auto entity_iter = collection.timers.find(rcl_wait_set.timers[ii]);
150  if (entity_iter != collection.timers.end()) {
151  auto entity = entity_iter->second.entity.lock();
152  if (!entity) {
153  continue;
154  }
155  auto group_info = group_cache(entity_iter->second.callback_group);
156  if (!group_info || !group_info->can_be_taken_from().load()) {
157  continue;
158  }
159  if (!entity->call()) {
160  continue;
161  }
163  exec.timer = entity;
164  exec.callback_group = group_info;
165  executables.push_back(exec);
166  added++;
167  }
168  }
169 
170  for (size_t ii = 0; ii < rcl_wait_set.size_of_subscriptions; ++ii) {
171  if (nullptr == rcl_wait_set.subscriptions[ii]) {continue;}
172  auto entity_iter = collection.subscriptions.find(rcl_wait_set.subscriptions[ii]);
173  if (entity_iter != collection.subscriptions.end()) {
174  auto entity = entity_iter->second.entity.lock();
175  if (!entity) {
176  continue;
177  }
178  auto group_info = group_cache(entity_iter->second.callback_group);
179  if (!group_info || !group_info->can_be_taken_from().load()) {
180  continue;
181  }
183  exec.subscription = entity;
184  exec.callback_group = group_info;
185  executables.push_back(exec);
186  added++;
187  }
188  }
189 
190  for (size_t ii = 0; ii < rcl_wait_set.size_of_services; ++ii) {
191  if (nullptr == rcl_wait_set.services[ii]) {continue;}
192  auto entity_iter = collection.services.find(rcl_wait_set.services[ii]);
193  if (entity_iter != collection.services.end()) {
194  auto entity = entity_iter->second.entity.lock();
195  if (!entity) {
196  continue;
197  }
198  auto group_info = group_cache(entity_iter->second.callback_group);
199  if (!group_info || !group_info->can_be_taken_from().load()) {
200  continue;
201  }
203  exec.service = entity;
204  exec.callback_group = group_info;
205  executables.push_back(exec);
206  added++;
207  }
208  }
209 
210  for (size_t ii = 0; ii < rcl_wait_set.size_of_clients; ++ii) {
211  if (nullptr == rcl_wait_set.clients[ii]) {continue;}
212  auto entity_iter = collection.clients.find(rcl_wait_set.clients[ii]);
213  if (entity_iter != collection.clients.end()) {
214  auto entity = entity_iter->second.entity.lock();
215  if (!entity) {
216  continue;
217  }
218  auto group_info = group_cache(entity_iter->second.callback_group);
219  if (!group_info || !group_info->can_be_taken_from().load()) {
220  continue;
221  }
223  exec.client = entity;
224  exec.callback_group = group_info;
225  executables.push_back(exec);
226  added++;
227  }
228  }
229 
230  for (const auto & [handle, entry] : collection.waitables) {
231  auto waitable = entry.entity.lock();
232  if (!waitable) {
233  continue;
234  }
235  if (!waitable->is_ready(rcl_wait_set)) {
236  continue;
237  }
238  auto group_info = group_cache(entry.callback_group);
239  if (!group_info || !group_info->can_be_taken_from().load()) {
240  continue;
241  }
243  exec.waitable = waitable;
244  exec.callback_group = group_info;
245  executables.push_back(exec);
246  added++;
247  }
248  return added;
249 }
250 } // namespace executors
251 } // namespace rclcpp
Interface for introspecting a wait set after waiting on it.
Definition: wait_result.hpp:63
const WaitSetT & get_wait_set() const
Return the rcl wait set.
WaitResultKind kind() const
Return the kind of the WaitResult.
Definition: wait_result.hpp:96
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
size_t remove_expired_entities()
Remove entities that have expired weak ownership.
GuardConditionCollection guard_conditions
Collection of guard conditions currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
bool empty() const
Check if the entities collection is empty.
ClientCollection clients
Collection of clients currently in use by the executor.