ROS 2 rclcpp + rcl - kilted  kilted
ROS 2 C++ Client Library with ROS Client Library
multi_threaded_executor.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/executors/multi_threaded_executor.hpp"
16 
17 #include <chrono>
18 #include <functional>
19 #include <memory>
20 #include <vector>
21 
22 #include "rcpputils/scope_exit.hpp"
23 
24 #include "rclcpp/logging.hpp"
25 #include "rclcpp/utilities.hpp"
26 
28 
29 MultiThreadedExecutor::MultiThreadedExecutor(
30  const rclcpp::ExecutorOptions & options,
31  size_t number_of_threads,
32  bool yield_before_execute,
33  std::chrono::nanoseconds next_exec_timeout)
34 : rclcpp::Executor(options),
35  yield_before_execute_(yield_before_execute),
36  next_exec_timeout_(next_exec_timeout)
37 {
38  number_of_threads_ = number_of_threads > 0 ?
39  number_of_threads :
40  std::max(std::thread::hardware_concurrency(), 2U);
41 
42  if (number_of_threads_ == 1) {
43  RCLCPP_WARN(
44  rclcpp::get_logger("rclcpp"),
45  "MultiThreadedExecutor is used with a single thread.\n"
46  "Use the SingleThreadedExecutor instead.");
47  }
48 }
49 
50 MultiThreadedExecutor::~MultiThreadedExecutor() {}
51 
52 void
54 {
55  if (spinning.exchange(true)) {
56  throw std::runtime_error("spin() called while already spinning");
57  }
58  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
59  std::vector<std::thread> threads;
60  size_t thread_id = 0;
61  {
62  std::lock_guard wait_lock{wait_mutex_};
63  for (; thread_id < number_of_threads_ - 1; ++thread_id) {
64  auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
65  threads.emplace_back(func);
66  }
67  }
68 
69  run(thread_id);
70  for (auto & thread : threads) {
71  thread.join();
72  }
73 }
74 
75 size_t
76 MultiThreadedExecutor::get_number_of_threads()
77 {
78  return number_of_threads_;
79 }
80 
81 void
82 MultiThreadedExecutor::run([[maybe_unused]] size_t this_thread_number)
83 {
84  while (rclcpp::ok(this->context_) && spinning.load()) {
85  rclcpp::AnyExecutable any_exec;
86  {
87  std::lock_guard wait_lock{wait_mutex_};
88  if (!rclcpp::ok(this->context_) || !spinning.load()) {
89  return;
90  }
91  if (!get_next_executable(any_exec, next_exec_timeout_)) {
92  continue;
93  }
94  }
95  if (yield_before_execute_) {
96  std::this_thread::yield();
97  }
98 
99  execute_any_executable(any_exec);
100 
101  if (any_exec.callback_group &&
102  any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
103  {
104  try {
105  interrupt_guard_condition_->trigger();
106  } catch (const rclcpp::exceptions::RCLError & ex) {
107  throw std::runtime_error(
108  std::string(
109  "Failed to trigger guard condition on callback group change: ") + ex.what());
110  }
111  }
112 
113  // Clear the callback_group to prevent the AnyExecutable destructor from
114  // resetting the callback group `can_be_taken_from`
115  any_exec.callback_group.reset();
116  }
117 }
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:65
std::shared_ptr< rclcpp::GuardCondition > interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
Definition: executor.hpp:564
RCLCPP_PUBLIC bool get_next_executable(AnyExecutable &any_executable, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Wait for executable in ready state and populate union structure.
Definition: executor.cpp:897
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:572
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
Definition: executor.cpp:467
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:561
Created when the return code does not match one of the other specialized exceptions.
Definition: exceptions.hpp:162
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:34
Options to be passed to the executor constructor.