ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
executor.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <memory>
17 #include <map>
18 #include <string>
19 #include <type_traits>
20 #include <utility>
21 #include <vector>
22 
23 #include "rcl/allocator.h"
24 #include "rcl/error_handling.h"
25 #include "rcpputils/scope_exit.hpp"
26 
27 #include "rclcpp/exceptions.hpp"
28 #include "rclcpp/executor.hpp"
29 #include "rclcpp/guard_condition.hpp"
30 #include "rclcpp/memory_strategy.hpp"
31 #include "rclcpp/node.hpp"
32 #include "rclcpp/utilities.hpp"
33 
34 #include "rcutils/logging_macros.h"
35 
36 #include "tracetools/tracetools.h"
37 
38 using namespace std::chrono_literals;
39 
40 using rclcpp::exceptions::throw_from_rcl_error;
42 using rclcpp::Executor;
45 
46 Executor::Executor(const rclcpp::ExecutorOptions & options)
47 : spinning(false),
48  interrupt_guard_condition_(options.context),
49  shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
50  memory_strategy_(options.memory_strategy)
51 {
52  // Store the context for later use.
53  context_ = options.context;
54 
55  shutdown_callback_handle_ = context_->add_on_shutdown_callback(
56  [weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
57  auto strong_gc = weak_gc.lock();
58  if (strong_gc) {
59  strong_gc->trigger();
60  }
61  });
62 
63  // The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond,
64  // and one for the executor's guard cond (interrupt_guard_condition_)
65  memory_strategy_->add_guard_condition(*shutdown_guard_condition_.get());
66 
67  // Put the executor's guard condition in
68  memory_strategy_->add_guard_condition(interrupt_guard_condition_);
69  rcl_allocator_t allocator = memory_strategy_->get_allocator();
70 
72  &wait_set_,
73  0, 2, 0, 0, 0, 0,
74  context_->get_rcl_context().get(),
75  allocator);
76  if (RCL_RET_OK != ret) {
77  RCUTILS_LOG_ERROR_NAMED(
78  "rclcpp",
79  "failed to create wait set: %s", rcl_get_error_string().str);
80  rcl_reset_error();
81  throw_from_rcl_error(ret, "Failed to create wait set in Executor constructor");
82  }
83 }
84 
86 {
87  // Disassociate all callback groups.
88  for (auto & pair : weak_groups_to_nodes_) {
89  auto group = pair.first.lock();
90  if (group) {
91  std::atomic_bool & has_executor = group->get_associated_with_executor_atomic();
92  has_executor.store(false);
93  }
94  }
95  // Disassociate all nodes.
96  std::for_each(
97  weak_nodes_.begin(), weak_nodes_.end(), []
98  (rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr) {
99  auto shared_node_ptr = weak_node_ptr.lock();
100  if (shared_node_ptr) {
101  std::atomic_bool & has_executor = shared_node_ptr->get_associated_with_executor_atomic();
102  has_executor.store(false);
103  }
104  });
105  weak_nodes_.clear();
106  weak_groups_associated_with_executor_to_nodes_.clear();
107  weak_groups_to_nodes_associated_with_executor_.clear();
108  weak_groups_to_nodes_.clear();
109  for (const auto & pair : weak_groups_to_guard_conditions_) {
110  auto guard_condition = pair.second;
111  memory_strategy_->remove_guard_condition(guard_condition);
112  }
113  weak_groups_to_guard_conditions_.clear();
114 
115  // Finalize the wait set.
116  if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) {
117  RCUTILS_LOG_ERROR_NAMED(
118  "rclcpp",
119  "failed to destroy wait set: %s", rcl_get_error_string().str);
120  rcl_reset_error();
121  }
122  // Remove and release the sigint guard condition
123  memory_strategy_->remove_guard_condition(shutdown_guard_condition_.get());
124  memory_strategy_->remove_guard_condition(&interrupt_guard_condition_);
125 
126  // Remove shutdown callback handle registered to Context
127  if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) {
128  RCUTILS_LOG_ERROR_NAMED(
129  "rclcpp",
130  "failed to remove registered on_shutdown callback");
131  rcl_reset_error();
132  }
133 }
134 
135 std::vector<rclcpp::CallbackGroup::WeakPtr>
136 Executor::get_all_callback_groups()
137 {
138  std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
139  std::lock_guard<std::mutex> guard{mutex_};
140  for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
141  groups.push_back(group_node_ptr.first);
142  }
143  for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
144  groups.push_back(group_node_ptr.first);
145  }
146  return groups;
147 }
148 
149 std::vector<rclcpp::CallbackGroup::WeakPtr>
151 {
152  std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
153  std::lock_guard<std::mutex> guard{mutex_};
154  for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
155  groups.push_back(group_node_ptr.first);
156  }
157  return groups;
158 }
159 
160 std::vector<rclcpp::CallbackGroup::WeakPtr>
162 {
163  std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
164  std::lock_guard<std::mutex> guard{mutex_};
165  for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
166  groups.push_back(group_node_ptr.first);
167  }
168  return groups;
169 }
170 
171 void
173 {
174  for (auto & weak_node : weak_nodes_) {
175  auto node = weak_node.lock();
176  if (node) {
177  node->for_each_callback_group(
178  [this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr)
179  {
180  if (
181  shared_group_ptr->automatically_add_to_executor_with_node() &&
182  !shared_group_ptr->get_associated_with_executor_atomic().load())
183  {
184  this->add_callback_group_to_map(
185  shared_group_ptr,
186  node,
187  weak_groups_to_nodes_associated_with_executor_,
188  true);
189  }
190  });
191  }
192  }
193 }
194 
195 void
197  rclcpp::CallbackGroup::SharedPtr group_ptr,
198  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
199  rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
200  bool notify)
201 {
202  // If the callback_group already has an executor
203  std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
204  if (has_executor.exchange(true)) {
205  throw std::runtime_error("Callback group has already been added to an executor.");
206  }
207 
208  rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
209  auto insert_info =
210  weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr));
211  bool was_inserted = insert_info.second;
212  if (!was_inserted) {
213  throw std::runtime_error("Callback group was already added to executor.");
214  }
215  // Also add to the map that contains all callback groups
216  weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr));
217 
218  if (node_ptr->get_context()->is_valid()) {
219  auto callback_group_guard_condition =
220  group_ptr->get_notify_guard_condition(node_ptr->get_context());
221  weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get();
222  // Add the callback_group's notify condition to the guard condition handles
223  memory_strategy_->add_guard_condition(*callback_group_guard_condition);
224  }
225 
226  if (notify) {
227  // Interrupt waiting to handle new node
228  try {
230  } catch (const rclcpp::exceptions::RCLError & ex) {
231  throw std::runtime_error(
232  std::string(
233  "Failed to trigger guard condition on callback group add: ") + ex.what());
234  }
235  }
236 }
237 
238 void
240  rclcpp::CallbackGroup::SharedPtr group_ptr,
241  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
242  bool notify)
243 {
244  std::lock_guard<std::mutex> guard{mutex_};
246  group_ptr,
247  node_ptr,
248  weak_groups_associated_with_executor_to_nodes_,
249  notify);
250 }
251 
252 void
253 Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
254 {
255  // If the node already has an executor
256  std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
257  if (has_executor.exchange(true)) {
258  throw std::runtime_error(
259  std::string("Node '") + node_ptr->get_fully_qualified_name() +
260  "' has already been added to an executor.");
261  }
262  std::lock_guard<std::mutex> guard{mutex_};
263  node_ptr->for_each_callback_group(
264  [this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr)
265  {
266  if (!group_ptr->get_associated_with_executor_atomic().load() &&
267  group_ptr->automatically_add_to_executor_with_node())
268  {
269  this->add_callback_group_to_map(
270  group_ptr,
271  node_ptr,
272  weak_groups_to_nodes_associated_with_executor_,
273  notify);
274  }
275  });
276 
277  weak_nodes_.push_back(node_ptr);
278 }
279 
280 void
282  rclcpp::CallbackGroup::SharedPtr group_ptr,
283  rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
284  bool notify)
285 {
286  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr;
287  rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
288  auto iter = weak_groups_to_nodes.find(weak_group_ptr);
289  if (iter != weak_groups_to_nodes.end()) {
290  node_ptr = iter->second.lock();
291  if (node_ptr == nullptr) {
292  throw std::runtime_error("Node must not be deleted before its callback group(s).");
293  }
294  weak_groups_to_nodes.erase(iter);
295  weak_groups_to_nodes_.erase(group_ptr);
296  std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
297  has_executor.store(false);
298  } else {
299  throw std::runtime_error("Callback group needs to be associated with executor.");
300  }
301  // If the node was matched and removed, interrupt waiting.
302  if (!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) &&
303  !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_))
304  {
305  auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr);
306  if (iter != weak_groups_to_guard_conditions_.end()) {
307  memory_strategy_->remove_guard_condition(iter->second);
308  }
309  weak_groups_to_guard_conditions_.erase(weak_group_ptr);
310 
311  if (notify) {
312  try {
314  } catch (const rclcpp::exceptions::RCLError & ex) {
315  throw std::runtime_error(
316  std::string(
317  "Failed to trigger guard condition on callback group remove: ") + ex.what());
318  }
319  }
320  }
321 }
322 
323 void
325  rclcpp::CallbackGroup::SharedPtr group_ptr,
326  bool notify)
327 {
328  std::lock_guard<std::mutex> guard{mutex_};
330  group_ptr,
331  weak_groups_associated_with_executor_to_nodes_,
332  notify);
333 }
334 
335 void
336 Executor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
337 {
338  this->add_node(node_ptr->get_node_base_interface(), notify);
339 }
340 
341 void
342 Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
343 {
344  if (!node_ptr->get_associated_with_executor_atomic().load()) {
345  throw std::runtime_error("Node needs to be associated with an executor.");
346  }
347 
348  std::lock_guard<std::mutex> guard{mutex_};
349  bool found_node = false;
350  auto node_it = weak_nodes_.begin();
351  while (node_it != weak_nodes_.end()) {
352  bool matched = (node_it->lock() == node_ptr);
353  if (matched) {
354  found_node = true;
355  node_it = weak_nodes_.erase(node_it);
356  } else {
357  ++node_it;
358  }
359  }
360  if (!found_node) {
361  throw std::runtime_error("Node needs to be associated with this executor.");
362  }
363 
364  for (auto it = weak_groups_to_nodes_associated_with_executor_.begin();
365  it != weak_groups_to_nodes_associated_with_executor_.end(); )
366  {
367  auto weak_node_ptr = it->second;
368  auto shared_node_ptr = weak_node_ptr.lock();
369  auto group_ptr = it->first.lock();
370 
371  // Increment iterator before removing in case it's invalidated
372  it++;
373  if (shared_node_ptr == node_ptr) {
375  group_ptr,
376  weak_groups_to_nodes_associated_with_executor_,
377  notify);
378  }
379  }
380 
381  std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
382  has_executor.store(false);
383 }
384 
385 void
386 Executor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
387 {
388  this->remove_node(node_ptr->get_node_base_interface(), notify);
389 }
390 
391 void
392 Executor::spin_node_once_nanoseconds(
393  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
394  std::chrono::nanoseconds timeout)
395 {
396  this->add_node(node, false);
397  // non-blocking = true
398  spin_once(timeout);
399  this->remove_node(node, false);
400 }
401 
402 void
403 Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
404 {
405  this->add_node(node, false);
406  spin_some();
407  this->remove_node(node, false);
408 }
409 
410 void
411 Executor::spin_node_some(std::shared_ptr<rclcpp::Node> node)
412 {
413  this->spin_node_some(node->get_node_base_interface());
414 }
415 
416 void Executor::spin_some(std::chrono::nanoseconds max_duration)
417 {
418  return this->spin_some_impl(max_duration, false);
419 }
420 
421 void Executor::spin_all(std::chrono::nanoseconds max_duration)
422 {
423  if (max_duration < 0ns) {
424  throw std::invalid_argument("max_duration must be greater than or equal to 0");
425  }
426  return this->spin_some_impl(max_duration, true);
427 }
428 
429 void
430 Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
431 {
432  auto start = std::chrono::steady_clock::now();
433  auto max_duration_not_elapsed = [max_duration, start]() {
434  if (std::chrono::nanoseconds(0) == max_duration) {
435  // told to spin forever if need be
436  return true;
437  } else if (std::chrono::steady_clock::now() - start < max_duration) {
438  // told to spin only for some maximum amount of time
439  return true;
440  }
441  // spun too long
442  return false;
443  };
444 
445  if (spinning.exchange(true)) {
446  throw std::runtime_error("spin_some() called while already spinning");
447  }
448  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
449  bool work_available = false;
450  while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
451  AnyExecutable any_exec;
452  if (!work_available) {
453  wait_for_work(std::chrono::milliseconds::zero());
454  }
455  if (get_next_ready_executable(any_exec)) {
456  execute_any_executable(any_exec);
457  work_available = true;
458  } else {
459  if (!work_available || !exhaustive) {
460  break;
461  }
462  work_available = false;
463  }
464  }
465 }
466 
467 void
468 Executor::spin_once_impl(std::chrono::nanoseconds timeout)
469 {
470  AnyExecutable any_exec;
471  if (get_next_executable(any_exec, timeout)) {
472  execute_any_executable(any_exec);
473  }
474 }
475 
476 void
477 Executor::spin_once(std::chrono::nanoseconds timeout)
478 {
479  if (spinning.exchange(true)) {
480  throw std::runtime_error("spin_once() called while already spinning");
481  }
482  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
483  spin_once_impl(timeout);
484 }
485 
486 void
488 {
489  spinning.store(false);
490  try {
492  } catch (const rclcpp::exceptions::RCLError & ex) {
493  throw std::runtime_error(
494  std::string("Failed to trigger guard condition in cancel: ") + ex.what());
495  }
496 }
497 
498 void
499 Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
500 {
501  if (memory_strategy == nullptr) {
502  throw std::runtime_error("Received NULL memory strategy in executor.");
503  }
504  std::lock_guard<std::mutex> guard{mutex_};
505  memory_strategy_ = memory_strategy;
506 }
507 
508 void
510 {
511  if (!spinning.load()) {
512  return;
513  }
514  if (any_exec.timer) {
515  TRACEPOINT(
516  rclcpp_executor_execute,
517  static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
518  execute_timer(any_exec.timer);
519  }
520  if (any_exec.subscription) {
521  TRACEPOINT(
522  rclcpp_executor_execute,
523  static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
524  execute_subscription(any_exec.subscription);
525  }
526  if (any_exec.service) {
527  execute_service(any_exec.service);
528  }
529  if (any_exec.client) {
530  execute_client(any_exec.client);
531  }
532  if (any_exec.waitable) {
533  any_exec.waitable->execute(any_exec.data);
534  }
535  // Reset the callback_group, regardless of type
536  any_exec.callback_group->can_be_taken_from().store(true);
537  // Wake the wait, because it may need to be recalculated or work that
538  // was previously blocked is now available.
539  try {
541  } catch (const rclcpp::exceptions::RCLError & ex) {
542  throw std::runtime_error(
543  std::string(
544  "Failed to trigger guard condition from execute_any_executable: ") + ex.what());
545  }
546 }
547 
548 static
549 void
550 take_and_do_error_handling(
551  const char * action_description,
552  const char * topic_or_service_name,
553  std::function<bool()> take_action,
554  std::function<void()> handle_action)
555 {
556  bool taken = false;
557  try {
558  taken = take_action();
559  } catch (const rclcpp::exceptions::RCLError & rcl_error) {
560  RCLCPP_ERROR(
561  rclcpp::get_logger("rclcpp"),
562  "executor %s '%s' unexpectedly failed: %s",
563  action_description,
564  topic_or_service_name,
565  rcl_error.what());
566  }
567  if (taken) {
568  handle_action();
569  } else {
570  // Message or Service was not taken for some reason.
571  // Note that this can be normal, if the underlying middleware needs to
572  // interrupt wait spuriously it is allowed.
573  // So in that case the executor cannot tell the difference in a
574  // spurious wake up and an entity actually having data until trying
575  // to take the data.
576  RCLCPP_DEBUG(
577  rclcpp::get_logger("rclcpp"),
578  "executor %s '%s' failed to take anything",
579  action_description,
580  topic_or_service_name);
581  }
582 }
583 
584 void
585 Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
586 {
587  rclcpp::MessageInfo message_info;
588  message_info.get_rmw_message_info().from_intra_process = false;
589 
590  if (subscription->is_serialized()) {
591  // This is the case where a copy of the serialized message is taken from
592  // the middleware via inter-process communication.
593  std::shared_ptr<SerializedMessage> serialized_msg = subscription->create_serialized_message();
594  take_and_do_error_handling(
595  "taking a serialized message from topic",
596  subscription->get_topic_name(),
597  [&]() {return subscription->take_serialized(*serialized_msg.get(), message_info);},
598  [&]()
599  {
600  subscription->handle_serialized_message(serialized_msg, message_info);
601  });
602  subscription->return_serialized_message(serialized_msg);
603  } else if (subscription->can_loan_messages()) {
604  // This is the case where a loaned message is taken from the middleware via
605  // inter-process communication, given to the user for their callback,
606  // and then returned.
607  void * loaned_msg = nullptr;
608  // TODO(wjwwood): refactor this into methods on subscription when LoanedMessage
609  // is extened to support subscriptions as well.
610  take_and_do_error_handling(
611  "taking a loaned message from topic",
612  subscription->get_topic_name(),
613  [&]()
614  {
615  rcl_ret_t ret = rcl_take_loaned_message(
616  subscription->get_subscription_handle().get(),
617  &loaned_msg,
618  &message_info.get_rmw_message_info(),
619  nullptr);
620  if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {
621  return false;
622  } else if (RCL_RET_OK != ret) {
623  rclcpp::exceptions::throw_from_rcl_error(ret);
624  }
625  return true;
626  },
627  [&]() {subscription->handle_loaned_message(loaned_msg, message_info);});
628  if (nullptr != loaned_msg) {
630  subscription->get_subscription_handle().get(),
631  loaned_msg);
632  if (RCL_RET_OK != ret) {
633  RCLCPP_ERROR(
634  rclcpp::get_logger("rclcpp"),
635  "rcl_return_loaned_message_from_subscription() failed for subscription on topic '%s': %s",
636  subscription->get_topic_name(), rcl_get_error_string().str);
637  }
638  loaned_msg = nullptr;
639  }
640  } else {
641  // This case is taking a copy of the message data from the middleware via
642  // inter-process communication.
643  std::shared_ptr<void> message = subscription->create_message();
644  take_and_do_error_handling(
645  "taking a message from topic",
646  subscription->get_topic_name(),
647  [&]() {return subscription->take_type_erased(message.get(), message_info);},
648  [&]() {subscription->handle_message(message, message_info);});
649  // TODO(clalancette): In the case that the user is using the MessageMemoryPool,
650  // and they take a shared_ptr reference to the message in the callback, this can
651  // inadvertently return the message to the pool when the user is still using it.
652  // This is a bug that needs to be fixed in the pool, and we should probably have
653  // a custom deleter for the message that actually does the return_message().
654  subscription->return_message(message);
655  }
656 }
657 
658 void
659 Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer)
660 {
661  timer->execute_callback();
662 }
663 
664 void
665 Executor::execute_service(rclcpp::ServiceBase::SharedPtr service)
666 {
667  auto request_header = service->create_request_header();
668  std::shared_ptr<void> request = service->create_request();
669  take_and_do_error_handling(
670  "taking a service server request from service",
671  service->get_service_name(),
672  [&]() {return service->take_type_erased_request(request.get(), *request_header);},
673  [&]() {service->handle_request(request_header, request);});
674 }
675 
676 void
677 Executor::execute_client(
678  rclcpp::ClientBase::SharedPtr client)
679 {
680  auto request_header = client->create_request_header();
681  std::shared_ptr<void> response = client->create_response();
682  take_and_do_error_handling(
683  "taking a service client response from service",
684  client->get_service_name(),
685  [&]() {return client->take_type_erased_response(response.get(), *request_header);},
686  [&]() {client->handle_response(request_header, response);});
687 }
688 
689 void
690 Executor::wait_for_work(std::chrono::nanoseconds timeout)
691 {
692  TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
693  {
694  std::lock_guard<std::mutex> guard(mutex_);
695 
696  // Check weak_nodes_ to find any callback group that is not owned
697  // by an executor and add it to the list of callbackgroups for
698  // collect entities. Also exchange to false so it is not
699  // allowed to add to another executor
701 
702  // Collect the subscriptions and timers to be waited on
703  memory_strategy_->clear_handles();
704  bool has_invalid_weak_groups_or_nodes =
705  memory_strategy_->collect_entities(weak_groups_to_nodes_);
706 
707  if (has_invalid_weak_groups_or_nodes) {
708  std::vector<rclcpp::CallbackGroup::WeakPtr> invalid_group_ptrs;
709  for (auto pair : weak_groups_to_nodes_) {
710  auto weak_group_ptr = pair.first;
711  auto weak_node_ptr = pair.second;
712  if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
713  invalid_group_ptrs.push_back(weak_group_ptr);
714  }
715  }
716  std::for_each(
717  invalid_group_ptrs.begin(), invalid_group_ptrs.end(),
718  [this](rclcpp::CallbackGroup::WeakPtr group_ptr) {
719  if (weak_groups_to_nodes_associated_with_executor_.find(group_ptr) !=
720  weak_groups_to_nodes_associated_with_executor_.end())
721  {
722  weak_groups_to_nodes_associated_with_executor_.erase(group_ptr);
723  }
724  if (weak_groups_associated_with_executor_to_nodes_.find(group_ptr) !=
725  weak_groups_associated_with_executor_to_nodes_.end())
726  {
727  weak_groups_associated_with_executor_to_nodes_.erase(group_ptr);
728  }
729  auto callback_guard_pair = weak_groups_to_guard_conditions_.find(group_ptr);
730  if (callback_guard_pair != weak_groups_to_guard_conditions_.end()) {
731  auto guard_condition = callback_guard_pair->second;
732  weak_groups_to_guard_conditions_.erase(group_ptr);
733  memory_strategy_->remove_guard_condition(guard_condition);
734  }
735  weak_groups_to_nodes_.erase(group_ptr);
736  });
737  }
738 
739  // clear wait set
741  if (ret != RCL_RET_OK) {
742  throw_from_rcl_error(ret, "Couldn't clear wait set");
743  }
744 
745  // The size of waitables are accounted for in size of the other entities
746  ret = rcl_wait_set_resize(
747  &wait_set_, memory_strategy_->number_of_ready_subscriptions(),
748  memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
749  memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
750  memory_strategy_->number_of_ready_events());
751  if (RCL_RET_OK != ret) {
752  throw_from_rcl_error(ret, "Couldn't resize the wait set");
753  }
754 
755  if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) {
756  throw std::runtime_error("Couldn't fill wait set");
757  }
758  }
759 
760  rcl_ret_t status =
761  rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
762  if (status == RCL_RET_WAIT_SET_EMPTY) {
763  RCUTILS_LOG_WARN_NAMED(
764  "rclcpp",
765  "empty wait set received in rcl_wait(). This should never happen.");
766  } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
767  using rclcpp::exceptions::throw_from_rcl_error;
768  throw_from_rcl_error(status, "rcl_wait() failed");
769  }
770 
771  // check the null handles in the wait set and remove them from the handles in memory strategy
772  // for callback-based entities
773  std::lock_guard<std::mutex> guard(mutex_);
774  memory_strategy_->remove_null_handles(&wait_set_);
775 }
776 
777 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
778 Executor::get_node_by_group(
779  const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
780  weak_groups_to_nodes,
781  rclcpp::CallbackGroup::SharedPtr group)
782 {
783  if (!group) {
784  return nullptr;
785  }
786  rclcpp::CallbackGroup::WeakPtr weak_group_ptr(group);
787  const auto finder = weak_groups_to_nodes.find(weak_group_ptr);
788  if (finder != weak_groups_to_nodes.end()) {
789  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr = finder->second.lock();
790  return node_ptr;
791  }
792  return nullptr;
793 }
794 
795 rclcpp::CallbackGroup::SharedPtr
796 Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
797 {
798  std::lock_guard<std::mutex> guard{mutex_};
799  for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
800  auto group = pair.first.lock();
801  if (!group) {
802  continue;
803  }
804  auto timer_ref = group->find_timer_ptrs_if(
805  [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool {
806  return timer_ptr == timer;
807  });
808  if (timer_ref) {
809  return group;
810  }
811  }
812 
813  for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) {
814  auto group = pair.first.lock();
815  if (!group) {
816  continue;
817  }
818  auto timer_ref = group->find_timer_ptrs_if(
819  [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool {
820  return timer_ptr == timer;
821  });
822  if (timer_ref) {
823  return group;
824  }
825  }
826  return nullptr;
827 }
828 
829 bool
830 Executor::get_next_ready_executable(AnyExecutable & any_executable)
831 {
832  bool success = get_next_ready_executable_from_map(any_executable, weak_groups_to_nodes_);
833  return success;
834 }
835 
836 bool
837 Executor::get_next_ready_executable_from_map(
838  AnyExecutable & any_executable,
839  const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
840  weak_groups_to_nodes)
841 {
842  TRACEPOINT(rclcpp_executor_get_next_ready);
843  bool success = false;
844  std::lock_guard<std::mutex> guard{mutex_};
845  // Check the timers to see if there are any that are ready
846  memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes);
847  if (any_executable.timer) {
848  success = true;
849  }
850  if (!success) {
851  // Check the subscriptions to see if there are any that are ready
852  memory_strategy_->get_next_subscription(any_executable, weak_groups_to_nodes);
853  if (any_executable.subscription) {
854  success = true;
855  }
856  }
857  if (!success) {
858  // Check the services to see if there are any that are ready
859  memory_strategy_->get_next_service(any_executable, weak_groups_to_nodes);
860  if (any_executable.service) {
861  success = true;
862  }
863  }
864  if (!success) {
865  // Check the clients to see if there are any that are ready
866  memory_strategy_->get_next_client(any_executable, weak_groups_to_nodes);
867  if (any_executable.client) {
868  success = true;
869  }
870  }
871  if (!success) {
872  // Check the waitables to see if there are any that are ready
873  memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes);
874  if (any_executable.waitable) {
875  any_executable.data = any_executable.waitable->take_data();
876  success = true;
877  }
878  }
879  // At this point any_executable should be valid with either a valid subscription
880  // or a valid timer, or it should be a null shared_ptr
881  if (success) {
882  rclcpp::CallbackGroup::WeakPtr weak_group_ptr = any_executable.callback_group;
883  auto iter = weak_groups_to_nodes.find(weak_group_ptr);
884  if (iter == weak_groups_to_nodes.end()) {
885  success = false;
886  }
887  }
888 
889  if (success) {
890  // If it is valid, check to see if the group is mutually exclusive or
891  // not, then mark it accordingly ..Check if the callback_group belongs to this executor
892  if (any_executable.callback_group && any_executable.callback_group->type() == \
893  CallbackGroupType::MutuallyExclusive)
894  {
895  // It should not have been taken otherwise
896  assert(any_executable.callback_group->can_be_taken_from().load());
897  // Set to false to indicate something is being run from this group
898  // This is reset to true either when the any_executable is executed or when the
899  // any_executable is destructued
900  any_executable.callback_group->can_be_taken_from().store(false);
901  }
902  }
903  // If there is no ready executable, return false
904  return success;
905 }
906 
907 bool
908 Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
909 {
910  bool success = false;
911  // Check to see if there are any subscriptions or timers needing service
912  // TODO(wjwwood): improve run to run efficiency of this function
913  success = get_next_ready_executable(any_executable);
914  // If there are none
915  if (!success) {
916  // Wait for subscriptions or timers to work on
917  wait_for_work(timeout);
918  if (!spinning.load()) {
919  return false;
920  }
921  // Try again
922  success = get_next_ready_executable(any_executable);
923  }
924  return success;
925 }
926 
927 // Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry.
928 bool
930  const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
931  const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
932  weak_groups_to_nodes) const
933 {
934  return std::find_if(
935  weak_groups_to_nodes.begin(),
936  weak_groups_to_nodes.end(),
937  [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool {
938  auto other_ptr = other.second.lock();
939  return other_ptr == node_ptr;
940  }) != weak_groups_to_nodes.end();
941 }
942 
943 bool
945 {
946  return spinning;
947 }
rcutils_allocator_t rcl_allocator_t
Encapsulation of an allocator.
Definition: allocator.h:31
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:66
RCLCPP_PUBLIC void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
Add a node, complete all immediately available work, and remove the node.
Definition: executor.cpp:403
rcl_wait_set_t wait_set_
Wait set for managing entities that the rmw layer waits on.
Definition: executor.hpp:545
RCLCPP_PUBLIC void set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
Support dynamic switching of the memory strategy.
Definition: executor.cpp:499
virtual RCLCPP_PUBLIC ~Executor()
Default destructor.
Definition: executor.cpp:85
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
Definition: executor.cpp:150
virtual RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true)
Remove a callback group from the executor.
Definition: executor.cpp:324
virtual RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a callback group to an executor.
Definition: executor.cpp:239
RCLCPP_PUBLIC void wait_for_work(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Definition: executor.cpp:690
virtual RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Remove a node from the executor.
Definition: executor.cpp:342
virtual RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0))
Collect work once and execute all available work, optionally within a duration.
Definition: executor.cpp:416
RCLCPP_PUBLIC void cancel()
Cancel any running spin* function, causing it to return.
Definition: executor.cpp:487
RCLCPP_PUBLIC bool is_spinning()
Returns true if the executor is currently spinning.
Definition: executor.cpp:944
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:555
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
Definition: executor.cpp:161
rclcpp::GuardCondition interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
Definition: executor.hpp:540
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_
shutdown callback handle registered to Context
Definition: executor.hpp:589
virtual RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a node to the executor.
Definition: executor.cpp:253
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
Definition: executor.cpp:509
virtual RCLCPP_PUBLIC void add_callback_group_to_map(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, WeakCallbackGroupsToNodesMap &weak_groups_to_nodes, bool notify=true) RCPPUTILS_TSA_REQUIRES(mutex_)
Add a callback group to an executor.
Definition: executor.cpp:196
virtual RCLCPP_PUBLIC void add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_)
Add all callback groups that can be automatically added from associated nodes.
Definition: executor.cpp:172
virtual RCLCPP_PUBLIC void remove_callback_group_from_map(rclcpp::CallbackGroup::SharedPtr group_ptr, WeakCallbackGroupsToNodesMap &weak_groups_to_nodes, bool notify=true) RCPPUTILS_TSA_REQUIRES(mutex_)
Remove a callback group from the executor.
Definition: executor.cpp:281
RCLCPP_PUBLIC bool has_node(const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, const WeakCallbackGroupsToNodesMap &weak_groups_to_nodes) const
Return true if the node has been added to this executor.
Definition: executor.cpp:929
virtual RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration)
Collect and execute work repeatedly within a duration or until no more work is available.
Definition: executor.cpp:421
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:537
A condition that can be waited on in a single wait set and asynchronously triggered.
RCLCPP_PUBLIC void trigger()
Notify the wait set waiting on this condition, if any, that the condition had been met.
Additional meta data about messages taken from subscriptions.
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
Created when the return code does not match one of the other specialized exceptions.
Definition: exceptions.hpp:153
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
FutureReturnCode
Return codes to be used with spin_until_future_complete.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:27
Options to be passed to the executor constructor.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_return_loaned_message_from_subscription(const rcl_subscription_t *subscription, void *loaned_message)
Return a loaned message from a topic using a rcl subscription.
Definition: subscription.c:648
#define RCL_RET_WAIT_SET_EMPTY
Given rcl_wait_set_t is empty return code.
Definition: types.h:98
#define RCL_RET_OK
Success return code.
Definition: types.h:26
#define RCL_RET_TIMEOUT
Timeout occurred return code.
Definition: types.h:30
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:23
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_init(rcl_wait_set_t *wait_set, size_t number_of_subscriptions, size_t number_of_guard_conditions, size_t number_of_timers, size_t number_of_clients, size_t number_of_services, size_t number_of_events, rcl_context_t *context, rcl_allocator_t allocator)
Initialize a rcl wait set with space for items to be waited on.
Definition: wait.c:103
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_clear(rcl_wait_set_t *wait_set)
Remove (sets to NULL) all entities in the wait set.
Definition: wait.c:334
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_fini(rcl_wait_set_t *wait_set)
Finalize a rcl wait set.
Definition: wait.c:189
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait(rcl_wait_set_t *wait_set, int64_t timeout)
Block until the wait set is ready or until the timeout has been exceeded.
Definition: wait.c:522
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_resize(rcl_wait_set_t *wait_set, size_t subscriptions_size, size_t guard_conditions_size, size_t timers_size, size_t clients_size, size_t services_size, size_t events_size)
Reallocate space for entities in the wait set.
Definition: wait.c:376