ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
executor.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <cassert>
17 #include <chrono>
18 #include <iterator>
19 #include <memory>
20 #include <map>
21 #include <string>
22 #include <type_traits>
23 #include <utility>
24 #include <vector>
25 
26 #include "rcl/allocator.h"
27 #include "rcl/error_handling.h"
28 #include "rclcpp/executors/executor_notify_waitable.hpp"
29 #include "rclcpp/subscription_wait_set_mask.hpp"
30 #include "rcpputils/scope_exit.hpp"
31 
32 #include "rclcpp/dynamic_typesupport/dynamic_message.hpp"
33 #include "rclcpp/exceptions.hpp"
34 #include "rclcpp/executor.hpp"
35 #include "rclcpp/guard_condition.hpp"
36 #include "rclcpp/node.hpp"
37 #include "rclcpp/utilities.hpp"
38 
39 #include "rcutils/logging_macros.h"
40 
41 #include "tracetools/tracetools.h"
42 
43 using namespace std::chrono_literals;
44 
45 using rclcpp::Executor;
46 
49 static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {true, false, false};
50 
52 
53 Executor::Executor(const std::shared_ptr<rclcpp::Context> & context)
54 : spinning(false),
55  entities_need_rebuild_(true),
56  collector_(nullptr),
57  wait_set_({}, {}, {}, {}, {}, {}, context)
58 {
59 }
60 
62 : spinning(false),
63  interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
64  shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
65  context_(options.context),
66  notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
67  [this]() {
68  this->entities_need_rebuild_.store(true);
69  })),
70  entities_need_rebuild_(true),
71  collector_(notify_waitable_),
72  wait_set_({}, {}, {}, {}, {}, {}, options.context),
73  current_notify_waitable_(notify_waitable_),
74  impl_(std::make_unique<rclcpp::ExecutorImplementation>())
75 {
76  shutdown_callback_handle_ = context_->add_on_shutdown_callback(
77  [weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
78  auto strong_gc = weak_gc.lock();
79  if (strong_gc) {
80  strong_gc->trigger();
81  }
82  });
83 
84  notify_waitable_->add_guard_condition(interrupt_guard_condition_);
85  notify_waitable_->add_guard_condition(shutdown_guard_condition_);
86 
87  // we need to initially rebuild the collection,
88  // so that the notify_waitable_ is added
89  collect_entities();
90 }
91 
93 {
94  std::lock_guard<std::mutex> guard(mutex_);
95 
96  notify_waitable_->remove_guard_condition(interrupt_guard_condition_);
97  notify_waitable_->remove_guard_condition(shutdown_guard_condition_);
98  current_collection_.timers.update(
99  {}, {},
100  [this](auto timer) {wait_set_.remove_timer(timer);});
101 
102  current_collection_.subscriptions.update(
103  {}, {},
104  [this](auto subscription) {
105  wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
106  });
107 
108  current_collection_.clients.update(
109  {}, {},
110  [this](auto client) {wait_set_.remove_client(client);});
111 
112  current_collection_.services.update(
113  {}, {},
114  [this](auto service) {wait_set_.remove_service(service);});
115 
116  current_collection_.guard_conditions.update(
117  {}, {},
118  [this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
119 
120  current_collection_.waitables.update(
121  {}, {},
122  [this](auto waitable) {wait_set_.remove_waitable(waitable);});
123 
124  // Remove shutdown callback handle registered to Context
125  if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) {
126  RCUTILS_LOG_ERROR_NAMED(
127  "rclcpp",
128  "failed to remove registered on_shutdown callback");
129  rcl_reset_error();
130  }
131 }
132 
133 void
135 {
136  this->entities_need_rebuild_.store(true);
137 
138  if (!spinning.load() && entities_need_rebuild_.exchange(false)) {
139  std::lock_guard<std::mutex> guard(mutex_);
140  this->collect_entities();
141  }
142 
143  if (notify) {
144  interrupt_guard_condition_->trigger();
145  }
146 }
147 
148 std::vector<rclcpp::CallbackGroup::WeakPtr>
150 {
152  return this->collector_.get_all_callback_groups();
153 }
154 
155 std::vector<rclcpp::CallbackGroup::WeakPtr>
157 {
160 }
161 
162 std::vector<rclcpp::CallbackGroup::WeakPtr>
164 {
167 }
168 
169 void
171  rclcpp::CallbackGroup::SharedPtr group_ptr,
172  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
173  bool notify)
174 {
175  (void) node_ptr;
176  this->collector_.add_callback_group(group_ptr);
177 
178  try {
179  this->trigger_entity_recollect(notify);
180  } catch (const rclcpp::exceptions::RCLError & ex) {
181  throw std::runtime_error(
182  std::string(
183  "Failed to trigger guard condition on callback group add: ") + ex.what());
184  }
185 }
186 
187 void
188 Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
189 {
190  this->collector_.add_node(node_ptr);
191 
192  try {
193  this->trigger_entity_recollect(notify);
194  } catch (const rclcpp::exceptions::RCLError & ex) {
195  throw std::runtime_error(
196  std::string(
197  "Failed to trigger guard condition on node add: ") + ex.what());
198  }
199 }
200 
201 void
203  rclcpp::CallbackGroup::SharedPtr group_ptr,
204  bool notify)
205 {
206  this->collector_.remove_callback_group(group_ptr);
207 
208  try {
209  this->trigger_entity_recollect(notify);
210  } catch (const rclcpp::exceptions::RCLError & ex) {
211  throw std::runtime_error(
212  std::string(
213  "Failed to trigger guard condition on callback group remove: ") + ex.what());
214  }
215 }
216 
217 void
218 Executor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
219 {
220  this->add_node(node_ptr->get_node_base_interface(), notify);
221 }
222 
223 void
224 Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
225 {
226  this->collector_.remove_node(node_ptr);
227 
228  try {
229  this->trigger_entity_recollect(notify);
230  } catch (const rclcpp::exceptions::RCLError & ex) {
231  throw std::runtime_error(
232  std::string(
233  "Failed to trigger guard condition on node remove: ") + ex.what());
234  }
235 }
236 
237 void
238 Executor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
239 {
240  this->remove_node(node_ptr->get_node_base_interface(), notify);
241 }
242 
243 void
245  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
246  std::chrono::nanoseconds timeout)
247 {
248  this->add_node(node, false);
249  // non-blocking = true
250  spin_once(timeout);
251  this->remove_node(node, false);
252 }
253 
256  std::chrono::nanoseconds timeout,
257  const std::function<std::future_status(std::chrono::nanoseconds wait_time)> & wait_for_future)
258 {
259  // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
260  // inside a callback executed by an executor.
261 
262  // Check the future before entering the while loop.
263  // If the future is already complete, don't try to spin.
264  std::future_status status = wait_for_future(std::chrono::seconds(0));
265  if (status == std::future_status::ready) {
266  return FutureReturnCode::SUCCESS;
267  }
268 
269  auto end_time = std::chrono::steady_clock::now();
270  std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
271  timeout);
272  if (timeout_ns > std::chrono::nanoseconds::zero()) {
273  end_time += timeout_ns;
274  }
275  std::chrono::nanoseconds timeout_left = timeout_ns;
276 
277  if (spinning.exchange(true)) {
278  throw std::runtime_error("spin_until_future_complete() called while already spinning");
279  }
280  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
281  while (rclcpp::ok(this->context_) && spinning.load()) {
282  // Do one item of work.
283  spin_once_impl(timeout_left);
284 
285  // Check if the future is set, return SUCCESS if it is.
286  status = wait_for_future(std::chrono::seconds(0));
287  if (status == std::future_status::ready) {
288  return FutureReturnCode::SUCCESS;
289  }
290  // If the original timeout is < 0, then this is blocking, never TIMEOUT.
291  if (timeout_ns < std::chrono::nanoseconds::zero()) {
292  continue;
293  }
294  // Otherwise check if we still have time to wait, return TIMEOUT if not.
295  auto now = std::chrono::steady_clock::now();
296  if (now >= end_time) {
297  return FutureReturnCode::TIMEOUT;
298  }
299  // Subtract the elapsed time from the original timeout.
300  timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
301  }
302 
303  // The future did not complete before ok() returned false, return INTERRUPTED.
304  return FutureReturnCode::INTERRUPTED;
305 }
306 
307 void
308 Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
309 {
310  this->add_node(node, false);
311  spin_some();
312  this->remove_node(node, false);
313 }
314 
315 void
316 Executor::spin_node_some(std::shared_ptr<rclcpp::Node> node)
317 {
318  this->spin_node_some(node->get_node_base_interface());
319 }
320 
321 void Executor::spin_some(std::chrono::nanoseconds max_duration)
322 {
323  return this->spin_some_impl(max_duration, false);
324 }
325 
326 void
328  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
329  std::chrono::nanoseconds max_duration)
330 {
331  this->add_node(node, false);
332  spin_all(max_duration);
333  this->remove_node(node, false);
334 }
335 
336 void
337 Executor::spin_node_all(std::shared_ptr<rclcpp::Node> node, std::chrono::nanoseconds max_duration)
338 {
339  this->spin_node_all(node->get_node_base_interface(), max_duration);
340 }
341 
342 void Executor::spin_all(std::chrono::nanoseconds max_duration)
343 {
344  if (max_duration < 0ns) {
345  throw std::invalid_argument("max_duration must be greater than or equal to 0");
346  }
347  return this->spin_some_impl(max_duration, true);
348 }
349 
350 void
351 Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
352 {
353  auto start = std::chrono::steady_clock::now();
354  auto max_duration_not_elapsed = [max_duration, start]() {
355  if (std::chrono::nanoseconds(0) == max_duration) {
356  // told to spin forever if need be
357  return true;
358  } else if (std::chrono::steady_clock::now() - start < max_duration) {
359  // told to spin only for some maximum amount of time
360  return true;
361  }
362  // spun too long
363  return false;
364  };
365 
366  if (spinning.exchange(true)) {
367  throw std::runtime_error("spin_some() called while already spinning");
368  }
369  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
370 
371  // clear the wait result and wait for work without blocking to collect the work
372  // for the first time
373  // both spin_some and spin_all wait for work at the beginning
374  wait_result_.reset();
375  wait_for_work(std::chrono::milliseconds(0));
376  bool entity_states_fully_polled = true;
377 
378  if (entities_need_rebuild_) {
379  // if the last wait triggered a collection rebuild, we need to call
380  // wait_for_work once more, in order to do a collection rebuild and collect
381  // events from the just added entities
382  entity_states_fully_polled = false;
383  }
384 
385  // The logic of this while loop is as follows:
386  //
387  // - while not shutdown, and spinning (not canceled), and not max duration reached...
388  // - try to get an executable item to execute, and execute it if available
389  // - otherwise, reset the wait result, and ...
390  // - if there was no work available just after waiting, break the loop unconditionally
391  // - this is appropriate for both spin_some and spin_all which use this function
392  // - else if exhaustive = true, then wait for work again
393  // - this is only used for spin_all and not spin_some
394  // - else break
395  // - this only occurs with spin_some
396  //
397  // The logic of this loop is subtle and should be carefully changed if at all.
398  // See also:
399  // https://github.com/ros2/rclcpp/issues/2508
400  // https://github.com/ros2/rclcpp/pull/2517
401  while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
402  AnyExecutable any_exec;
403  if (get_next_ready_executable(any_exec)) {
404  execute_any_executable(any_exec);
405  // during the execution some entity might got ready therefore we need
406  // to repoll the states of all entities
407  entity_states_fully_polled = false;
408  } else {
409  // if nothing is ready, reset the result to clear it
410  wait_result_.reset();
411 
412  if (entity_states_fully_polled) {
413  // there was no work after just waiting, always exit in this case
414  // before the exhaustive condition can be checked
415  break;
416  }
417 
418  if (exhaustive) {
419  // if exhaustive, wait for work again
420  // this only happens for spin_all; spin_some only waits at the start
421  wait_for_work(std::chrono::milliseconds(0));
422  entity_states_fully_polled = true;
423  if (entities_need_rebuild_) {
424  // if the last wait triggered a collection rebuild, we need to call
425  // wait_for_work once more, in order to do a collection rebuild and
426  // collect events from the just added entities
427  entity_states_fully_polled = false;
428  }
429  } else {
430  break;
431  }
432  }
433  }
434 }
435 
436 void
437 Executor::spin_once_impl(std::chrono::nanoseconds timeout)
438 {
439  AnyExecutable any_exec;
440  if (get_next_executable(any_exec, timeout)) {
441  execute_any_executable(any_exec);
442  }
443 }
444 
445 void
446 Executor::spin_once(std::chrono::nanoseconds timeout)
447 {
448  if (spinning.exchange(true)) {
449  throw std::runtime_error("spin_once() called while already spinning");
450  }
451  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
452  spin_once_impl(timeout);
453 }
454 
455 void
457 {
458  spinning.store(false);
459  try {
460  interrupt_guard_condition_->trigger();
461  } catch (const rclcpp::exceptions::RCLError & ex) {
462  throw std::runtime_error(
463  std::string("Failed to trigger guard condition in cancel: ") + ex.what());
464  }
465 }
466 
467 void
469 {
470  if (!spinning.load()) {
471  return;
472  }
473 
474  assert(
475  (void("cannot execute an AnyExecutable without a valid callback group"),
476  any_exec.callback_group));
477 
478  if (any_exec.timer) {
479  TRACETOOLS_TRACEPOINT(
480  rclcpp_executor_execute,
481  static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
482  execute_timer(any_exec.timer, any_exec.data);
483  }
484  if (any_exec.subscription) {
485  TRACETOOLS_TRACEPOINT(
486  rclcpp_executor_execute,
487  static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
488  execute_subscription(any_exec.subscription);
489  }
490  if (any_exec.service) {
491  execute_service(any_exec.service);
492  }
493  if (any_exec.client) {
494  execute_client(any_exec.client);
495  }
496  if (any_exec.waitable) {
497  const std::shared_ptr<void> & const_data = any_exec.data;
498  any_exec.waitable->execute(const_data);
499  }
500 
501  // Reset the callback_group, regardless of type
502  any_exec.callback_group->can_be_taken_from().store(true);
503 }
504 
505 template<typename Taker, typename Handler>
506 static
507 void
508 take_and_do_error_handling(
509  const char * action_description,
510  const char * topic_or_service_name,
511  Taker take_action,
512  Handler handle_action)
513 {
514  bool taken = false;
515  try {
516  taken = take_action();
517  } catch (const rclcpp::exceptions::RCLError & rcl_error) {
518  RCLCPP_ERROR(
519  rclcpp::get_logger("rclcpp"),
520  "executor %s '%s' unexpectedly failed: %s",
521  action_description,
522  topic_or_service_name,
523  rcl_error.what());
524  }
525  if (taken) {
526  handle_action();
527  } else {
528  // Message or Service was not taken for some reason.
529  // Note that this can be normal, if the underlying middleware needs to
530  // interrupt wait spuriously it is allowed.
531  // So in that case the executor cannot tell the difference in a
532  // spurious wake up and an entity actually having data until trying
533  // to take the data.
534  RCLCPP_DEBUG(
535  rclcpp::get_logger("rclcpp"),
536  "executor %s '%s' failed to take anything",
537  action_description,
538  topic_or_service_name);
539  }
540 }
541 
542 void
543 Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
544 {
546 
547  rclcpp::MessageInfo message_info;
548  message_info.get_rmw_message_info().from_intra_process = false;
549 
550  switch (subscription->get_delivered_message_kind()) {
551  // Deliver ROS message
552  case rclcpp::DeliveredMessageKind::ROS_MESSAGE:
553  {
554  if (subscription->can_loan_messages()) {
555  // This is the case where a loaned message is taken from the middleware via
556  // inter-process communication, given to the user for their callback,
557  // and then returned.
558  void * loaned_msg = nullptr;
559  // TODO(wjwwood): refactor this into methods on subscription when LoanedMessage
560  // is extened to support subscriptions as well.
561  take_and_do_error_handling(
562  "taking a loaned message from topic",
563  subscription->get_topic_name(),
564  [&]()
565  {
566  rcl_ret_t ret = rcl_take_loaned_message(
567  subscription->get_subscription_handle().get(),
568  &loaned_msg,
569  &message_info.get_rmw_message_info(),
570  nullptr);
571  if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {
572  return false;
573  } else if (RCL_RET_OK != ret) {
574  rclcpp::exceptions::throw_from_rcl_error(ret);
575  }
576  return true;
577  },
578  [&]() {subscription->handle_loaned_message(loaned_msg, message_info);});
579  if (nullptr != loaned_msg) {
581  subscription->get_subscription_handle().get(), loaned_msg);
582  if (RCL_RET_OK != ret) {
583  RCLCPP_ERROR(
584  rclcpp::get_logger("rclcpp"),
585  "rcl_return_loaned_message_from_subscription() failed for subscription on topic "
586  "'%s': %s",
587  subscription->get_topic_name(), rcl_get_error_string().str);
588  }
589  loaned_msg = nullptr;
590  }
591  } else {
592  // This case is taking a copy of the message data from the middleware via
593  // inter-process communication.
594  std::shared_ptr<void> message = subscription->create_message();
595  take_and_do_error_handling(
596  "taking a message from topic",
597  subscription->get_topic_name(),
598  [&]() {return subscription->take_type_erased(message.get(), message_info);},
599  [&]() {subscription->handle_message(message, message_info);});
600  // TODO(clalancette): In the case that the user is using the MessageMemoryPool,
601  // and they take a shared_ptr reference to the message in the callback, this can
602  // inadvertently return the message to the pool when the user is still using it.
603  // This is a bug that needs to be fixed in the pool, and we should probably have
604  // a custom deleter for the message that actually does the return_message().
605  subscription->return_message(message);
606  }
607  break;
608  }
609 
610  // Deliver serialized message
611  case rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE:
612  {
613  // This is the case where a copy of the serialized message is taken from
614  // the middleware via inter-process communication.
615  std::shared_ptr<SerializedMessage> serialized_msg =
616  subscription->create_serialized_message();
617  take_and_do_error_handling(
618  "taking a serialized message from topic",
619  subscription->get_topic_name(),
620  [&]() {return subscription->take_serialized(*serialized_msg.get(), message_info);},
621  [&]()
622  {
623  subscription->handle_serialized_message(serialized_msg, message_info);
624  });
625  subscription->return_serialized_message(serialized_msg);
626  break;
627  }
628 
629  // DYNAMIC SUBSCRIPTION ========================================================================
630  // Deliver dynamic message
631  case rclcpp::DeliveredMessageKind::DYNAMIC_MESSAGE:
632  {
633  throw std::runtime_error("Unimplemented");
634  }
635 
636  default:
637  {
638  throw std::runtime_error("Delivered message kind is not supported");
639  }
640  }
641 }
642 
643 void
644 Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr<void> & data_ptr)
645 {
646  timer->execute_callback(data_ptr);
647 }
648 
649 void
650 Executor::execute_service(rclcpp::ServiceBase::SharedPtr service)
651 {
652  auto request_header = service->create_request_header();
653  std::shared_ptr<void> request = service->create_request();
654  take_and_do_error_handling(
655  "taking a service server request from service",
656  service->get_service_name(),
657  [&]() {return service->take_type_erased_request(request.get(), *request_header);},
658  [&]() {service->handle_request(request_header, request);});
659 }
660 
661 void
662 Executor::execute_client(rclcpp::ClientBase::SharedPtr client)
663 {
664  auto request_header = client->create_request_header();
665  std::shared_ptr<void> response = client->create_response();
666  take_and_do_error_handling(
667  "taking a service client response from service",
668  client->get_service_name(),
669  [&]() {return client->take_type_erased_response(response.get(), *request_header);},
670  [&]() {client->handle_response(request_header, response);});
671 }
672 
673 void
675 {
676  // Updating the entity collection and waitset expires any active result
677  this->wait_result_.reset();
678 
679  // Get the current list of available waitables from the collector.
682  auto callback_groups = this->collector_.get_all_callback_groups();
683  rclcpp::executors::build_entities_collection(callback_groups, collection);
684 
685  // Make a copy of notify waitable so we can continue to mutate the original
686  // one outside of the execute loop.
687  // This prevents the collection of guard conditions in the waitable from changing
688  // while we are waiting on it.
689  if (notify_waitable_) {
690  current_notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
692  auto notify_waitable = std::static_pointer_cast<rclcpp::Waitable>(current_notify_waitable_);
693  collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}});
694  }
695 
696  // We must remove expired entities here, so that we don't continue to use older entities.
697  // See https://github.com/ros2/rclcpp/issues/2180 for more information.
698  current_collection_.remove_expired_entities();
699 
700  // Update each of the groups of entities in the current collection, adding or removing
701  // from the wait set as necessary.
702  current_collection_.timers.update(
703  collection.timers,
704  [this](auto timer) {wait_set_.add_timer(timer);},
705  [this](auto timer) {wait_set_.remove_timer(timer);});
706 
707  current_collection_.subscriptions.update(
708  collection.subscriptions,
709  [this](auto subscription) {
710  wait_set_.add_subscription(subscription, kDefaultSubscriptionMask);
711  },
712  [this](auto subscription) {
713  wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
714  });
715 
716  current_collection_.clients.update(
717  collection.clients,
718  [this](auto client) {wait_set_.add_client(client);},
719  [this](auto client) {wait_set_.remove_client(client);});
720 
721  current_collection_.services.update(
722  collection.services,
723  [this](auto service) {wait_set_.add_service(service);},
724  [this](auto service) {wait_set_.remove_service(service);});
725 
726  current_collection_.guard_conditions.update(
727  collection.guard_conditions,
728  [this](auto guard_condition) {wait_set_.add_guard_condition(guard_condition);},
729  [this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
730 
731  current_collection_.waitables.update(
732  collection.waitables,
733  [this](auto waitable) {wait_set_.add_waitable(waitable);},
734  [this](auto waitable) {wait_set_.remove_waitable(waitable);});
735 
736  // In the case that an entity already has an expired weak pointer
737  // before being removed from the waitset, additionally prune the waitset.
738  this->wait_set_.prune_deleted_entities();
739 }
740 
741 void
742 Executor::wait_for_work(std::chrono::nanoseconds timeout)
743 {
744  TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
745 
746  // Clear any previous wait result
747  this->wait_result_.reset();
748 
749  // we need to make sure that callback groups don't get out of scope
750  // during the wait. As in jazzy, they are not covered by the DynamicStorage,
751  // we explicitly hold them here as a bugfix
752  std::vector<rclcpp::CallbackGroup::SharedPtr> cbgs;
753 
754  {
755  std::lock_guard<std::mutex> guard(mutex_);
756 
757  if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
758  this->collect_entities();
759  }
760 
761  auto callback_groups = this->collector_.get_all_callback_groups();
762  cbgs.resize(callback_groups.size());
763  for(const auto & w_ptr : callback_groups) {
764  auto shr_ptr = w_ptr.lock();
765  if(shr_ptr) {
766  cbgs.push_back(std::move(shr_ptr));
767  }
768  }
769  }
770 
771  this->wait_result_.emplace(wait_set_.wait(timeout));
772 
773  // drop references to the callback groups, before trying to execute anything
774  cbgs.clear();
775 
776  if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
777  RCUTILS_LOG_WARN_NAMED(
778  "rclcpp",
779  "empty wait set received in wait(). This should never happen.");
780  } else {
781  if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
782  auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
783  if (current_notify_waitable_->is_ready(rcl_wait_set)) {
784  current_notify_waitable_->execute(current_notify_waitable_->take_data());
785  }
786  }
787  }
788 }
789 
790 bool
792 {
793  TRACETOOLS_TRACEPOINT(rclcpp_executor_get_next_ready);
794 
795  bool valid_executable = false;
796 
797  if (!wait_result_.has_value() || wait_result_->kind() != rclcpp::WaitResultKind::Ready) {
798  return false;
799  }
800 
801  if (!valid_executable) {
802  size_t current_timer_index = 0;
803  while (true) {
804  auto [timer, timer_index] = wait_result_->peek_next_ready_timer(current_timer_index);
805  if (nullptr == timer) {
806  break;
807  }
808  current_timer_index = timer_index;
809  auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
810  if (entity_iter != current_collection_.timers.end()) {
811  auto callback_group = entity_iter->second.callback_group.lock();
812  if (!callback_group || !callback_group->can_be_taken_from()) {
813  current_timer_index++;
814  continue;
815  }
816  // At this point the timer is either ready for execution or was perhaps
817  // it was canceled, based on the result of call(), but either way it
818  // should not be checked again from peek_next_ready_timer(), so clear
819  // it from the wait result.
820  wait_result_->clear_timer_with_index(current_timer_index);
821  // Check that the timer should be called still, i.e. it wasn't canceled.
822  any_executable.data = timer->call();
823  if (!any_executable.data) {
824  current_timer_index++;
825  continue;
826  }
827  any_executable.timer = timer;
828  any_executable.callback_group = callback_group;
829  valid_executable = true;
830  break;
831  }
832  current_timer_index++;
833  }
834  }
835 
836  if (!valid_executable) {
837  while (auto subscription = wait_result_->next_ready_subscription()) {
838  auto entity_iter = current_collection_.subscriptions.find(
839  subscription->get_subscription_handle().get());
840  if (entity_iter != current_collection_.subscriptions.end()) {
841  auto callback_group = entity_iter->second.callback_group.lock();
842  if (!callback_group || !callback_group->can_be_taken_from()) {
843  continue;
844  }
845  any_executable.subscription = subscription;
846  any_executable.callback_group = callback_group;
847  valid_executable = true;
848  break;
849  }
850  }
851  }
852 
853  if (!valid_executable) {
854  while (auto service = wait_result_->next_ready_service()) {
855  auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
856  if (entity_iter != current_collection_.services.end()) {
857  auto callback_group = entity_iter->second.callback_group.lock();
858  if (!callback_group || !callback_group->can_be_taken_from()) {
859  continue;
860  }
861  any_executable.service = service;
862  any_executable.callback_group = callback_group;
863  valid_executable = true;
864  break;
865  }
866  }
867  }
868 
869  if (!valid_executable) {
870  while (auto client = wait_result_->next_ready_client()) {
871  auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
872  if (entity_iter != current_collection_.clients.end()) {
873  auto callback_group = entity_iter->second.callback_group.lock();
874  if (!callback_group || !callback_group->can_be_taken_from()) {
875  continue;
876  }
877  any_executable.client = client;
878  any_executable.callback_group = callback_group;
879  valid_executable = true;
880  break;
881  }
882  }
883  }
884 
885  if (!valid_executable) {
886  while (auto waitable = wait_result_->next_ready_waitable()) {
887  auto entity_iter = current_collection_.waitables.find(waitable.get());
888  if (entity_iter != current_collection_.waitables.end()) {
889  auto callback_group = entity_iter->second.callback_group.lock();
890  if (!callback_group || !callback_group->can_be_taken_from()) {
891  continue;
892  }
893  any_executable.waitable = waitable;
894  any_executable.callback_group = callback_group;
895  any_executable.data = waitable->take_data();
896  valid_executable = true;
897  break;
898  }
899  }
900  }
901 
902  if (any_executable.callback_group) {
903  if (any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) {
904  assert(any_executable.callback_group->can_be_taken_from().load());
905  any_executable.callback_group->can_be_taken_from().store(false);
906  }
907  }
908 
909 
910  return valid_executable;
911 }
912 
913 bool
914 Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
915 {
916  bool success = false;
917  // Check to see if there are any subscriptions or timers needing service
918  // TODO(wjwwood): improve run to run efficiency of this function
919  success = get_next_ready_executable(any_executable);
920  // If there are none
921  if (!success) {
922  // Wait for subscriptions or timers to work on
923  wait_for_work(timeout);
924  if (!spinning.load()) {
925  return false;
926  }
927  // Try again
928  success = get_next_ready_executable(any_executable);
929  }
930  return success;
931 }
932 
933 bool
935 {
936  return spinning;
937 }
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:65
virtual RCLCPP_PUBLIC void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
Add a node, complete all immediately available work, and remove the node.
Definition: executor.cpp:308
std::shared_ptr< rclcpp::GuardCondition > interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
Definition: executor.hpp:551
RCLCPP_PUBLIC void spin_node_once_nanoseconds(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout)
Add a node to executor, execute the next available unit of work, and remove the node.
Definition: executor.cpp:244
virtual RCLCPP_PUBLIC ~Executor()
Default destructor.
Definition: executor.cpp:92
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
Definition: executor.cpp:156
virtual RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true)
Remove a callback group from the executor.
Definition: executor.cpp:202
virtual RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a callback group to an executor.
Definition: executor.cpp:170
RCLCPP_PUBLIC void wait_for_work(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Block until more work becomes avilable or timeout is reached.
Definition: executor.cpp:742
static RCLCPP_PUBLIC void execute_service(rclcpp::ServiceBase::SharedPtr service)
Run service server executable.
Definition: executor.cpp:650
virtual RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Remove a node from the executor.
Definition: executor.cpp:224
RCLCPP_PUBLIC Executor(const rclcpp::ExecutorOptions &options=rclcpp::ExecutorOptions())
Default constructor.
Definition: executor.cpp:61
virtual RCLCPP_PUBLIC void spin_node_all(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds max_duration)
Add a node, complete all immediately available work exhaustively, and remove the node.
Definition: executor.cpp:327
RCLCPP_PUBLIC bool get_next_executable(AnyExecutable &any_executable, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Wait for executable in ready state and populate union structure.
Definition: executor.cpp:914
virtual RCLCPP_PUBLIC void spin_once(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Collect work once and execute the next available work, optionally within a duration.
Definition: executor.cpp:446
virtual RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0))
Collect work once and execute all available work, optionally within a max duration.
Definition: executor.cpp:321
virtual RCLCPP_PUBLIC void cancel()
Cancel any running spin* function, causing it to return.
Definition: executor.cpp:456
RCLCPP_PUBLIC bool is_spinning()
Returns true if the executor is currently spinning.
Definition: executor.cpp:934
RCLCPP_PUBLIC bool get_next_ready_executable(AnyExecutable &any_executable)
Check for executable in ready state and populate union structure.
Definition: executor.cpp:791
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:559
static RCLCPP_PUBLIC void execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr< void > &data_ptr)
Run timer executable.
Definition: executor.cpp:644
void trigger_entity_recollect(bool notify)
This function triggers a recollect of all entities that are registered to the executor.
Definition: executor.cpp:134
static RCLCPP_PUBLIC void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
Run subscription executable.
Definition: executor.cpp:543
RCLCPP_PUBLIC void collect_entities()
Gather all of the waitable entities from associated nodes and callback groups.
Definition: executor.cpp:674
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
Definition: executor.cpp:163
virtual RCLCPP_PUBLIC FutureReturnCode spin_until_future_complete_impl(std::chrono::nanoseconds timeout, const std::function< std::future_status(std::chrono::nanoseconds wait_time)> &wait_for_future)
Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
Definition: executor.cpp:255
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_
shutdown callback handle registered to Context
Definition: executor.hpp:594
virtual RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a node to the executor.
Definition: executor.cpp:188
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
Definition: executor.cpp:468
std::shared_ptr< rclcpp::executors::ExecutorNotifyWaitable > notify_waitable_
Waitable containing guard conditions controlling the executor flow.
Definition: executor.hpp:574
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups()
Get callback groups that belong to executor.
Definition: executor.cpp:149
static RCLCPP_PUBLIC void execute_client(rclcpp::ClientBase::SharedPtr client)
Run service client executable.
Definition: executor.cpp:662
rclcpp::executors::ExecutorEntitiesCollector collector_
Collector used to associate executable entities from nodes and guard conditions.
Definition: executor.hpp:579
virtual RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration)
Collect and execute work repeatedly within a duration or until no more work is available.
Definition: executor.cpp:342
RCLCPP_PUBLIC void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
Collect work and execute available work, optionally within a duration.
Definition: executor.cpp:351
std::shared_ptr< rclcpp::GuardCondition > shutdown_guard_condition_
Guard condition for signaling the rmw layer to wake up for system shutdown.
Definition: executor.hpp:554
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:548
A condition that can be waited on in a single wait set and asynchronously triggered.
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Options used to determine what parts of a subscription get added to or removed from a wait set.
Created when the return code does not match one of the other specialized exceptions.
Definition: exceptions.hpp:153
void update(const EntityCollection< EntityKeyType, EntityValueType > &other, std::function< void(const EntitySharedPtr &)> on_added, std::function< void(const EntitySharedPtr &)> on_removed)
Update this collection based on the contents of another collection.
RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Add a node to the entity collector.
RCLCPP_PUBLIC void update_collections()
Update the underlying collections.
RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Remove a callback group from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() const
Get all callback groups known to this entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups() const
Get automatically-added callback groups known to this entity collector.
RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Remove a node from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups() const
Get manually-added callback groups known to this entity collector.
RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Add a callback group to the entity collector.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
FutureReturnCode
Return codes to be used with spin_until_future_complete.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:33
Options to be passed to the executor constructor.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
GuardConditionCollection guard_conditions
Collection of guard conditions currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
ClientCollection clients
Collection of clients currently in use by the executor.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_return_loaned_message_from_subscription(const rcl_subscription_t *subscription, void *loaned_message)
Return a loaned message from a topic using a rcl subscription.
Definition: subscription.c:723
#define RCL_RET_OK
Success return code.
Definition: types.h:27
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:24