ROS 2 rclcpp + rcl - rolling  rolling-a919a6e5
ROS 2 C++ Client Library with ROS Client Library
events_executor.cpp
1 // Copyright 2023 iRobot Corporation.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/experimental/executors/events_executor/events_executor.hpp"
16 
17 #include <memory>
18 #include <utility>
19 #include <vector>
20 
21 #include "rcpputils/scope_exit.hpp"
22 
23 using namespace std::chrono_literals;
24 
26 
27 EventsExecutor::EventsExecutor(
28  rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue,
29  bool execute_timers_separate_thread,
30  const rclcpp::ExecutorOptions & options)
31 : rclcpp::Executor(options)
32 {
33  // Get ownership of the queue used to store events.
34  if (!events_queue) {
35  throw std::invalid_argument("events_queue can't be a null pointer");
36  }
37  events_queue_ = std::move(events_queue);
38 
39  // Create timers manager
40  // The timers manager can be used either to only track timers (in this case an expired
41  // timer will generate an executor event and then it will be executed by the executor thread)
42  // or it can also take care of executing expired timers in its dedicated thread.
43  std::function<void(const rclcpp::TimerBase *,
44  const std::shared_ptr<void> &)> timer_on_ready_cb = nullptr;
45  if (!execute_timers_separate_thread) {
46  timer_on_ready_cb =
47  [this](const rclcpp::TimerBase * timer_id, const std::shared_ptr<void> & data) {
48  ExecutorEvent event = {timer_id, data, -1, ExecutorEventType::TIMER_EVENT, 1};
49  this->events_queue_->enqueue(event);
50  };
51  }
52  timers_manager_ =
53  std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);
54 
55  entities_need_rebuild_ = false;
56 
57  this->setup_notify_waitable();
58 
59  // Ensure that the entities collection is empty (the base class may have added elements
60  // that we are not interested in)
61  this->current_collection_.clear();
62 
63  // Make sure that the notify waitable is immediately added to the collection
64  // to avoid missing events
65  this->add_notify_waitable_to_collection(current_collection_.waitables);
66 }
67 
68 void
69 EventsExecutor::setup_notify_waitable()
70 {
71  // The base class already created this object but the events-executor
72  // needs different callbacks.
73  assert(notify_waitable_ && "The notify waitable should have already been constructed");
74 
75  notify_waitable_->set_execute_callback(
76  [this]() {
77  // This callback is invoked when:
78  // - the interrupt or shutdown guard condition is triggered:
79  // ---> we need to wake up the executor so that it can terminate
80  // - a node or callback group guard condition is triggered:
81  // ---> the entities collection is changed, we need to update callbacks
82  this->handle_updated_entities(false);
83  });
84 
85  auto notify_waitable_entity_id = notify_waitable_.get();
86  notify_waitable_->set_on_ready_callback(
87  [this, notify_waitable_entity_id](size_t num_events, int waitable_data) {
88  // The notify waitable has a special callback.
89  // We don't care about how many events as when we wake up the executor we are going to
90  // process everything regardless.
91  // For the same reason, if an event of this type has already been pushed but it has not been
92  // processed yet, we avoid pushing additional events.
93  (void)num_events;
94  if (entities_need_rebuild_.exchange(true)) {
95  return;
96  }
97 
98  ExecutorEvent event =
99  {notify_waitable_entity_id, nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, 1};
100  this->events_queue_->enqueue(event);
101  });
102 }
103 
105 {
106  spinning.store(false);
107  notify_waitable_->clear_on_ready_callback();
108  this->refresh_current_collection({});
109 }
110 
111 void
113 {
114  if (spinning.exchange(true)) {
115  throw std::runtime_error("spin() called while already spinning");
116  }
117  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
118 
119  timers_manager_->start();
120  RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); );
121 
122  while (rclcpp::ok(context_) && spinning.load()) {
123  // Wait until we get an event
124  ExecutorEvent event;
125  bool has_event = events_queue_->dequeue(event);
126  if (has_event) {
127  this->execute_event(event);
128  }
129  }
130 }
131 
132 void
133 EventsExecutor::spin_some(std::chrono::nanoseconds max_duration)
134 {
135  return this->spin_some_impl(max_duration, false);
136 }
137 
138 void
139 EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
140 {
141  if (max_duration <= 0ns) {
142  throw std::invalid_argument("max_duration must be positive");
143  }
144  return this->spin_some_impl(max_duration, true);
145 }
146 
147 void
148 EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
149 {
150  if (spinning.exchange(true)) {
151  throw std::runtime_error("spin_some() called while already spinning");
152  }
153 
154  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
155 
156  auto start = std::chrono::steady_clock::now();
157 
158  auto max_duration_not_elapsed = [max_duration, start]() {
159  if (std::chrono::nanoseconds(0) == max_duration) {
160  // told to spin forever if need be
161  return true;
162  } else if (std::chrono::steady_clock::now() - start < max_duration) {
163  // told to spin only for some maximum amount of time
164  return true;
165  }
166  // spun too long
167  return false;
168  };
169 
170  // If this spin is not exhaustive (e.g. spin_some), we need to explicitly check
171  // if entities need to be rebuilt here rather than letting the notify waitable event do it.
172  // A non-exhaustive spin would not check for work a second time, thus delaying the execution
173  // of some entities to the next invocation of spin.
174  if (!exhaustive) {
175  this->handle_updated_entities(false);
176  }
177 
178  // Get the number of events and timers ready at start
179  const size_t ready_events_at_start = events_queue_->size();
180  size_t executed_events = 0;
181  const size_t ready_timers_at_start = timers_manager_->get_number_ready_timers();
182  size_t executed_timers = 0;
183 
184  while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
185  // Execute first ready event from queue if exists
186  if (exhaustive || (executed_events < ready_events_at_start)) {
187  bool has_event = !events_queue_->empty();
188 
189  if (has_event) {
190  ExecutorEvent event;
191  bool ret = events_queue_->dequeue(event, std::chrono::nanoseconds(0));
192  if (ret) {
193  this->execute_event(event);
194  executed_events++;
195  continue;
196  }
197  }
198  }
199 
200  // Execute first timer if it is ready
201  if (exhaustive || (executed_timers < ready_timers_at_start)) {
202  bool timer_executed = timers_manager_->execute_head_timer();
203  if (timer_executed) {
204  executed_timers++;
205  continue;
206  }
207  }
208 
209  // If there's no more work available, exit
210  break;
211  }
212 }
213 
214 void
215 EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
216 {
217  // In this context a negative input timeout means no timeout
218  if (timeout < 0ns) {
219  timeout = std::chrono::nanoseconds::max();
220  }
221 
222  // Select the smallest between input timeout and timer timeout.
223  // Cancelled timers are not considered.
224  bool is_timer_timeout = false;
225  auto next_timer_timeout = timers_manager_->get_head_timeout();
226  if (next_timer_timeout.has_value() && next_timer_timeout.value() < timeout) {
227  timeout = next_timer_timeout.value();
228  is_timer_timeout = true;
229  }
230 
231  ExecutorEvent event;
232  bool has_event = events_queue_->dequeue(event, timeout);
233 
234  // If we wake up from the wait with an event, it means that it
235  // arrived before any of the timers expired.
236  if (has_event) {
237  this->execute_event(event);
238  } else if (is_timer_timeout) {
239  timers_manager_->execute_head_timer();
240  }
241 }
242 
243 
244 void
245 EventsExecutor::execute_event(const ExecutorEvent & event)
246 {
247  switch (event.type) {
248  case ExecutorEventType::CLIENT_EVENT:
249  {
250  rclcpp::ClientBase::SharedPtr client;
251  {
252  client = this->retrieve_entity(
253  static_cast<const rcl_client_t *>(event.entity_key),
254  current_collection_.clients);
255  }
256  if (client) {
257  for (size_t i = 0; i < event.num_events; i++) {
258  execute_client(client);
259  }
260  }
261 
262  break;
263  }
264  case ExecutorEventType::SUBSCRIPTION_EVENT:
265  {
266  rclcpp::SubscriptionBase::SharedPtr subscription;
267  {
268  subscription = this->retrieve_entity(
269  static_cast<const rcl_subscription_t *>(event.entity_key),
270  current_collection_.subscriptions);
271  }
272  if (subscription) {
273  for (size_t i = 0; i < event.num_events; i++) {
274  execute_subscription(subscription);
275  }
276  }
277  break;
278  }
279  case ExecutorEventType::SERVICE_EVENT:
280  {
281  rclcpp::ServiceBase::SharedPtr service;
282  {
283  service = this->retrieve_entity(
284  static_cast<const rcl_service_t *>(event.entity_key),
285  current_collection_.services);
286  }
287  if (service) {
288  for (size_t i = 0; i < event.num_events; i++) {
289  execute_service(service);
290  }
291  }
292 
293  break;
294  }
295  case ExecutorEventType::TIMER_EVENT:
296  {
297  timers_manager_->execute_ready_timer(
298  static_cast<const rclcpp::TimerBase *>(event.entity_key), event.data);
299  break;
300  }
301  case ExecutorEventType::WAITABLE_EVENT:
302  {
303  rclcpp::Waitable::SharedPtr waitable;
304  {
305  waitable = this->retrieve_entity(
306  static_cast<const rclcpp::Waitable *>(event.entity_key),
307  current_collection_.waitables);
308  }
309  if (waitable) {
310  for (size_t i = 0; i < event.num_events; i++) {
311  const auto data = waitable->take_data_by_entity_id(event.waitable_data);
312  waitable->execute(data);
313  }
314  }
315  break;
316  }
317  }
318 }
319 
320 void
321 EventsExecutor::handle_updated_entities([[maybe_unused]] bool notify)
322 {
323  // Do not rebuild if we don't need to.
324  // A rebuild event could be generated, but then
325  // this function could end up being called from somewhere else
326  // before that event gets processed, for example if
327  // a node or callback group is manually added to the executor.
328  const bool notify_waitable_triggered = entities_need_rebuild_.exchange(false);
329  if (!notify_waitable_triggered && !this->collector_.has_pending()) {
330  return;
331  }
332 
333  // Build the new collection
335  auto callback_groups = this->collector_.get_all_callback_groups();
337  rclcpp::executors::build_entities_collection(callback_groups, new_collection);
338 
339  // TODO(alsora): this may be implemented in a better way.
340  // We need the notify waitable to be included in the executor "current_collection"
341  // because we need to be able to retrieve events for it.
342  // We could explicitly check for the notify waitable ID when we receive a waitable event
343  // but I think that it's better if the waitable was in the collection and it could be
344  // retrieved in the "standard" way.
345  // To do it, we need to add the notify waitable as an entry in the new collection
346  // such that it's neither added or removed (it should have already been added
347  // to the current collection in the constructor)
348  this->add_notify_waitable_to_collection(new_collection.waitables);
349 
350  this->refresh_current_collection(new_collection);
351 }
352 
353 void
354 EventsExecutor::refresh_current_collection(
355  const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
356 {
357  // Acquire lock before modifying the current collection
358  std::lock_guard<std::mutex> guard(mutex_);
359 
360  // Remove expired entities to ensure re-initialized objects
361  // are updated. This fixes issues with stale state entities.
362  // See: https://github.com/ros2/rclcpp/pull/2586
363  current_collection_.remove_expired_entities();
364 
365  current_collection_.timers.update(
366  new_collection.timers,
367  [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
368  [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->remove_timer(timer);});
369 
370  current_collection_.subscriptions.update(
371  new_collection.subscriptions,
372  [this](auto subscription) {
373  subscription->set_on_new_message_callback(
374  this->create_entity_callback(
375  subscription->get_subscription_handle().get(), ExecutorEventType::SUBSCRIPTION_EVENT));
376  },
377  [](auto subscription) {subscription->clear_on_new_message_callback();});
378 
379  current_collection_.clients.update(
380  new_collection.clients,
381  [this](auto client) {
382  client->set_on_new_response_callback(
383  this->create_entity_callback(
384  client->get_client_handle().get(), ExecutorEventType::CLIENT_EVENT));
385  },
386  [](auto client) {client->clear_on_new_response_callback();});
387 
388  current_collection_.services.update(
389  new_collection.services,
390  [this](auto service) {
391  service->set_on_new_request_callback(
392  this->create_entity_callback(
393  service->get_service_handle().get(), ExecutorEventType::SERVICE_EVENT));
394  },
395  [](auto service) {service->clear_on_new_request_callback();});
396 
397  // DO WE NEED THIS? WE ARE NOT DOING ANYTHING WITH GUARD CONDITIONS
398  /*
399  current_collection_.guard_conditions.update(new_collection.guard_conditions,
400  [](auto guard_condition) {(void)guard_condition;},
401  [](auto guard_condition) {guard_condition->set_on_trigger_callback(nullptr);});
402  */
403 
404  current_collection_.waitables.update(
405  new_collection.waitables,
406  [this](auto waitable) {
407  waitable->set_on_ready_callback(
408  this->create_waitable_callback(waitable.get()));
409  for (const auto & t : waitable->get_timers()) {
410  timers_manager_->add_timer(t);
411  }
412  },
413  [this](auto waitable) {
414  waitable->clear_on_ready_callback();
415  for (const auto & t : waitable->get_timers()) {
416  timers_manager_->remove_timer(t);
417  }
418  });
419 }
420 
421 std::function<void(size_t)>
422 EventsExecutor::create_entity_callback(
423  void * entity_key, ExecutorEventType event_type)
424 {
425  std::function<void(size_t)>
426  callback = [this, entity_key, event_type](size_t num_events) {
427  ExecutorEvent event = {entity_key, nullptr, -1, event_type, num_events};
428  this->events_queue_->enqueue(event);
429  };
430  return callback;
431 }
432 
433 std::function<void(size_t, int)>
434 EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
435 {
436  std::function<void(size_t, int)>
437  callback = [this, entity_key](size_t num_events, int waitable_data) {
438  ExecutorEvent event =
439  {entity_key, nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, num_events};
440  this->events_queue_->enqueue(event);
441  };
442  return callback;
443 }
444 
445 void
446 EventsExecutor::add_notify_waitable_to_collection(
448 {
449  // The notify waitable is not associated to any group, so use an invalid one
450  rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
451  collection.insert(
452  {
453  this->notify_waitable_.get(),
454  {this->notify_waitable_, weak_group_ptr}
455  });
456 }
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:65
static RCLCPP_PUBLIC void execute_service(rclcpp::ServiceBase::SharedPtr service)
Run service server executable.
Definition: executor.cpp:650
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:572
static RCLCPP_PUBLIC void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
Run subscription executable.
Definition: executor.cpp:542
std::shared_ptr< rclcpp::executors::ExecutorNotifyWaitable > notify_waitable_
Waitable containing guard conditions controlling the executor flow.
Definition: executor.hpp:587
static RCLCPP_PUBLIC void execute_client(rclcpp::ClientBase::SharedPtr client)
Run service client executable.
Definition: executor.cpp:662
rclcpp::executors::ExecutorEntitiesCollector collector_
Collector used to associate executable entities from nodes and guard conditions.
Definition: executor.hpp:592
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:561
RCLCPP_PUBLIC void update_collections()
Update the underlying collections.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() const
Get all callback groups known to this entity collector.
bool has_pending() const
Indicate if the entities collector has pending additions or removals.
RCLCPP_PUBLIC void spin_once_impl(std::chrono::nanoseconds timeout) override
Internal implementation of spin_once.
RCLCPP_PUBLIC void spin() override
Events executor implementation of spin.
virtual RCLCPP_PUBLIC ~EventsExecutor()
Default destructor.
RCLCPP_PUBLIC void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
Internal implementation of spin_some.
RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration) override
Events executor implementation of spin all.
RCLCPP_PUBLIC void handle_updated_entities(bool notify) override
Collect entities from callback groups and refresh the current collection with them.
RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0)) override
Events executor implementation of spin some.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
Structure which encapsulates a ROS Client.
Definition: client.h:43
Structure which encapsulates a ROS Service.
Definition: service.h:43
Structure which encapsulates a ROS Subscription.
Definition: subscription.h:40
Options to be passed to the executor constructor.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
ClientCollection clients
Collection of clients currently in use by the executor.