ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
events_executor.cpp
1 // Copyright 2023 iRobot Corporation.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/experimental/executors/events_executor/events_executor.hpp"
16 
17 #include <memory>
18 #include <utility>
19 #include <vector>
20 
21 #include "rcpputils/scope_exit.hpp"
22 
23 using namespace std::chrono_literals;
24 
26 
27 EventsExecutor::EventsExecutor(
28  rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue,
29  bool execute_timers_separate_thread,
30  const rclcpp::ExecutorOptions & options)
31 : rclcpp::Executor(options)
32 {
33  // Get ownership of the queue used to store events.
34  if (!events_queue) {
35  throw std::invalid_argument("events_queue can't be a null pointer");
36  }
37  events_queue_ = std::move(events_queue);
38 
39  // Create timers manager
40  // The timers manager can be used either to only track timers (in this case an expired
41  // timer will generate an executor event and then it will be executed by the executor thread)
42  // or it can also take care of executing expired timers in its dedicated thread.
43  std::function<void(const rclcpp::TimerBase *,
44  const std::shared_ptr<void> &)> timer_on_ready_cb = nullptr;
45  if (!execute_timers_separate_thread) {
46  timer_on_ready_cb =
47  [this](const rclcpp::TimerBase * timer_id, const std::shared_ptr<void> & data) {
48  ExecutorEvent event = {timer_id, data, -1, ExecutorEventType::TIMER_EVENT, 1};
49  this->events_queue_->enqueue(event);
50  };
51  }
52  timers_manager_ =
53  std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);
54 
55  this->current_entities_collection_ =
56  std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
57 
58  notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
59  [this]() {
60  // This callback is invoked when:
61  // - the interrupt or shutdown guard condition is triggered:
62  // ---> we need to wake up the executor so that it can terminate
63  // - a node or callback group guard condition is triggered:
64  // ---> the entities collection is changed, we need to update callbacks
65  this->refresh_current_collection_from_callback_groups();
66  });
67 
68  // Make sure that the notify waitable is immediately added to the collection
69  // to avoid missing events
70  this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
71 
72  notify_waitable_->add_guard_condition(interrupt_guard_condition_);
73  notify_waitable_->add_guard_condition(shutdown_guard_condition_);
74 
75  notify_waitable_->set_on_ready_callback(
76  this->create_waitable_callback(notify_waitable_.get()));
77 
78  auto notify_waitable_entity_id = notify_waitable_.get();
79  notify_waitable_->set_on_ready_callback(
80  [this, notify_waitable_entity_id](size_t num_events, int waitable_data) {
81  // The notify waitable has a special callback.
82  // We don't care about how many events as when we wake up the executor we are going to
83  // process everything regardless.
84  // For the same reason, if an event of this type has already been pushed but it has not been
85  // processed yet, we avoid pushing additional events.
86  (void)num_events;
87  if (notify_waitable_event_pushed_.exchange(true)) {
88  return;
89  }
90 
91  ExecutorEvent event =
92  {notify_waitable_entity_id, nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, 1};
93  this->events_queue_->enqueue(event);
94  });
95 
96  this->entities_collector_ =
97  std::make_shared<rclcpp::executors::ExecutorEntitiesCollector>(notify_waitable_);
98 }
99 
101 {
102  spinning.store(false);
103  notify_waitable_->clear_on_ready_callback();
104  this->refresh_current_collection({});
105 }
106 
107 void
109 {
110  if (spinning.exchange(true)) {
111  throw std::runtime_error("spin() called while already spinning");
112  }
113  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
114 
115  timers_manager_->start();
116  RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); );
117 
118  while (rclcpp::ok(context_) && spinning.load()) {
119  // Wait until we get an event
120  ExecutorEvent event;
121  bool has_event = events_queue_->dequeue(event);
122  if (has_event) {
123  this->execute_event(event);
124  }
125  }
126 }
127 
128 void
129 EventsExecutor::spin_some(std::chrono::nanoseconds max_duration)
130 {
131  return this->spin_some_impl(max_duration, false);
132 }
133 
134 void
135 EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
136 {
137  if (max_duration <= 0ns) {
138  throw std::invalid_argument("max_duration must be positive");
139  }
140  return this->spin_some_impl(max_duration, true);
141 }
142 
143 void
144 EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
145 {
146  if (spinning.exchange(true)) {
147  throw std::runtime_error("spin_some() called while already spinning");
148  }
149 
150  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
151 
152  auto start = std::chrono::steady_clock::now();
153 
154  auto max_duration_not_elapsed = [max_duration, start]() {
155  if (std::chrono::nanoseconds(0) == max_duration) {
156  // told to spin forever if need be
157  return true;
158  } else if (std::chrono::steady_clock::now() - start < max_duration) {
159  // told to spin only for some maximum amount of time
160  return true;
161  }
162  // spun too long
163  return false;
164  };
165 
166  // If this spin is not exhaustive (e.g. spin_some), we need to explicitly check
167  // if entities need to be rebuilt here rather than letting the notify waitable event do it.
168  // A non-exhaustive spin would not check for work a second time, thus delaying the execution
169  // of some entities to the next invocation of spin.
170  if (!exhaustive) {
171  this->refresh_current_collection_from_callback_groups();
172  }
173 
174  // Get the number of events and timers ready at start
175  const size_t ready_events_at_start = events_queue_->size();
176  size_t executed_events = 0;
177  const size_t ready_timers_at_start = timers_manager_->get_number_ready_timers();
178  size_t executed_timers = 0;
179 
180  while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
181  // Execute first ready event from queue if exists
182  if (exhaustive || (executed_events < ready_events_at_start)) {
183  bool has_event = !events_queue_->empty();
184 
185  if (has_event) {
186  ExecutorEvent event;
187  bool ret = events_queue_->dequeue(event, std::chrono::nanoseconds(0));
188  if (ret) {
189  this->execute_event(event);
190  executed_events++;
191  continue;
192  }
193  }
194  }
195 
196  // Execute first timer if it is ready
197  if (exhaustive || (executed_timers < ready_timers_at_start)) {
198  bool timer_executed = timers_manager_->execute_head_timer();
199  if (timer_executed) {
200  executed_timers++;
201  continue;
202  }
203  }
204 
205  // If there's no more work available, exit
206  break;
207  }
208 }
209 
210 void
211 EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
212 {
213  // In this context a negative input timeout means no timeout
214  if (timeout < 0ns) {
215  timeout = std::chrono::nanoseconds::max();
216  }
217 
218  // Select the smallest between input timeout and timer timeout.
219  // Cancelled timers are not considered.
220  bool is_timer_timeout = false;
221  auto next_timer_timeout = timers_manager_->get_head_timeout();
222  if (next_timer_timeout.has_value() && next_timer_timeout.value() < timeout) {
223  timeout = next_timer_timeout.value();
224  is_timer_timeout = true;
225  }
226 
227  ExecutorEvent event;
228  bool has_event = events_queue_->dequeue(event, timeout);
229 
230  // If we wake up from the wait with an event, it means that it
231  // arrived before any of the timers expired.
232  if (has_event) {
233  this->execute_event(event);
234  } else if (is_timer_timeout) {
235  timers_manager_->execute_head_timer();
236  }
237 }
238 
239 void
241  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
242 {
243  // This field is unused because we don't have to wake up the executor when a node is added.
244  (void) notify;
245 
246  // Add node to entities collector
247  this->entities_collector_->add_node(node_ptr);
248 
249  this->refresh_current_collection_from_callback_groups();
250 }
251 
252 void
253 EventsExecutor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
254 {
255  this->add_node(node_ptr->get_node_base_interface(), notify);
256 }
257 
258 void
260  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
261 {
262  // This field is unused because we don't have to wake up the executor when a node is removed.
263  (void)notify;
264 
265  // Remove node from entities collector.
266  // This will result in un-setting all the event callbacks from its entities.
267  // After this function returns, this executor will not receive any more events associated
268  // to these entities.
269  this->entities_collector_->remove_node(node_ptr);
270 
271  this->refresh_current_collection_from_callback_groups();
272 }
273 
274 void
275 EventsExecutor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
276 {
277  this->remove_node(node_ptr->get_node_base_interface(), notify);
278 }
279 
280 void
281 EventsExecutor::execute_event(const ExecutorEvent & event)
282 {
283  switch (event.type) {
284  case ExecutorEventType::CLIENT_EVENT:
285  {
286  rclcpp::ClientBase::SharedPtr client;
287  {
288  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
289  client = this->retrieve_entity(
290  static_cast<const rcl_client_t *>(event.entity_key),
291  current_entities_collection_->clients);
292  }
293  if (client) {
294  for (size_t i = 0; i < event.num_events; i++) {
295  execute_client(client);
296  }
297  }
298 
299  break;
300  }
301  case ExecutorEventType::SUBSCRIPTION_EVENT:
302  {
303  rclcpp::SubscriptionBase::SharedPtr subscription;
304  {
305  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
306  subscription = this->retrieve_entity(
307  static_cast<const rcl_subscription_t *>(event.entity_key),
308  current_entities_collection_->subscriptions);
309  }
310  if (subscription) {
311  for (size_t i = 0; i < event.num_events; i++) {
312  execute_subscription(subscription);
313  }
314  }
315  break;
316  }
317  case ExecutorEventType::SERVICE_EVENT:
318  {
319  rclcpp::ServiceBase::SharedPtr service;
320  {
321  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
322  service = this->retrieve_entity(
323  static_cast<const rcl_service_t *>(event.entity_key),
324  current_entities_collection_->services);
325  }
326  if (service) {
327  for (size_t i = 0; i < event.num_events; i++) {
328  execute_service(service);
329  }
330  }
331 
332  break;
333  }
334  case ExecutorEventType::TIMER_EVENT:
335  {
336  timers_manager_->execute_ready_timer(
337  static_cast<const rclcpp::TimerBase *>(event.entity_key), event.data);
338  break;
339  }
340  case ExecutorEventType::WAITABLE_EVENT:
341  {
342  rclcpp::Waitable::SharedPtr waitable;
343  {
344  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
345  waitable = this->retrieve_entity(
346  static_cast<const rclcpp::Waitable *>(event.entity_key),
347  current_entities_collection_->waitables);
348  }
349  if (waitable) {
350  for (size_t i = 0; i < event.num_events; i++) {
351  const auto data = waitable->take_data_by_entity_id(event.waitable_data);
352  waitable->execute(data);
353  }
354  }
355  break;
356  }
357  }
358 }
359 
360 void
362  rclcpp::CallbackGroup::SharedPtr group_ptr,
363  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
364  bool notify)
365 {
366  // This field is unused because we don't have to wake up
367  // the executor when a callback group is added.
368  (void)notify;
369  (void)node_ptr;
370 
371  this->entities_collector_->add_callback_group(group_ptr);
372 
373  this->refresh_current_collection_from_callback_groups();
374 }
375 
376 void
378  rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify)
379 {
380  // This field is unused because we don't have to wake up
381  // the executor when a callback group is removed.
382  (void)notify;
383 
384  this->entities_collector_->remove_callback_group(group_ptr);
385 
386  this->refresh_current_collection_from_callback_groups();
387 }
388 
389 std::vector<rclcpp::CallbackGroup::WeakPtr>
391 {
392  this->entities_collector_->update_collections();
393  return this->entities_collector_->get_all_callback_groups();
394 }
395 
396 std::vector<rclcpp::CallbackGroup::WeakPtr>
398 {
399  this->entities_collector_->update_collections();
400  return this->entities_collector_->get_manually_added_callback_groups();
401 }
402 
403 std::vector<rclcpp::CallbackGroup::WeakPtr>
405 {
406  this->entities_collector_->update_collections();
407  return this->entities_collector_->get_automatically_added_callback_groups();
408 }
409 
410 void
411 EventsExecutor::refresh_current_collection_from_callback_groups()
412 {
413  // Do not rebuild if we don't need to.
414  // A rebuild event could be generated, but then
415  // this function could end up being called from somewhere else
416  // before that event gets processed, for example if
417  // a node or callback group is manually added to the executor.
418  const bool notify_waitable_triggered = notify_waitable_event_pushed_.exchange(false);
419  if (!notify_waitable_triggered && !this->entities_collector_->has_pending()) {
420  return;
421  }
422 
423  // Build the new collection
424  this->entities_collector_->update_collections();
425  auto callback_groups = this->entities_collector_->get_all_callback_groups();
427  rclcpp::executors::build_entities_collection(callback_groups, new_collection);
428 
429  // TODO(alsora): this may be implemented in a better way.
430  // We need the notify waitable to be included in the executor "current_collection"
431  // because we need to be able to retrieve events for it.
432  // We could explicitly check for the notify waitable ID when we receive a waitable event
433  // but I think that it's better if the waitable was in the collection and it could be
434  // retrieved in the "standard" way.
435  // To do it, we need to add the notify waitable as an entry in both the new and
436  // current collections such that it's neither added or removed.
437  this->add_notify_waitable_to_collection(new_collection.waitables);
438 
439  // Acquire lock before modifying the current collection
440  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
441  this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
442 
443  this->refresh_current_collection(new_collection);
444 }
445 
446 void
447 EventsExecutor::refresh_current_collection(
448  const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
449 {
450  // Acquire lock before modifying the current collection
451  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
452 
453  current_entities_collection_->timers.update(
454  new_collection.timers,
455  [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
456  [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->remove_timer(timer);});
457 
458  current_entities_collection_->subscriptions.update(
459  new_collection.subscriptions,
460  [this](auto subscription) {
461  subscription->set_on_new_message_callback(
462  this->create_entity_callback(
463  subscription->get_subscription_handle().get(), ExecutorEventType::SUBSCRIPTION_EVENT));
464  },
465  [](auto subscription) {subscription->clear_on_new_message_callback();});
466 
467  current_entities_collection_->clients.update(
468  new_collection.clients,
469  [this](auto client) {
470  client->set_on_new_response_callback(
471  this->create_entity_callback(
472  client->get_client_handle().get(), ExecutorEventType::CLIENT_EVENT));
473  },
474  [](auto client) {client->clear_on_new_response_callback();});
475 
476  current_entities_collection_->services.update(
477  new_collection.services,
478  [this](auto service) {
479  service->set_on_new_request_callback(
480  this->create_entity_callback(
481  service->get_service_handle().get(), ExecutorEventType::SERVICE_EVENT));
482  },
483  [](auto service) {service->clear_on_new_request_callback();});
484 
485  // DO WE NEED THIS? WE ARE NOT DOING ANYTHING WITH GUARD CONDITIONS
486  /*
487  current_entities_collection_->guard_conditions.update(new_collection.guard_conditions,
488  [](auto guard_condition) {(void)guard_condition;},
489  [](auto guard_condition) {guard_condition->set_on_trigger_callback(nullptr);});
490  */
491 
492  current_entities_collection_->waitables.update(
493  new_collection.waitables,
494  [this](auto waitable) {
495  waitable->set_on_ready_callback(
496  this->create_waitable_callback(waitable.get()));
497  },
498  [](auto waitable) {waitable->clear_on_ready_callback();});
499 }
500 
501 std::function<void(size_t)>
502 EventsExecutor::create_entity_callback(
503  void * entity_key, ExecutorEventType event_type)
504 {
505  std::function<void(size_t)>
506  callback = [this, entity_key, event_type](size_t num_events) {
507  ExecutorEvent event = {entity_key, nullptr, -1, event_type, num_events};
508  this->events_queue_->enqueue(event);
509  };
510  return callback;
511 }
512 
513 std::function<void(size_t, int)>
514 EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
515 {
516  std::function<void(size_t, int)>
517  callback = [this, entity_key](size_t num_events, int waitable_data) {
518  ExecutorEvent event =
519  {entity_key, nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, num_events};
520  this->events_queue_->enqueue(event);
521  };
522  return callback;
523 }
524 
525 void
526 EventsExecutor::add_notify_waitable_to_collection(
528 {
529  // The notify waitable is not associated to any group, so use an invalid one
530  rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
531  collection.insert(
532  {
533  this->notify_waitable_.get(),
534  {this->notify_waitable_, weak_group_ptr}
535  });
536 }
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:65
std::shared_ptr< rclcpp::GuardCondition > interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
Definition: executor.hpp:551
static RCLCPP_PUBLIC void execute_service(rclcpp::ServiceBase::SharedPtr service)
Run service server executable.
Definition: executor.cpp:650
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:559
static RCLCPP_PUBLIC void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
Run subscription executable.
Definition: executor.cpp:543
static RCLCPP_PUBLIC void execute_client(rclcpp::ClientBase::SharedPtr client)
Run service client executable.
Definition: executor.cpp:662
std::shared_ptr< rclcpp::GuardCondition > shutdown_guard_condition_
Guard condition for signaling the rmw layer to wake up for system shutdown.
Definition: executor.hpp:554
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:548
RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true) override
Remove callback group from the executor.
RCLCPP_PUBLIC void spin_once_impl(std::chrono::nanoseconds timeout) override
Internal implementation of spin_once.
RCLCPP_PUBLIC void spin() override
Events executor implementation of spin.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() override
Get callback groups that belong to executor.
virtual RCLCPP_PUBLIC ~EventsExecutor()
Default destructor.
RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true) override
Add a node to the executor.
RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true) override
Remove a node from the executor.
RCLCPP_PUBLIC void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
Internal implementation of spin_some.
RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true) override
Add a callback group to an executor.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes() override
Get callback groups that belong to executor.
RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration) override
Events executor implementation of spin all.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups() override
Get callback groups that belong to executor.
RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0)) override
Events executor implementation of spin some.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
Structure which encapsulates a ROS Client.
Definition: client.h:43
Structure which encapsulates a ROS Service.
Definition: service.h:43
Structure which encapsulates a ROS Subscription.
Definition: subscription.h:40
Options to be passed to the executor constructor.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
ClientCollection clients
Collection of clients currently in use by the executor.