ROS 2 rclcpp + rcl - rolling  rolling-a919a6e5
ROS 2 C++ Client Library with ROS Client Library
executor.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <cassert>
17 #include <chrono>
18 #include <iterator>
19 #include <memory>
20 #include <map>
21 #include <string>
22 #include <type_traits>
23 #include <utility>
24 #include <vector>
25 
26 #include "rcl/allocator.h"
27 #include "rcl/error_handling.h"
28 #include "rclcpp/executors/executor_notify_waitable.hpp"
29 #include "rclcpp/subscription_wait_set_mask.hpp"
30 #include "rcpputils/scope_exit.hpp"
31 
32 #include "rclcpp/dynamic_typesupport/dynamic_message.hpp"
33 #include "rclcpp/exceptions.hpp"
34 #include "rclcpp/executor.hpp"
35 #include "rclcpp/guard_condition.hpp"
36 #include "rclcpp/node.hpp"
37 #include "rclcpp/utilities.hpp"
38 
39 #include "rcutils/logging_macros.h"
40 
41 #include "tracetools/tracetools.h"
42 
43 using namespace std::chrono_literals;
44 
45 using rclcpp::Executor;
46 
49 static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {true, false, false};
50 
52 
53 Executor::Executor(const std::shared_ptr<rclcpp::Context> & context)
54 : spinning(false),
55  entities_need_rebuild_(true),
56  collector_(nullptr),
57  wait_set_({}, {}, {}, {}, {}, {}, context)
58 {
59 }
60 
62 : spinning(false),
63  interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
64  shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
65  context_(options.context),
66  notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
67  [this]() {
68  this->entities_need_rebuild_.store(true);
69  }, options.context)),
70  entities_need_rebuild_(true),
71  collector_(notify_waitable_),
72  wait_set_({}, {}, {}, {}, {}, {}, options.context),
73  current_notify_waitable_(notify_waitable_),
74  impl_(std::make_unique<rclcpp::ExecutorImplementation>())
75 {
76  shutdown_callback_handle_ = context_->add_on_shutdown_callback(
77  [weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
78  auto strong_gc = weak_gc.lock();
79  if (strong_gc) {
80  strong_gc->trigger();
81  }
82  });
83 
84  notify_waitable_->add_guard_condition(interrupt_guard_condition_);
85  notify_waitable_->add_guard_condition(shutdown_guard_condition_);
86 
87  // we need to initially rebuild the collection,
88  // so that the notify_waitable_ is added
89  collect_entities();
90 }
91 
93 {
94  std::lock_guard<std::mutex> guard(mutex_);
95 
96  notify_waitable_->remove_guard_condition(interrupt_guard_condition_);
97  notify_waitable_->remove_guard_condition(shutdown_guard_condition_);
98  current_collection_.timers.update(
99  {}, {},
100  [this](auto timer) {wait_set_.remove_timer(timer);});
101 
102  current_collection_.subscriptions.update(
103  {}, {},
104  [this](auto subscription) {
105  wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
106  });
107 
108  current_collection_.clients.update(
109  {}, {},
110  [this](auto client) {wait_set_.remove_client(client);});
111 
112  current_collection_.services.update(
113  {}, {},
114  [this](auto service) {wait_set_.remove_service(service);});
115 
116  current_collection_.guard_conditions.update(
117  {}, {},
118  [this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
119 
120  current_collection_.waitables.update(
121  {}, {},
122  [this](auto waitable) {wait_set_.remove_waitable(waitable);});
123 
124  // Remove shutdown callback handle registered to Context
125  if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) {
126  RCUTILS_LOG_ERROR_NAMED(
127  "rclcpp",
128  "failed to remove registered on_shutdown callback");
129  rcl_reset_error();
130  }
131 }
132 
133 void
135 {
136  this->entities_need_rebuild_.store(true);
137 
138  if (!spinning.load() && entities_need_rebuild_.exchange(false)) {
139  std::lock_guard<std::mutex> guard(mutex_);
140  this->collect_entities();
141  }
142 
143  if (notify) {
144  interrupt_guard_condition_->trigger();
145  }
146 }
147 
148 std::vector<rclcpp::CallbackGroup::WeakPtr>
150 {
152  return this->collector_.get_all_callback_groups();
153 }
154 
155 std::vector<rclcpp::CallbackGroup::WeakPtr>
157 {
160 }
161 
162 std::vector<rclcpp::CallbackGroup::WeakPtr>
164 {
167 }
168 
169 void
171  rclcpp::CallbackGroup::SharedPtr group_ptr,
172  [[maybe_unused]] rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
173  bool notify)
174 {
175  this->collector_.add_callback_group(group_ptr);
176 
177  try {
178  this->handle_updated_entities(notify);
179  } catch (const rclcpp::exceptions::RCLError & ex) {
180  throw std::runtime_error(
181  std::string(
182  "Failed to handle entities update on callback group add: ") + ex.what());
183  }
184 }
185 
186 void
187 Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
188 {
189  this->collector_.add_node(node_ptr);
190 
191  try {
192  this->handle_updated_entities(notify);
193  } catch (const rclcpp::exceptions::RCLError & ex) {
194  throw std::runtime_error(
195  std::string(
196  "Failed to handle entities update on node add: ") + ex.what());
197  }
198 }
199 
200 void
202  rclcpp::CallbackGroup::SharedPtr group_ptr,
203  bool notify)
204 {
205  this->collector_.remove_callback_group(group_ptr);
206 
207  try {
208  this->handle_updated_entities(notify);
209  } catch (const rclcpp::exceptions::RCLError & ex) {
210  throw std::runtime_error(
211  std::string(
212  "Failed to handle entities update on callback group remove: ") + ex.what());
213  }
214 }
215 
216 void
217 Executor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
218 {
219  this->add_node(node_ptr->get_node_base_interface(), notify);
220 }
221 
222 void
223 Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
224 {
225  this->collector_.remove_node(node_ptr);
226 
227  try {
228  this->handle_updated_entities(notify);
229  } catch (const rclcpp::exceptions::RCLError & ex) {
230  throw std::runtime_error(
231  std::string(
232  "Failed to handle entities update on node remove: ") + ex.what());
233  }
234 }
235 
236 void
237 Executor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
238 {
239  this->remove_node(node_ptr->get_node_base_interface(), notify);
240 }
241 
242 void
244  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
245  std::chrono::nanoseconds timeout)
246 {
247  this->add_node(node, false);
248  // non-blocking = true
249  spin_once(timeout);
250  this->remove_node(node, false);
251 }
252 
255  std::chrono::nanoseconds timeout,
256  const std::function<std::future_status(std::chrono::nanoseconds wait_time)> & wait_for_future)
257 {
258  // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
259  // inside a callback executed by an executor.
260 
261  // Check the future before entering the while loop.
262  // If the future is already complete, don't try to spin.
263  std::future_status status = wait_for_future(std::chrono::seconds(0));
264  if (status == std::future_status::ready) {
265  return FutureReturnCode::SUCCESS;
266  }
267 
268  auto end_time = std::chrono::steady_clock::now();
269  std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
270  timeout);
271  if (timeout_ns > std::chrono::nanoseconds::zero()) {
272  end_time += timeout_ns;
273  }
274  std::chrono::nanoseconds timeout_left = timeout_ns;
275 
276  if (spinning.exchange(true)) {
277  throw std::runtime_error("spin_until_future_complete() called while already spinning");
278  }
279  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
280  while (rclcpp::ok(this->context_) && spinning.load()) {
281  // Do one item of work.
282  spin_once_impl(timeout_left);
283 
284  // Check if the future is set, return SUCCESS if it is.
285  status = wait_for_future(std::chrono::seconds(0));
286  if (status == std::future_status::ready) {
287  return FutureReturnCode::SUCCESS;
288  }
289  // If the original timeout is < 0, then this is blocking, never TIMEOUT.
290  if (timeout_ns < std::chrono::nanoseconds::zero()) {
291  continue;
292  }
293  // Otherwise check if we still have time to wait, return TIMEOUT if not.
294  auto now = std::chrono::steady_clock::now();
295  if (now >= end_time) {
296  return FutureReturnCode::TIMEOUT;
297  }
298  // Subtract the elapsed time from the original timeout.
299  timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
300  }
301 
302  // The future did not complete before ok() returned false, return INTERRUPTED.
303  return FutureReturnCode::INTERRUPTED;
304 }
305 
306 void
307 Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
308 {
309  this->add_node(node, false);
310  spin_some();
311  this->remove_node(node, false);
312 }
313 
314 void
315 Executor::spin_node_some(std::shared_ptr<rclcpp::Node> node)
316 {
317  this->spin_node_some(node->get_node_base_interface());
318 }
319 
320 void Executor::spin_some(std::chrono::nanoseconds max_duration)
321 {
322  return this->spin_some_impl(max_duration, false);
323 }
324 
325 void
327  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
328  std::chrono::nanoseconds max_duration)
329 {
330  this->add_node(node, false);
331  spin_all(max_duration);
332  this->remove_node(node, false);
333 }
334 
335 void
336 Executor::spin_node_all(std::shared_ptr<rclcpp::Node> node, std::chrono::nanoseconds max_duration)
337 {
338  this->spin_node_all(node->get_node_base_interface(), max_duration);
339 }
340 
341 void Executor::spin_all(std::chrono::nanoseconds max_duration)
342 {
343  if (max_duration < 0ns) {
344  throw std::invalid_argument("max_duration must be greater than or equal to 0");
345  }
346  return this->spin_some_impl(max_duration, true);
347 }
348 
349 void
350 Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
351 {
352  auto start = std::chrono::steady_clock::now();
353  auto max_duration_not_elapsed = [max_duration, start]() {
354  if (std::chrono::nanoseconds(0) == max_duration) {
355  // told to spin forever if need be
356  return true;
357  } else if (std::chrono::steady_clock::now() - start < max_duration) {
358  // told to spin only for some maximum amount of time
359  return true;
360  }
361  // spun too long
362  return false;
363  };
364 
365  if (spinning.exchange(true)) {
366  throw std::runtime_error("spin_some() called while already spinning");
367  }
368  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
369 
370  // clear the wait result and wait for work without blocking to collect the work
371  // for the first time
372  // both spin_some and spin_all wait for work at the beginning
373  wait_result_.reset();
374  wait_for_work(std::chrono::milliseconds(0));
375  bool entity_states_fully_polled = true;
376 
377  if (entities_need_rebuild_) {
378  // if the last wait triggered a collection rebuild, we need to call
379  // wait_for_work once more, in order to do a collection rebuild and collect
380  // events from the just added entities
381  entity_states_fully_polled = false;
382  }
383 
384  // The logic of this while loop is as follows:
385  //
386  // - while not shutdown, and spinning (not canceled), and not max duration reached...
387  // - try to get an executable item to execute, and execute it if available
388  // - otherwise, reset the wait result, and ...
389  // - if there was no work available just after waiting, break the loop unconditionally
390  // - this is appropriate for both spin_some and spin_all which use this function
391  // - else if exhaustive = true, then wait for work again
392  // - this is only used for spin_all and not spin_some
393  // - else break
394  // - this only occurs with spin_some
395  //
396  // The logic of this loop is subtle and should be carefully changed if at all.
397  // See also:
398  // https://github.com/ros2/rclcpp/issues/2508
399  // https://github.com/ros2/rclcpp/pull/2517
400  while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
401  AnyExecutable any_exec;
402  if (get_next_ready_executable(any_exec)) {
403  execute_any_executable(any_exec);
404  // during the execution some entity might got ready therefore we need
405  // to repoll the states of all entities
406  entity_states_fully_polled = false;
407  } else {
408  // if nothing is ready, reset the result to clear it
409  wait_result_.reset();
410 
411  if (entity_states_fully_polled) {
412  // there was no work after just waiting, always exit in this case
413  // before the exhaustive condition can be checked
414  break;
415  }
416 
417  if (exhaustive) {
418  // if exhaustive, wait for work again
419  // this only happens for spin_all; spin_some only waits at the start
420  wait_for_work(std::chrono::milliseconds(0));
421  entity_states_fully_polled = true;
422  if (entities_need_rebuild_) {
423  // if the last wait triggered a collection rebuild, we need to call
424  // wait_for_work once more, in order to do a collection rebuild and
425  // collect events from the just added entities
426  entity_states_fully_polled = false;
427  }
428  } else {
429  break;
430  }
431  }
432  }
433 }
434 
435 void
436 Executor::spin_once_impl(std::chrono::nanoseconds timeout)
437 {
438  AnyExecutable any_exec;
439  if (get_next_executable(any_exec, timeout)) {
440  execute_any_executable(any_exec);
441  }
442 }
443 
444 void
445 Executor::spin_once(std::chrono::nanoseconds timeout)
446 {
447  if (spinning.exchange(true)) {
448  throw std::runtime_error("spin_once() called while already spinning");
449  }
450  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
451  spin_once_impl(timeout);
452 }
453 
454 void
456 {
457  spinning.store(false);
458  try {
459  interrupt_guard_condition_->trigger();
460  } catch (const rclcpp::exceptions::RCLError & ex) {
461  throw std::runtime_error(
462  std::string("Failed to trigger guard condition in cancel: ") + ex.what());
463  }
464 }
465 
466 void
468 {
469  if (!spinning.load()) {
470  return;
471  }
472 
473  assert(
474  (void("cannot execute an AnyExecutable without a valid callback group"),
475  any_exec.callback_group));
476 
477  if (any_exec.timer) {
478  TRACETOOLS_TRACEPOINT(
479  rclcpp_executor_execute,
480  static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
481  execute_timer(any_exec.timer, any_exec.data);
482  }
483  if (any_exec.subscription) {
484  TRACETOOLS_TRACEPOINT(
485  rclcpp_executor_execute,
486  static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
487  execute_subscription(any_exec.subscription);
488  }
489  if (any_exec.service) {
490  execute_service(any_exec.service);
491  }
492  if (any_exec.client) {
493  execute_client(any_exec.client);
494  }
495  if (any_exec.waitable) {
496  const std::shared_ptr<void> & const_data = any_exec.data;
497  any_exec.waitable->execute(const_data);
498  }
499 
500  // Reset the callback_group, regardless of type
501  any_exec.callback_group->can_be_taken_from().store(true);
502 }
503 
504 template<typename Taker, typename Handler>
505 static
506 void
507 take_and_do_error_handling(
508  const char * action_description,
509  const char * topic_or_service_name,
510  Taker take_action,
511  Handler handle_action)
512 {
513  bool taken = false;
514  try {
515  taken = take_action();
516  } catch (const rclcpp::exceptions::RCLError & rcl_error) {
517  RCLCPP_ERROR(
518  rclcpp::get_logger("rclcpp"),
519  "executor %s '%s' unexpectedly failed: %s",
520  action_description,
521  topic_or_service_name,
522  rcl_error.what());
523  }
524  if (taken) {
525  handle_action();
526  } else {
527  // Message or Service was not taken for some reason.
528  // Note that this can be normal, if the underlying middleware needs to
529  // interrupt wait spuriously it is allowed.
530  // So in that case the executor cannot tell the difference in a
531  // spurious wake up and an entity actually having data until trying
532  // to take the data.
533  RCLCPP_DEBUG(
534  rclcpp::get_logger("rclcpp"),
535  "executor %s '%s' failed to take anything",
536  action_description,
537  topic_or_service_name);
538  }
539 }
540 
541 void
542 Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
543 {
545 
546  rclcpp::MessageInfo message_info;
547  message_info.get_rmw_message_info().from_intra_process = false;
548 
549  switch (subscription->get_delivered_message_kind()) {
550  // Deliver ROS message
551  case rclcpp::DeliveredMessageKind::ROS_MESSAGE:
552  {
553  if (subscription->can_loan_messages()) {
554  // This is the case where a loaned message is taken from the middleware via
555  // inter-process communication, given to the user for their callback,
556  // and then returned.
557  void * loaned_msg = nullptr;
558  // TODO(wjwwood): refactor this into methods on subscription when LoanedMessage
559  // is extened to support subscriptions as well.
560  take_and_do_error_handling(
561  "taking a loaned message from topic",
562  subscription->get_topic_name(),
563  [&]()
564  {
565  rcl_ret_t ret = rcl_take_loaned_message(
566  subscription->get_subscription_handle().get(),
567  &loaned_msg,
568  &message_info.get_rmw_message_info(),
569  nullptr);
570  if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {
571  return false;
572  } else if (RCL_RET_OK != ret) {
573  rclcpp::exceptions::throw_from_rcl_error(ret);
574  }
575  return true;
576  },
577  [&]() {subscription->handle_loaned_message(loaned_msg, message_info);});
578  if (nullptr != loaned_msg) {
580  subscription->get_subscription_handle().get(), loaned_msg);
581  if (RCL_RET_OK != ret) {
582  RCLCPP_ERROR(
583  rclcpp::get_logger("rclcpp"),
584  "rcl_return_loaned_message_from_subscription() failed for subscription on topic "
585  "'%s': %s",
586  subscription->get_topic_name(), rcl_get_error_string().str);
587  rcl_reset_error();
588  }
589  loaned_msg = nullptr;
590  }
591  } else {
592  // This case is taking a copy of the message data from the middleware via
593  // inter-process communication.
594  std::shared_ptr<void> message = subscription->create_message();
595  take_and_do_error_handling(
596  "taking a message from topic",
597  subscription->get_topic_name(),
598  [&]() {return subscription->take_type_erased(message.get(), message_info);},
599  [&]() {subscription->handle_message(message, message_info);});
600  // TODO(clalancette): In the case that the user is using the MessageMemoryPool,
601  // and they take a shared_ptr reference to the message in the callback, this can
602  // inadvertently return the message to the pool when the user is still using it.
603  // This is a bug that needs to be fixed in the pool, and we should probably have
604  // a custom deleter for the message that actually does the return_message().
605  subscription->return_message(message);
606  }
607  break;
608  }
609 
610  // Deliver serialized message
611  case rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE:
612  {
613  // This is the case where a copy of the serialized message is taken from
614  // the middleware via inter-process communication.
615  std::shared_ptr<SerializedMessage> serialized_msg =
616  subscription->create_serialized_message();
617  take_and_do_error_handling(
618  "taking a serialized message from topic",
619  subscription->get_topic_name(),
620  [&]() {return subscription->take_serialized(*serialized_msg.get(), message_info);},
621  [&]()
622  {
623  subscription->handle_serialized_message(serialized_msg, message_info);
624  });
625  subscription->return_serialized_message(serialized_msg);
626  break;
627  }
628 
629  // DYNAMIC SUBSCRIPTION ========================================================================
630  // Deliver dynamic message
631  case rclcpp::DeliveredMessageKind::DYNAMIC_MESSAGE:
632  {
633  throw std::runtime_error("Unimplemented");
634  }
635 
636  default:
637  {
638  throw std::runtime_error("Delivered message kind is not supported");
639  }
640  }
641 }
642 
643 void
644 Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr<void> & data_ptr)
645 {
646  timer->execute_callback(data_ptr);
647 }
648 
649 void
650 Executor::execute_service(rclcpp::ServiceBase::SharedPtr service)
651 {
652  auto request_header = service->create_request_header();
653  std::shared_ptr<void> request = service->create_request();
654  take_and_do_error_handling(
655  "taking a service server request from service",
656  service->get_service_name(),
657  [&]() {return service->take_type_erased_request(request.get(), *request_header);},
658  [&]() {service->handle_request(request_header, request);});
659 }
660 
661 void
662 Executor::execute_client(rclcpp::ClientBase::SharedPtr client)
663 {
664  auto request_header = client->create_request_header();
665  std::shared_ptr<void> response = client->create_response();
666  take_and_do_error_handling(
667  "taking a service client response from service",
668  client->get_service_name(),
669  [&]() {return client->take_type_erased_response(response.get(), *request_header);},
670  [&]() {client->handle_response(request_header, response);});
671 }
672 
673 void
675 {
676  // Updating the entity collection and waitset expires any active result
677  this->wait_result_.reset();
678 
679  // Get the current list of available waitables from the collector.
682  auto callback_groups = this->collector_.get_all_callback_groups();
683  rclcpp::executors::build_entities_collection(callback_groups, collection);
684 
685  // Make a copy of notify waitable so we can continue to mutate the original
686  // one outside of the execute loop.
687  // This prevents the collection of guard conditions in the waitable from changing
688  // while we are waiting on it.
689  if (notify_waitable_) {
690  current_notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
692  auto notify_waitable = std::static_pointer_cast<rclcpp::Waitable>(current_notify_waitable_);
693  collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}});
694  }
695 
696  // We must remove expired entities here, so that we don't continue to use older entities.
697  // See https://github.com/ros2/rclcpp/issues/2180 for more information.
698  current_collection_.remove_expired_entities();
699 
700  // Update each of the groups of entities in the current collection, adding or removing
701  // from the wait set as necessary.
702  current_collection_.timers.update(
703  collection.timers,
704  [this](auto timer) {wait_set_.add_timer(timer);},
705  [this](auto timer) {wait_set_.remove_timer(timer);});
706 
707  current_collection_.subscriptions.update(
708  collection.subscriptions,
709  [this](auto subscription) {
710  wait_set_.add_subscription(subscription, kDefaultSubscriptionMask);
711  },
712  [this](auto subscription) {
713  wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
714  });
715 
716  current_collection_.clients.update(
717  collection.clients,
718  [this](auto client) {wait_set_.add_client(client);},
719  [this](auto client) {wait_set_.remove_client(client);});
720 
721  current_collection_.services.update(
722  collection.services,
723  [this](auto service) {wait_set_.add_service(service);},
724  [this](auto service) {wait_set_.remove_service(service);});
725 
726  current_collection_.guard_conditions.update(
727  collection.guard_conditions,
728  [this](auto guard_condition) {wait_set_.add_guard_condition(guard_condition);},
729  [this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
730 
731  current_collection_.waitables.update(
732  collection.waitables,
733  [this](auto waitable) {wait_set_.add_waitable(waitable);},
734  [this](auto waitable) {wait_set_.remove_waitable(waitable);});
735 
736  // In the case that an entity already has an expired weak pointer
737  // before being removed from the waitset, additionally prune the waitset.
738  this->wait_set_.prune_deleted_entities();
739 }
740 
741 void
742 Executor::wait_for_work(std::chrono::nanoseconds timeout)
743 {
744  TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
745 
746  // Clear any previous wait result
747  this->wait_result_.reset();
748 
749  {
750  std::lock_guard<std::mutex> guard(mutex_);
751 
752  if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
753  this->collect_entities();
754  }
755  }
756 
757  this->wait_result_.emplace(wait_set_.wait(timeout));
758 
759  if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
760  RCUTILS_LOG_WARN_NAMED(
761  "rclcpp",
762  "empty wait set received in wait(). This should never happen.");
763  } else {
764  if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
765  auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
766  if (current_notify_waitable_->is_ready(rcl_wait_set)) {
767  current_notify_waitable_->execute(current_notify_waitable_->take_data());
768  }
769  }
770  }
771 }
772 
773 bool
775 {
776  TRACETOOLS_TRACEPOINT(rclcpp_executor_get_next_ready);
777 
778  bool valid_executable = false;
779 
780  if (!wait_result_.has_value() || wait_result_->kind() != rclcpp::WaitResultKind::Ready) {
781  return false;
782  }
783 
784  if (!valid_executable) {
785  size_t current_timer_index = 0;
786  while (true) {
787  auto [timer, timer_index] = wait_result_->peek_next_ready_timer(current_timer_index);
788  if (nullptr == timer) {
789  break;
790  }
791  current_timer_index = timer_index;
792  auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
793  if (entity_iter != current_collection_.timers.end()) {
794  auto callback_group = entity_iter->second.callback_group.lock();
795  if (!callback_group || !callback_group->can_be_taken_from()) {
796  current_timer_index++;
797  continue;
798  }
799  // At this point the timer is either ready for execution or was perhaps
800  // it was canceled, based on the result of call(), but either way it
801  // should not be checked again from peek_next_ready_timer(), so clear
802  // it from the wait result.
803  wait_result_->clear_timer_with_index(current_timer_index);
804  // Check that the timer should be called still, i.e. it wasn't canceled.
805  any_executable.data = timer->call();
806  if (!any_executable.data) {
807  current_timer_index++;
808  continue;
809  }
810  any_executable.timer = timer;
811  any_executable.callback_group = callback_group;
812  valid_executable = true;
813  break;
814  }
815  current_timer_index++;
816  }
817  }
818 
819  if (!valid_executable) {
820  while (auto subscription = wait_result_->next_ready_subscription()) {
821  auto entity_iter = current_collection_.subscriptions.find(
822  subscription->get_subscription_handle().get());
823  if (entity_iter != current_collection_.subscriptions.end()) {
824  auto callback_group = entity_iter->second.callback_group.lock();
825  if (!callback_group || !callback_group->can_be_taken_from()) {
826  continue;
827  }
828  any_executable.subscription = subscription;
829  any_executable.callback_group = callback_group;
830  valid_executable = true;
831  break;
832  }
833  }
834  }
835 
836  if (!valid_executable) {
837  while (auto service = wait_result_->next_ready_service()) {
838  auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
839  if (entity_iter != current_collection_.services.end()) {
840  auto callback_group = entity_iter->second.callback_group.lock();
841  if (!callback_group || !callback_group->can_be_taken_from()) {
842  continue;
843  }
844  any_executable.service = service;
845  any_executable.callback_group = callback_group;
846  valid_executable = true;
847  break;
848  }
849  }
850  }
851 
852  if (!valid_executable) {
853  while (auto client = wait_result_->next_ready_client()) {
854  auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
855  if (entity_iter != current_collection_.clients.end()) {
856  auto callback_group = entity_iter->second.callback_group.lock();
857  if (!callback_group || !callback_group->can_be_taken_from()) {
858  continue;
859  }
860  any_executable.client = client;
861  any_executable.callback_group = callback_group;
862  valid_executable = true;
863  break;
864  }
865  }
866  }
867 
868  if (!valid_executable) {
869  while (auto waitable = wait_result_->next_ready_waitable()) {
870  auto entity_iter = current_collection_.waitables.find(waitable.get());
871  if (entity_iter != current_collection_.waitables.end()) {
872  auto callback_group = entity_iter->second.callback_group.lock();
873  if (!callback_group || !callback_group->can_be_taken_from()) {
874  continue;
875  }
876  any_executable.waitable = waitable;
877  any_executable.callback_group = callback_group;
878  any_executable.data = waitable->take_data();
879  valid_executable = true;
880  break;
881  }
882  }
883  }
884 
885  if (any_executable.callback_group) {
886  if (any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) {
887  assert(any_executable.callback_group->can_be_taken_from().load());
888  any_executable.callback_group->can_be_taken_from().store(false);
889  }
890  }
891 
892 
893  return valid_executable;
894 }
895 
896 bool
897 Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
898 {
899  bool success = false;
900  // Check to see if there are any subscriptions or timers needing service
901  // TODO(wjwwood): improve run to run efficiency of this function
902  success = get_next_ready_executable(any_executable);
903  // If there are none
904  if (!success) {
905  // Wait for subscriptions or timers to work on
906  wait_for_work(timeout);
907  if (!spinning.load()) {
908  return false;
909  }
910  // Try again
911  success = get_next_ready_executable(any_executable);
912  }
913  return success;
914 }
915 
916 bool
918 {
919  return spinning;
920 }
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:65
virtual RCLCPP_PUBLIC void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
Add a node, complete all immediately available work, and remove the node.
Definition: executor.cpp:307
std::shared_ptr< rclcpp::GuardCondition > interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
Definition: executor.hpp:564
RCLCPP_PUBLIC void spin_node_once_nanoseconds(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout)
Add a node to executor, execute the next available unit of work, and remove the node.
Definition: executor.cpp:243
virtual RCLCPP_PUBLIC ~Executor()
Default destructor.
Definition: executor.cpp:92
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
Definition: executor.cpp:156
virtual RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true)
Remove a callback group from the executor.
Definition: executor.cpp:201
virtual RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a callback group to an executor.
Definition: executor.cpp:170
RCLCPP_PUBLIC void wait_for_work(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Block until more work becomes avilable or timeout is reached.
Definition: executor.cpp:742
static RCLCPP_PUBLIC void execute_service(rclcpp::ServiceBase::SharedPtr service)
Run service server executable.
Definition: executor.cpp:650
virtual RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Remove a node from the executor.
Definition: executor.cpp:223
RCLCPP_PUBLIC Executor(const rclcpp::ExecutorOptions &options=rclcpp::ExecutorOptions())
Default constructor.
Definition: executor.cpp:61
virtual RCLCPP_PUBLIC void spin_node_all(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds max_duration)
Add a node, complete all immediately available work exhaustively, and remove the node.
Definition: executor.cpp:326
RCLCPP_PUBLIC bool get_next_executable(AnyExecutable &any_executable, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Wait for executable in ready state and populate union structure.
Definition: executor.cpp:897
virtual RCLCPP_PUBLIC void spin_once(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Collect work once and execute the next available work, optionally within a duration.
Definition: executor.cpp:445
virtual RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0))
Collect work once and execute all available work, optionally within a max duration.
Definition: executor.cpp:320
virtual RCLCPP_PUBLIC void cancel()
Cancel any running spin* function, causing it to return.
Definition: executor.cpp:455
RCLCPP_PUBLIC bool is_spinning()
Returns true if the executor is currently spinning.
Definition: executor.cpp:917
virtual RCLCPP_PUBLIC void handle_updated_entities(bool notify)
This function triggers a recollect of all entities that are registered to the executor.
Definition: executor.cpp:134
RCLCPP_PUBLIC bool get_next_ready_executable(AnyExecutable &any_executable)
Check for executable in ready state and populate union structure.
Definition: executor.cpp:774
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:572
static RCLCPP_PUBLIC void execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr< void > &data_ptr)
Run timer executable.
Definition: executor.cpp:644
static RCLCPP_PUBLIC void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
Run subscription executable.
Definition: executor.cpp:542
RCLCPP_PUBLIC void collect_entities()
Gather all of the waitable entities from associated nodes and callback groups.
Definition: executor.cpp:674
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
Definition: executor.cpp:163
virtual RCLCPP_PUBLIC FutureReturnCode spin_until_future_complete_impl(std::chrono::nanoseconds timeout, const std::function< std::future_status(std::chrono::nanoseconds wait_time)> &wait_for_future)
Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
Definition: executor.cpp:254
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_
shutdown callback handle registered to Context
Definition: executor.hpp:607
virtual RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a node to the executor.
Definition: executor.cpp:187
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
Definition: executor.cpp:467
std::shared_ptr< rclcpp::executors::ExecutorNotifyWaitable > notify_waitable_
Waitable containing guard conditions controlling the executor flow.
Definition: executor.hpp:587
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups()
Get callback groups that belong to executor.
Definition: executor.cpp:149
static RCLCPP_PUBLIC void execute_client(rclcpp::ClientBase::SharedPtr client)
Run service client executable.
Definition: executor.cpp:662
rclcpp::executors::ExecutorEntitiesCollector collector_
Collector used to associate executable entities from nodes and guard conditions.
Definition: executor.hpp:592
virtual RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration)
Collect and execute work repeatedly within a duration or until no more work is available.
Definition: executor.cpp:341
RCLCPP_PUBLIC void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
Collect work and execute available work, optionally within a duration.
Definition: executor.cpp:350
std::shared_ptr< rclcpp::GuardCondition > shutdown_guard_condition_
Guard condition for signaling the rmw layer to wake up for system shutdown.
Definition: executor.hpp:567
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:561
A condition that can be waited on in a single wait set and asynchronously triggered.
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Options used to determine what parts of a subscription get added to or removed from a wait set.
Created when the return code does not match one of the other specialized exceptions.
Definition: exceptions.hpp:162
void update(const EntityCollection< EntityKeyType, EntityValueType > &other, std::function< void(const EntitySharedPtr &)> on_added, std::function< void(const EntitySharedPtr &)> on_removed)
Update this collection based on the contents of another collection.
RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Add a node to the entity collector.
RCLCPP_PUBLIC void update_collections()
Update the underlying collections.
RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Remove a callback group from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() const
Get all callback groups known to this entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups() const
Get automatically-added callback groups known to this entity collector.
RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Remove a node from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups() const
Get manually-added callback groups known to this entity collector.
RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Add a callback group to the entity collector.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
FutureReturnCode
Return codes to be used with spin_until_future_complete.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:34
Options to be passed to the executor constructor.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
GuardConditionCollection guard_conditions
Collection of guard conditions currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
ClientCollection clients
Collection of clients currently in use by the executor.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_return_loaned_message_from_subscription(const rcl_subscription_t *subscription, void *loaned_message)
Return a loaned message from a topic using a rcl subscription.
Definition: subscription.c:725
#define RCL_RET_OK
Success return code.
Definition: types.h:27
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:24