ROS 2 rclcpp + rcl - jazzy
ROS 2 C++ Client Library with ROS Client Library
multi_threaded_executor.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/executors/multi_threaded_executor.hpp"
16 
17 #include <chrono>
18 #include <functional>
19 #include <memory>
20 #include <vector>
21 
22 #include "rcpputils/scope_exit.hpp"
23 
24 #include "rclcpp/logging.hpp"
25 #include "rclcpp/utilities.hpp"
26 
28 
29 MultiThreadedExecutor::MultiThreadedExecutor(
30  const rclcpp::ExecutorOptions & options,
31  size_t number_of_threads,
32  bool yield_before_execute,
33  std::chrono::nanoseconds next_exec_timeout)
34 : rclcpp::Executor(options),
35  yield_before_execute_(yield_before_execute),
36  next_exec_timeout_(next_exec_timeout)
37 {
38  number_of_threads_ = number_of_threads > 0 ?
39  number_of_threads :
40  std::max(std::thread::hardware_concurrency(), 2U);
41 
42  if (number_of_threads_ == 1) {
43  RCLCPP_WARN(
44  rclcpp::get_logger("rclcpp"),
45  "MultiThreadedExecutor is used with a single thread.\n"
46  "Use the SingleThreadedExecutor instead.");
47  }
48 }
49 
50 MultiThreadedExecutor::~MultiThreadedExecutor() {}
51 
52 void
54 {
55  if (spinning.exchange(true)) {
56  throw std::runtime_error("spin() called while already spinning");
57  }
58  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
59  std::vector<std::thread> threads;
60  size_t thread_id = 0;
61  {
62  std::lock_guard wait_lock{wait_mutex_};
63  for (; thread_id < number_of_threads_ - 1; ++thread_id) {
64  auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
65  threads.emplace_back(func);
66  }
67  }
68 
69  run(thread_id);
70  for (auto & thread : threads) {
71  thread.join();
72  }
73 }
74 
75 size_t
76 MultiThreadedExecutor::get_number_of_threads()
77 {
78  return number_of_threads_;
79 }
80 
81 void
82 MultiThreadedExecutor::run(size_t this_thread_number)
83 {
84  (void)this_thread_number;
85  while (rclcpp::ok(this->context_) && spinning.load()) {
86  rclcpp::AnyExecutable any_exec;
87  {
88  std::lock_guard wait_lock{wait_mutex_};
89  if (!rclcpp::ok(this->context_) || !spinning.load()) {
90  return;
91  }
92  if (!get_next_executable(any_exec, next_exec_timeout_)) {
93  continue;
94  }
95  }
96  if (yield_before_execute_) {
97  std::this_thread::yield();
98  }
99 
100  execute_any_executable(any_exec);
101 
102  if (any_exec.callback_group &&
103  any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
104  {
105  try {
106  interrupt_guard_condition_->trigger();
107  } catch (const rclcpp::exceptions::RCLError & ex) {
108  throw std::runtime_error(
109  std::string(
110  "Failed to trigger guard condition on callback group change: ") + ex.what());
111  }
112  }
113 
114  // Clear the callback_group to prevent the AnyExecutable destructor from
115  // resetting the callback group `can_be_taken_from`
116  any_exec.callback_group.reset();
117  }
118 }
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:65
std::shared_ptr< rclcpp::GuardCondition > interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
Definition: executor.hpp:551
RCLCPP_PUBLIC bool get_next_executable(AnyExecutable &any_executable, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Wait for executable in ready state and populate union structure.
Definition: executor.cpp:914
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:559
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
Definition: executor.cpp:468
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:548
Created when the return code does not match one of the other specialized exceptions.
Definition: exceptions.hpp:153
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:33
Options to be passed to the executor constructor.