ROS 2 rclcpp + rcl - rolling  rolling-b14af74a
ROS 2 C++ Client Library with ROS Client Library
executor.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <cassert>
17 #include <chrono>
18 #include <iterator>
19 #include <memory>
20 #include <map>
21 #include <stdexcept>
22 #include <string>
23 #include <type_traits>
24 #include <utility>
25 #include <vector>
26 
27 #include "rcl/allocator.h"
28 #include "rcl/error_handling.h"
29 #include "rclcpp/executors/executor_notify_waitable.hpp"
30 #include "rclcpp/subscription_wait_set_mask.hpp"
31 #include "rcpputils/scope_exit.hpp"
32 
33 #include "rclcpp/dynamic_typesupport/dynamic_message.hpp"
34 #include "rclcpp/exceptions.hpp"
35 #include "rclcpp/executor.hpp"
36 #include "rclcpp/guard_condition.hpp"
37 #include "rclcpp/node.hpp"
38 #include "rclcpp/utilities.hpp"
39 
40 #include "rcutils/logging_macros.h"
41 
42 #include "tracetools/tracetools.h"
43 
44 using namespace std::chrono_literals;
45 
46 using rclcpp::Executor;
47 
50 static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {true, false, false};
51 
53 
54 Executor::Executor(const std::shared_ptr<rclcpp::Context> & context)
55 : spinning(false),
56  context_(context),
57  entities_need_rebuild_(true),
58  collector_(nullptr),
59  wait_set_({}, {}, {}, {}, {}, {}, context)
60 {
61 }
62 
64 : spinning(false),
65  interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
66  shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
67  context_(options.context),
68  notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
69  [this]() {
70  this->entities_need_rebuild_.store(true);
71  }, options.context)),
72  entities_need_rebuild_(true),
73  collector_(notify_waitable_),
74  wait_set_({}, {}, {}, {}, {}, {}, options.context),
75  current_notify_waitable_(notify_waitable_),
76  impl_(std::make_unique<rclcpp::ExecutorImplementation>())
77 {
78  shutdown_callback_handle_ = context_->add_on_shutdown_callback(
79  [weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
80  auto strong_gc = weak_gc.lock();
81  if (strong_gc) {
82  strong_gc->trigger();
83  }
84  });
85 
86  notify_waitable_->add_guard_condition(interrupt_guard_condition_);
87  notify_waitable_->add_guard_condition(shutdown_guard_condition_);
88 
89  // we need to initially rebuild the collection,
90  // so that the notify_waitable_ is added
91  collect_entities();
92 }
93 
95 {
96  std::lock_guard<std::mutex> guard(mutex_);
97 
98  notify_waitable_->remove_guard_condition(interrupt_guard_condition_);
99  notify_waitable_->remove_guard_condition(shutdown_guard_condition_);
100  current_collection_.timers.update(
101  {}, {},
102  [this](auto timer) {wait_set_.remove_timer(timer);});
103 
104  current_collection_.subscriptions.update(
105  {}, {},
106  [this](auto subscription) {
107  wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
108  });
109 
110  current_collection_.clients.update(
111  {}, {},
112  [this](auto client) {wait_set_.remove_client(client);});
113 
114  current_collection_.services.update(
115  {}, {},
116  [this](auto service) {wait_set_.remove_service(service);});
117 
118  current_collection_.guard_conditions.update(
119  {}, {},
120  [this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
121 
122  current_collection_.waitables.update(
123  {}, {},
124  [this](auto waitable) {wait_set_.remove_waitable(waitable);});
125 
126  // Remove shutdown callback handle registered to Context
127  if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) {
128  RCUTILS_LOG_ERROR_NAMED(
129  "rclcpp",
130  "failed to remove registered on_shutdown callback");
131  rcl_reset_error();
132  }
133 }
134 
135 void
137 {
138  this->entities_need_rebuild_.store(true);
139 
140  if (!spinning.load() && entities_need_rebuild_.exchange(false)) {
141  std::lock_guard<std::mutex> guard(mutex_);
142  this->collect_entities();
143  }
144 
145  if (notify) {
146  interrupt_guard_condition_->trigger();
147  }
148 }
149 
150 std::vector<rclcpp::CallbackGroup::WeakPtr>
152 {
154  return this->collector_.get_all_callback_groups();
155 }
156 
157 std::vector<rclcpp::CallbackGroup::WeakPtr>
159 {
162 }
163 
164 std::vector<rclcpp::CallbackGroup::WeakPtr>
166 {
169 }
170 
171 void
173  rclcpp::CallbackGroup::SharedPtr group_ptr,
174  [[maybe_unused]] rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
175  bool notify)
176 {
177  this->collector_.add_callback_group(group_ptr);
178 
179  try {
180  this->handle_updated_entities(notify);
181  } catch (const rclcpp::exceptions::RCLError & ex) {
182  throw std::runtime_error(
183  std::string(
184  "Failed to handle entities update on callback group add: ") + ex.what());
185  }
186 }
187 
188 void
189 Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
190 {
191  if (node_ptr->get_context() != context_) {
192  throw std::runtime_error(
193  "add_node() called with a node with a different context from this executor");
194  }
195 
196  this->collector_.add_node(node_ptr);
197 
198  try {
199  this->handle_updated_entities(notify);
200  } catch (const rclcpp::exceptions::RCLError & ex) {
201  throw std::runtime_error(
202  std::string(
203  "Failed to handle entities update on node add: ") + ex.what());
204  }
205 }
206 
207 void
209  rclcpp::CallbackGroup::SharedPtr group_ptr,
210  bool notify)
211 {
212  this->collector_.remove_callback_group(group_ptr);
213 
214  try {
215  this->handle_updated_entities(notify);
216  } catch (const rclcpp::exceptions::RCLError & ex) {
217  throw std::runtime_error(
218  std::string(
219  "Failed to handle entities update on callback group remove: ") + ex.what());
220  }
221 }
222 
223 void
224 Executor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
225 {
226  this->add_node(node_ptr->get_node_base_interface(), notify);
227 }
228 
229 void
230 Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
231 {
232  this->collector_.remove_node(node_ptr);
233 
234  try {
235  this->handle_updated_entities(notify);
236  } catch (const rclcpp::exceptions::RCLError & ex) {
237  throw std::runtime_error(
238  std::string(
239  "Failed to handle entities update on node remove: ") + ex.what());
240  }
241 }
242 
243 void
244 Executor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
245 {
246  this->remove_node(node_ptr->get_node_base_interface(), notify);
247 }
248 
249 void
251  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
252  std::chrono::nanoseconds timeout)
253 {
254  this->add_node(node, false);
255  // non-blocking = true
256  spin_once(timeout);
257  this->remove_node(node, false);
258 }
259 
262  std::chrono::nanoseconds timeout,
263  const std::function<std::future_status(std::chrono::nanoseconds wait_time)> & wait_for_future)
264 {
265  // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
266  // inside a callback executed by an executor.
267 
268  // Check the future before entering the while loop.
269  // If the future is already complete, don't try to spin.
270  std::future_status status = wait_for_future(std::chrono::seconds(0));
271  if (status == std::future_status::ready) {
272  return FutureReturnCode::SUCCESS;
273  }
274 
275  auto end_time = std::chrono::steady_clock::now();
276  std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
277  timeout);
278  if (timeout_ns > std::chrono::nanoseconds::zero()) {
279  end_time += timeout_ns;
280  }
281  std::chrono::nanoseconds timeout_left = timeout_ns;
282 
283  if (spinning.exchange(true)) {
284  throw std::runtime_error("spin_until_future_complete() called while already spinning");
285  }
286  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
287  while (rclcpp::ok(this->context_) && spinning.load()) {
288  // Do one item of work.
289  spin_once_impl(timeout_left);
290 
291  // Check if the future is set, return SUCCESS if it is.
292  status = wait_for_future(std::chrono::seconds(0));
293  if (status == std::future_status::ready) {
294  return FutureReturnCode::SUCCESS;
295  }
296  // If the original timeout is < 0, then this is blocking, never TIMEOUT.
297  if (timeout_ns < std::chrono::nanoseconds::zero()) {
298  continue;
299  }
300  // Otherwise check if we still have time to wait, return TIMEOUT if not.
301  auto now = std::chrono::steady_clock::now();
302  if (now >= end_time) {
303  return FutureReturnCode::TIMEOUT;
304  }
305  // Subtract the elapsed time from the original timeout.
306  timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
307  }
308 
309  // The future did not complete before ok() returned false, return INTERRUPTED.
310  return FutureReturnCode::INTERRUPTED;
311 }
312 
313 void
314 Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
315 {
316  this->add_node(node, false);
317  spin_some();
318  this->remove_node(node, false);
319 }
320 
321 void
322 Executor::spin_node_some(std::shared_ptr<rclcpp::Node> node)
323 {
324  this->spin_node_some(node->get_node_base_interface());
325 }
326 
327 void Executor::spin_some(std::chrono::nanoseconds max_duration)
328 {
329  return this->spin_some_impl(max_duration, false);
330 }
331 
332 void
334  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
335  std::chrono::nanoseconds max_duration)
336 {
337  this->add_node(node, false);
338  spin_all(max_duration);
339  this->remove_node(node, false);
340 }
341 
342 void
343 Executor::spin_node_all(std::shared_ptr<rclcpp::Node> node, std::chrono::nanoseconds max_duration)
344 {
345  this->spin_node_all(node->get_node_base_interface(), max_duration);
346 }
347 
348 void Executor::spin_all(std::chrono::nanoseconds max_duration)
349 {
350  if (max_duration < 0ns) {
351  throw std::invalid_argument("max_duration must be greater than or equal to 0");
352  }
353  return this->spin_some_impl(max_duration, true);
354 }
355 
356 void
357 Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
358 {
359  auto start = std::chrono::steady_clock::now();
360  auto max_duration_not_elapsed = [max_duration, start]() {
361  if (std::chrono::nanoseconds(0) == max_duration) {
362  // told to spin forever if need be
363  return true;
364  } else if (std::chrono::steady_clock::now() - start < max_duration) {
365  // told to spin only for some maximum amount of time
366  return true;
367  }
368  // spun too long
369  return false;
370  };
371 
372  if (spinning.exchange(true)) {
373  throw std::runtime_error("spin_some() called while already spinning");
374  }
375  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
376 
377  // clear the wait result and wait for work without blocking to collect the work
378  // for the first time
379  // both spin_some and spin_all wait for work at the beginning
380  wait_result_.reset();
381  wait_for_work(std::chrono::milliseconds(0));
382  bool entity_states_fully_polled = true;
383 
384  if (entities_need_rebuild_) {
385  // if the last wait triggered a collection rebuild, we need to call
386  // wait_for_work once more, in order to do a collection rebuild and collect
387  // events from the just added entities
388  entity_states_fully_polled = false;
389  }
390 
391  // The logic of this while loop is as follows:
392  //
393  // - while not shutdown, and spinning (not canceled), and not max duration reached...
394  // - try to get an executable item to execute, and execute it if available
395  // - otherwise, reset the wait result, and ...
396  // - if there was no work available just after waiting, break the loop unconditionally
397  // - this is appropriate for both spin_some and spin_all which use this function
398  // - else if exhaustive = true, then wait for work again
399  // - this is only used for spin_all and not spin_some
400  // - else break
401  // - this only occurs with spin_some
402  //
403  // The logic of this loop is subtle and should be carefully changed if at all.
404  // See also:
405  // https://github.com/ros2/rclcpp/issues/2508
406  // https://github.com/ros2/rclcpp/pull/2517
407  while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
408  AnyExecutable any_exec;
409  if (get_next_ready_executable(any_exec)) {
410  execute_any_executable(any_exec);
411  // during the execution some entity might got ready therefore we need
412  // to repoll the states of all entities
413  entity_states_fully_polled = false;
414  } else {
415  // if nothing is ready, reset the result to clear it
416  wait_result_.reset();
417 
418  if (entity_states_fully_polled) {
419  // there was no work after just waiting, always exit in this case
420  // before the exhaustive condition can be checked
421  break;
422  }
423 
424  if (exhaustive) {
425  // if exhaustive, wait for work again
426  // this only happens for spin_all; spin_some only waits at the start
427  wait_for_work(std::chrono::milliseconds(0));
428  entity_states_fully_polled = true;
429  if (entities_need_rebuild_) {
430  // if the last wait triggered a collection rebuild, we need to call
431  // wait_for_work once more, in order to do a collection rebuild and
432  // collect events from the just added entities
433  entity_states_fully_polled = false;
434  }
435  } else {
436  break;
437  }
438  }
439  }
440 }
441 
442 void
443 Executor::spin_once_impl(std::chrono::nanoseconds timeout)
444 {
445  AnyExecutable any_exec;
446  if (get_next_executable(any_exec, timeout)) {
447  execute_any_executable(any_exec);
448  }
449 }
450 
451 void
452 Executor::spin_once(std::chrono::nanoseconds timeout)
453 {
454  if (spinning.exchange(true)) {
455  throw std::runtime_error("spin_once() called while already spinning");
456  }
457  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
458  spin_once_impl(timeout);
459 }
460 
461 void
463 {
464  spinning.store(false);
465  try {
466  interrupt_guard_condition_->trigger();
467  } catch (const rclcpp::exceptions::RCLError & ex) {
468  throw std::runtime_error(
469  std::string("Failed to trigger guard condition in cancel: ") + ex.what());
470  }
471 }
472 
473 void
475 {
476  if (!spinning.load()) {
477  return;
478  }
479 
480  assert(
481  (void("cannot execute an AnyExecutable without a valid callback group"),
482  any_exec.callback_group));
483 
484  if (any_exec.timer) {
485  TRACETOOLS_TRACEPOINT(
486  rclcpp_executor_execute,
487  static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
488  execute_timer(any_exec.timer, any_exec.data);
489  }
490  if (any_exec.subscription) {
491  TRACETOOLS_TRACEPOINT(
492  rclcpp_executor_execute,
493  static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
494  execute_subscription(any_exec.subscription);
495  }
496  if (any_exec.service) {
497  execute_service(any_exec.service);
498  }
499  if (any_exec.client) {
500  execute_client(any_exec.client);
501  }
502  if (any_exec.waitable) {
503  const std::shared_ptr<void> & const_data = any_exec.data;
504  any_exec.waitable->execute(const_data);
505  }
506 
507  // Reset the callback_group, regardless of type
508  any_exec.callback_group->can_be_taken_from().store(true);
509 }
510 
511 template<typename Taker, typename Handler>
512 static
513 void
514 take_and_do_error_handling(
515  const char * action_description,
516  const char * topic_or_service_name,
517  Taker take_action,
518  Handler handle_action)
519 {
520  bool taken = false;
521  try {
522  taken = take_action();
523  } catch (const rclcpp::exceptions::RCLError & rcl_error) {
524  RCLCPP_ERROR(
525  rclcpp::get_logger("rclcpp"),
526  "executor %s '%s' unexpectedly failed: %s",
527  action_description,
528  topic_or_service_name,
529  rcl_error.what());
530  }
531  if (taken) {
532  handle_action();
533  } else {
534  // Message or Service was not taken for some reason.
535  // Note that this can be normal, if the underlying middleware needs to
536  // interrupt wait spuriously it is allowed.
537  // So in that case the executor cannot tell the difference in a
538  // spurious wake up and an entity actually having data until trying
539  // to take the data.
540  RCLCPP_DEBUG(
541  rclcpp::get_logger("rclcpp"),
542  "executor %s '%s' failed to take anything",
543  action_description,
544  topic_or_service_name);
545  }
546 }
547 
548 void
549 Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
550 {
552 
553  rclcpp::MessageInfo message_info;
554  message_info.get_rmw_message_info().from_intra_process = false;
555 
556  switch (subscription->get_delivered_message_kind()) {
557  // Deliver ROS message
558  case rclcpp::DeliveredMessageKind::ROS_MESSAGE:
559  {
560  if (subscription->can_loan_messages()) {
561  // This is the case where a loaned message is taken from the middleware via
562  // inter-process communication, given to the user for their callback,
563  // and then returned.
564  void * loaned_msg = nullptr;
565  // TODO(wjwwood): refactor this into methods on subscription when LoanedMessage
566  // is extened to support subscriptions as well.
567  take_and_do_error_handling(
568  "taking a loaned message from topic",
569  subscription->get_topic_name(),
570  [&]()
571  {
572  rcl_ret_t ret = rcl_take_loaned_message(
573  subscription->get_subscription_handle().get(),
574  &loaned_msg,
575  &message_info.get_rmw_message_info(),
576  nullptr);
577  if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {
578  return false;
579  } else if (RCL_RET_OK != ret) {
580  rclcpp::exceptions::throw_from_rcl_error(ret);
581  }
582  return true;
583  },
584  [&]() {subscription->handle_loaned_message(loaned_msg, message_info);});
585  if (nullptr != loaned_msg) {
587  subscription->get_subscription_handle().get(), loaned_msg);
588  if (RCL_RET_OK != ret) {
589  RCLCPP_ERROR(
590  rclcpp::get_logger("rclcpp"),
591  "rcl_return_loaned_message_from_subscription() failed for subscription on topic "
592  "'%s': %s",
593  subscription->get_topic_name(), rcl_get_error_string().str);
594  rcl_reset_error();
595  }
596  loaned_msg = nullptr;
597  }
598  } else {
599  // This case is taking a copy of the message data from the middleware via
600  // inter-process communication.
601  std::shared_ptr<void> message = subscription->create_message();
602  take_and_do_error_handling(
603  "taking a message from topic",
604  subscription->get_topic_name(),
605  [&]() {return subscription->take_type_erased(message.get(), message_info);},
606  [&]() {subscription->handle_message(message, message_info);});
607  // TODO(clalancette): In the case that the user is using the MessageMemoryPool,
608  // and they take a shared_ptr reference to the message in the callback, this can
609  // inadvertently return the message to the pool when the user is still using it.
610  // This is a bug that needs to be fixed in the pool, and we should probably have
611  // a custom deleter for the message that actually does the return_message().
612  subscription->return_message(message);
613  }
614  break;
615  }
616 
617  // Deliver serialized message
618  case rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE:
619  {
620  // This is the case where a copy of the serialized message is taken from
621  // the middleware via inter-process communication.
622  std::shared_ptr<SerializedMessage> serialized_msg =
623  subscription->create_serialized_message();
624  take_and_do_error_handling(
625  "taking a serialized message from topic",
626  subscription->get_topic_name(),
627  [&]() {return subscription->take_serialized(*serialized_msg.get(), message_info);},
628  [&]()
629  {
630  subscription->handle_serialized_message(serialized_msg, message_info);
631  });
632  subscription->return_serialized_message(serialized_msg);
633  break;
634  }
635 
636  // DYNAMIC SUBSCRIPTION ========================================================================
637  // Deliver dynamic message
638  case rclcpp::DeliveredMessageKind::DYNAMIC_MESSAGE:
639  {
640  throw std::runtime_error("Unimplemented");
641  }
642 
643  default:
644  {
645  throw std::runtime_error("Delivered message kind is not supported");
646  }
647  }
648 }
649 
650 void
651 Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr<void> & data_ptr)
652 {
653  timer->execute_callback(data_ptr);
654 }
655 
656 void
657 Executor::execute_service(rclcpp::ServiceBase::SharedPtr service)
658 {
659  auto request_header = service->create_request_header();
660  std::shared_ptr<void> request = service->create_request();
661  take_and_do_error_handling(
662  "taking a service server request from service",
663  service->get_service_name(),
664  [&]() {return service->take_type_erased_request(request.get(), *request_header);},
665  [&]() {service->handle_request(request_header, request);});
666 }
667 
668 void
669 Executor::execute_client(rclcpp::ClientBase::SharedPtr client)
670 {
671  auto request_header = client->create_request_header();
672  std::shared_ptr<void> response = client->create_response();
673  take_and_do_error_handling(
674  "taking a service client response from service",
675  client->get_service_name(),
676  [&]() {return client->take_type_erased_response(response.get(), *request_header);},
677  [&]() {client->handle_response(request_header, response);});
678 }
679 
680 void
682 {
683  // Updating the entity collection and waitset expires any active result
684  this->wait_result_.reset();
685 
686  // Get the current list of available waitables from the collector.
689  auto callback_groups = this->collector_.get_all_callback_groups();
690  rclcpp::executors::build_entities_collection(callback_groups, collection);
691 
692  // Make a copy of notify waitable so we can continue to mutate the original
693  // one outside of the execute loop.
694  // This prevents the collection of guard conditions in the waitable from changing
695  // while we are waiting on it.
696  if (notify_waitable_) {
697  current_notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
699  auto notify_waitable = std::static_pointer_cast<rclcpp::Waitable>(current_notify_waitable_);
700  collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}});
701  }
702 
703  // We must remove expired entities here, so that we don't continue to use older entities.
704  // See https://github.com/ros2/rclcpp/issues/2180 for more information.
705  current_collection_.remove_expired_entities();
706 
707  // Update each of the groups of entities in the current collection, adding or removing
708  // from the wait set as necessary.
709  current_collection_.timers.update(
710  collection.timers,
711  [this](auto timer) {wait_set_.add_timer(timer);},
712  [this](auto timer) {wait_set_.remove_timer(timer);});
713 
714  current_collection_.subscriptions.update(
715  collection.subscriptions,
716  [this](auto subscription) {
717  wait_set_.add_subscription(subscription, kDefaultSubscriptionMask);
718  },
719  [this](auto subscription) {
720  wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
721  });
722 
723  current_collection_.clients.update(
724  collection.clients,
725  [this](auto client) {wait_set_.add_client(client);},
726  [this](auto client) {wait_set_.remove_client(client);});
727 
728  current_collection_.services.update(
729  collection.services,
730  [this](auto service) {wait_set_.add_service(service);},
731  [this](auto service) {wait_set_.remove_service(service);});
732 
733  current_collection_.guard_conditions.update(
734  collection.guard_conditions,
735  [this](auto guard_condition) {wait_set_.add_guard_condition(guard_condition);},
736  [this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
737 
738  current_collection_.waitables.update(
739  collection.waitables,
740  [this](auto waitable) {wait_set_.add_waitable(waitable);},
741  [this](auto waitable) {wait_set_.remove_waitable(waitable);});
742 
743  // In the case that an entity already has an expired weak pointer
744  // before being removed from the waitset, additionally prune the waitset.
745  this->wait_set_.prune_deleted_entities();
746 }
747 
748 void
749 Executor::wait_for_work(std::chrono::nanoseconds timeout)
750 {
751  TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
752 
753  // Clear any previous wait result
754  this->wait_result_.reset();
755 
756  {
757  std::lock_guard<std::mutex> guard(mutex_);
758 
759  if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
760  this->collect_entities();
761  }
762  }
763 
764  this->wait_result_.emplace(wait_set_.wait(timeout));
765 
766  if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
767  RCUTILS_LOG_WARN_NAMED(
768  "rclcpp",
769  "empty wait set received in wait(). This should never happen.");
770  } else {
771  if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
772  auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
773  if (current_notify_waitable_->is_ready(rcl_wait_set)) {
774  current_notify_waitable_->execute(current_notify_waitable_->take_data());
775  }
776  }
777  }
778 }
779 
780 bool
782 {
783  TRACETOOLS_TRACEPOINT(rclcpp_executor_get_next_ready);
784 
785  bool valid_executable = false;
786 
787  if (!wait_result_.has_value() || wait_result_->kind() != rclcpp::WaitResultKind::Ready) {
788  return false;
789  }
790 
791  if (!valid_executable) {
792  size_t current_timer_index = 0;
793  while (true) {
794  auto [timer, timer_index] = wait_result_->peek_next_ready_timer(current_timer_index);
795  if (nullptr == timer) {
796  break;
797  }
798  current_timer_index = timer_index;
799  auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
800  if (entity_iter != current_collection_.timers.end()) {
801  auto callback_group = entity_iter->second.callback_group.lock();
802  if (!callback_group || !callback_group->can_be_taken_from()) {
803  current_timer_index++;
804  continue;
805  }
806  // At this point the timer is either ready for execution or was perhaps
807  // it was canceled, based on the result of call(), but either way it
808  // should not be checked again from peek_next_ready_timer(), so clear
809  // it from the wait result.
810  wait_result_->clear_timer_with_index(current_timer_index);
811  // Check that the timer should be called still, i.e. it wasn't canceled.
812  any_executable.data = timer->call();
813  if (!any_executable.data) {
814  current_timer_index++;
815  continue;
816  }
817  any_executable.timer = timer;
818  any_executable.callback_group = callback_group;
819  valid_executable = true;
820  break;
821  }
822  current_timer_index++;
823  }
824  }
825 
826  if (!valid_executable) {
827  while (auto subscription = wait_result_->next_ready_subscription()) {
828  auto entity_iter = current_collection_.subscriptions.find(
829  subscription->get_subscription_handle().get());
830  if (entity_iter != current_collection_.subscriptions.end()) {
831  auto callback_group = entity_iter->second.callback_group.lock();
832  if (!callback_group || !callback_group->can_be_taken_from()) {
833  continue;
834  }
835  any_executable.subscription = subscription;
836  any_executable.callback_group = callback_group;
837  valid_executable = true;
838  break;
839  }
840  }
841  }
842 
843  if (!valid_executable) {
844  while (auto service = wait_result_->next_ready_service()) {
845  auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
846  if (entity_iter != current_collection_.services.end()) {
847  auto callback_group = entity_iter->second.callback_group.lock();
848  if (!callback_group || !callback_group->can_be_taken_from()) {
849  continue;
850  }
851  any_executable.service = service;
852  any_executable.callback_group = callback_group;
853  valid_executable = true;
854  break;
855  }
856  }
857  }
858 
859  if (!valid_executable) {
860  while (auto client = wait_result_->next_ready_client()) {
861  auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
862  if (entity_iter != current_collection_.clients.end()) {
863  auto callback_group = entity_iter->second.callback_group.lock();
864  if (!callback_group || !callback_group->can_be_taken_from()) {
865  continue;
866  }
867  any_executable.client = client;
868  any_executable.callback_group = callback_group;
869  valid_executable = true;
870  break;
871  }
872  }
873  }
874 
875  if (!valid_executable) {
876  while (auto waitable = wait_result_->next_ready_waitable()) {
877  auto entity_iter = current_collection_.waitables.find(waitable.get());
878  if (entity_iter != current_collection_.waitables.end()) {
879  auto callback_group = entity_iter->second.callback_group.lock();
880  if (!callback_group || !callback_group->can_be_taken_from()) {
881  continue;
882  }
883  any_executable.waitable = waitable;
884  any_executable.callback_group = callback_group;
885  any_executable.data = waitable->take_data();
886  valid_executable = true;
887  break;
888  }
889  }
890  }
891 
892  if (any_executable.callback_group) {
893  if (any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) {
894  assert(any_executable.callback_group->can_be_taken_from().load());
895  any_executable.callback_group->can_be_taken_from().store(false);
896  }
897  }
898 
899 
900  return valid_executable;
901 }
902 
903 bool
904 Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
905 {
906  bool success = false;
907  // Check to see if there are any subscriptions or timers needing service
908  // TODO(wjwwood): improve run to run efficiency of this function
909  success = get_next_ready_executable(any_executable);
910  // If there are none
911  if (!success) {
912  // Wait for subscriptions or timers to work on
913  wait_for_work(timeout);
914  if (!spinning.load()) {
915  return false;
916  }
917  // Try again
918  success = get_next_ready_executable(any_executable);
919  }
920  return success;
921 }
922 
923 bool
925 {
926  return spinning;
927 }
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:65
virtual RCLCPP_PUBLIC void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
Add a node, complete all immediately available work, and remove the node.
Definition: executor.cpp:314
std::shared_ptr< rclcpp::GuardCondition > interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
Definition: executor.hpp:567
RCLCPP_PUBLIC void spin_node_once_nanoseconds(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout)
Add a node to executor, execute the next available unit of work, and remove the node.
Definition: executor.cpp:250
virtual RCLCPP_PUBLIC ~Executor()
Default destructor.
Definition: executor.cpp:94
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
Definition: executor.cpp:158
virtual RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true)
Remove a callback group from the executor.
Definition: executor.cpp:208
virtual RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a callback group to an executor.
Definition: executor.cpp:172
RCLCPP_PUBLIC void wait_for_work(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Block until more work becomes avilable or timeout is reached.
Definition: executor.cpp:749
static RCLCPP_PUBLIC void execute_service(rclcpp::ServiceBase::SharedPtr service)
Run service server executable.
Definition: executor.cpp:657
virtual RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Remove a node from the executor.
Definition: executor.cpp:230
RCLCPP_PUBLIC Executor(const rclcpp::ExecutorOptions &options=rclcpp::ExecutorOptions())
Default constructor.
Definition: executor.cpp:63
virtual RCLCPP_PUBLIC void spin_node_all(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds max_duration)
Add a node, complete all immediately available work exhaustively, and remove the node.
Definition: executor.cpp:333
RCLCPP_PUBLIC bool get_next_executable(AnyExecutable &any_executable, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Wait for executable in ready state and populate union structure.
Definition: executor.cpp:904
virtual RCLCPP_PUBLIC void spin_once(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Collect work once and execute the next available work, optionally within a duration.
Definition: executor.cpp:452
virtual RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0))
Collect work once and execute all available work, optionally within a max duration.
Definition: executor.cpp:327
virtual RCLCPP_PUBLIC void cancel()
Cancel any running spin* function, causing it to return.
Definition: executor.cpp:462
RCLCPP_PUBLIC bool is_spinning()
Returns true if the executor is currently spinning.
Definition: executor.cpp:924
virtual RCLCPP_PUBLIC void handle_updated_entities(bool notify)
This function triggers a recollect of all entities that are registered to the executor.
Definition: executor.cpp:136
RCLCPP_PUBLIC bool get_next_ready_executable(AnyExecutable &any_executable)
Check for executable in ready state and populate union structure.
Definition: executor.cpp:781
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:575
static RCLCPP_PUBLIC void execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr< void > &data_ptr)
Run timer executable.
Definition: executor.cpp:651
static RCLCPP_PUBLIC void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
Run subscription executable.
Definition: executor.cpp:549
RCLCPP_PUBLIC void collect_entities()
Gather all of the waitable entities from associated nodes and callback groups.
Definition: executor.cpp:681
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
Definition: executor.cpp:165
virtual RCLCPP_PUBLIC FutureReturnCode spin_until_future_complete_impl(std::chrono::nanoseconds timeout, const std::function< std::future_status(std::chrono::nanoseconds wait_time)> &wait_for_future)
Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
Definition: executor.cpp:261
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_
shutdown callback handle registered to Context
Definition: executor.hpp:610
virtual RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a node to the executor.
Definition: executor.cpp:189
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
Definition: executor.cpp:474
std::shared_ptr< rclcpp::executors::ExecutorNotifyWaitable > notify_waitable_
Waitable containing guard conditions controlling the executor flow.
Definition: executor.hpp:590
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups()
Get callback groups that belong to executor.
Definition: executor.cpp:151
static RCLCPP_PUBLIC void execute_client(rclcpp::ClientBase::SharedPtr client)
Run service client executable.
Definition: executor.cpp:669
rclcpp::executors::ExecutorEntitiesCollector collector_
Collector used to associate executable entities from nodes and guard conditions.
Definition: executor.hpp:595
virtual RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration)
Collect and execute work repeatedly within a duration or until no more work is available.
Definition: executor.cpp:348
RCLCPP_PUBLIC void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
Collect work and execute available work, optionally within a duration.
Definition: executor.cpp:357
std::shared_ptr< rclcpp::GuardCondition > shutdown_guard_condition_
Guard condition for signaling the rmw layer to wake up for system shutdown.
Definition: executor.hpp:570
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:564
A condition that can be waited on in a single wait set and asynchronously triggered.
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Options used to determine what parts of a subscription get added to or removed from a wait set.
Created when the return code does not match one of the other specialized exceptions.
Definition: exceptions.hpp:162
void update(const EntityCollection< EntityKeyType, EntityValueType > &other, std::function< void(const EntitySharedPtr &)> on_added, std::function< void(const EntitySharedPtr &)> on_removed)
Update this collection based on the contents of another collection.
RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Add a node to the entity collector.
RCLCPP_PUBLIC void update_collections()
Update the underlying collections.
RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Remove a callback group from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() const
Get all callback groups known to this entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups() const
Get automatically-added callback groups known to this entity collector.
RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Remove a node from the entity collector.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups() const
Get manually-added callback groups known to this entity collector.
RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr)
Add a callback group to the entity collector.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
FutureReturnCode
Return codes to be used with spin_until_future_complete.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:34
Options to be passed to the executor constructor.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
GuardConditionCollection guard_conditions
Collection of guard conditions currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
ClientCollection clients
Collection of clients currently in use by the executor.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_return_loaned_message_from_subscription(const rcl_subscription_t *subscription, void *loaned_message)
Return a loaned message from a topic using a rcl subscription.
Definition: subscription.c:725
#define RCL_RET_OK
Success return code.
Definition: types.h:27
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:24