ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
multi_threaded_executor.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/executors/multi_threaded_executor.hpp"
16 
17 #include <chrono>
18 #include <functional>
19 #include <memory>
20 #include <vector>
21 
22 #include "rcpputils/scope_exit.hpp"
23 
24 #include "rclcpp/utilities.hpp"
25 
27 
28 MultiThreadedExecutor::MultiThreadedExecutor(
29  const rclcpp::ExecutorOptions & options,
30  size_t number_of_threads,
31  bool yield_before_execute,
32  std::chrono::nanoseconds next_exec_timeout)
33 : rclcpp::Executor(options),
34  yield_before_execute_(yield_before_execute),
35  next_exec_timeout_(next_exec_timeout)
36 {
37  number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency();
38  if (number_of_threads_ == 0) {
39  number_of_threads_ = 1;
40  }
41 }
42 
43 MultiThreadedExecutor::~MultiThreadedExecutor() {}
44 
45 void
47 {
48  if (spinning.exchange(true)) {
49  throw std::runtime_error("spin() called while already spinning");
50  }
51  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
52  std::vector<std::thread> threads;
53  size_t thread_id = 0;
54  {
55  std::lock_guard wait_lock{wait_mutex_};
56  for (; thread_id < number_of_threads_ - 1; ++thread_id) {
57  auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
58  threads.emplace_back(func);
59  }
60  }
61 
62  run(thread_id);
63  for (auto & thread : threads) {
64  thread.join();
65  }
66 }
67 
68 size_t
69 MultiThreadedExecutor::get_number_of_threads()
70 {
71  return number_of_threads_;
72 }
73 
74 void
75 MultiThreadedExecutor::run(size_t this_thread_number)
76 {
77  (void)this_thread_number;
78  while (rclcpp::ok(this->context_) && spinning.load()) {
79  rclcpp::AnyExecutable any_exec;
80  {
81  std::lock_guard wait_lock{wait_mutex_};
82  if (!rclcpp::ok(this->context_) || !spinning.load()) {
83  return;
84  }
85  if (!get_next_executable(any_exec, next_exec_timeout_)) {
86  continue;
87  }
88  }
89  if (yield_before_execute_) {
90  std::this_thread::yield();
91  }
92 
93  execute_any_executable(any_exec);
94 
95  // Clear the callback_group to prevent the AnyExecutable destructor from
96  // resetting the callback group `can_be_taken_from`
97  any_exec.callback_group.reset();
98  }
99 }
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:66
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:555
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
Definition: executor.cpp:509
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:537
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
Options to be passed to the executor constructor.