ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
static_single_threaded_executor.cpp
1 // Copyright 2019 Nobleo Technology
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/executors/executor_entities_collection.hpp"
16 #include "rcpputils/scope_exit.hpp"
17 
18 #include "rclcpp/executors/static_single_threaded_executor.hpp"
19 #include "rclcpp/any_executable.hpp"
20 
22 
23 StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(const rclcpp::ExecutorOptions & options)
24 : rclcpp::Executor(options)
25 {
26 }
27 
29 
30 void
32 {
33  if (spinning.exchange(true)) {
34  throw std::runtime_error("spin() called while already spinning");
35  }
36  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
37 
38  // This is essentially the contents of the rclcpp::Executor::wait_for_work method,
39  // except we need to keep the wait result to reproduce the StaticSingleThreadedExecutor
40  // behavior.
41  while (rclcpp::ok(this->context_) && spinning.load()) {
42  this->spin_once_impl(std::chrono::nanoseconds(-1));
43  }
44 }
45 
46 void
47 StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration)
48 {
49  // In this context a 0 input max_duration means no duration limit
50  if (std::chrono::nanoseconds(0) == max_duration) {
51  max_duration = std::chrono::nanoseconds::max();
52  }
53  return this->spin_some_impl(max_duration, false);
54 }
55 
56 void
57 StaticSingleThreadedExecutor::spin_all(std::chrono::nanoseconds max_duration)
58 {
59  if (max_duration < std::chrono::nanoseconds(0)) {
60  throw std::invalid_argument("max_duration must be greater than or equal to 0");
61  }
62  return this->spin_some_impl(max_duration, true);
63 }
64 
65 void
66 StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
67 {
68  auto start = std::chrono::steady_clock::now();
69  auto max_duration_not_elapsed = [max_duration, start]() {
70  const auto spin_forever = std::chrono::nanoseconds(0) == max_duration;
71  const auto cur_duration = std::chrono::steady_clock::now() - start;
72  return spin_forever || (cur_duration < max_duration);
73  };
74 
75  if (spinning.exchange(true)) {
76  throw std::runtime_error("spin_some() called while already spinning");
77  }
78  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false););
79 
80  while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
81  // Get executables that are ready now
82  std::lock_guard<std::mutex> guard(mutex_);
83 
84  auto wait_result = this->collect_and_wait(std::chrono::nanoseconds(0));
85  if (wait_result.has_value()) {
86  // Execute ready executables
87  bool work_available = this->execute_ready_executables(
88  current_collection_,
89  wait_result.value(),
90  false);
91  if (!work_available || !exhaustive) {
92  break;
93  }
94  }
95  }
96 }
97 
98 void
99 StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
100 {
101  if (rclcpp::ok(context_) && spinning.load()) {
102  std::lock_guard<std::mutex> guard(mutex_);
103  auto wait_result = this->collect_and_wait(timeout);
104  if (wait_result.has_value()) {
105  this->execute_ready_executables(current_collection_, wait_result.value(), true);
106  }
107  }
108 }
109 
110 std::optional<rclcpp::WaitResult<rclcpp::WaitSet>>
111 StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout)
112 {
113  // we need to make sure that callback groups don't get out of scope
114  // during the wait. As in jazzy, they are not covered by the DynamicStorage,
115  // we explicitly hold them here as a bugfix
116  std::vector<rclcpp::CallbackGroup::SharedPtr> cbgs;
117 
118  if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
119  this->collect_entities();
120  }
121 
122  auto callback_groups = this->collector_.get_all_callback_groups();
123  cbgs.resize(callback_groups.size());
124  for(const auto & w_ptr : callback_groups) {
125  auto shr_ptr = w_ptr.lock();
126  if(shr_ptr) {
127  cbgs.push_back(std::move(shr_ptr));
128  }
129  }
130 
131  auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout));
132 
133  // drop references to the callback groups, before trying to execute anything
134  cbgs.clear();
135 
136  if (wait_result.kind() == WaitResultKind::Empty) {
137  RCUTILS_LOG_WARN_NAMED(
138  "rclcpp",
139  "empty wait set received in wait(). This should never happen.");
140  return {};
141  } else {
142  if (wait_result.kind() == WaitResultKind::Ready && current_notify_waitable_) {
143  auto & rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set();
144  if (current_notify_waitable_->is_ready(rcl_wait_set)) {
145  current_notify_waitable_->execute(current_notify_waitable_->take_data());
146  }
147  }
148  }
149  return wait_result;
150 }
151 
152 // This preserves the "scheduling semantics" of the StaticSingleThreadedExecutor
153 // from the original implementation.
157  bool spin_once)
158 {
159  bool any_ready_executable = false;
160  if (wait_result.kind() != rclcpp::WaitResultKind::Ready) {
161  return any_ready_executable;
162  }
163 
164  while (auto subscription = wait_result.next_ready_subscription()) {
165  auto entity_iter = collection.subscriptions.find(subscription->get_subscription_handle().get());
166  if (entity_iter != collection.subscriptions.end()) {
167  execute_subscription(subscription);
168  any_ready_executable = true;
169  if (spin_once) {return any_ready_executable;}
170  }
171  }
172 
173  size_t current_timer_index = 0;
174  while (true) {
175  auto [timer, timer_index] = wait_result.peek_next_ready_timer(current_timer_index);
176  if (nullptr == timer) {
177  break;
178  }
179  current_timer_index = timer_index;
180  auto entity_iter = collection.timers.find(timer->get_timer_handle().get());
181  if (entity_iter != collection.timers.end()) {
182  wait_result.clear_timer_with_index(current_timer_index);
183  auto data = timer->call();
184  if (!data) {
185  // someone canceled the timer between is_ready and call
186  continue;
187  }
188 
189  execute_timer(std::move(timer), data);
190  any_ready_executable = true;
191  if (spin_once) {return any_ready_executable;}
192  }
193  }
194 
195  while (auto client = wait_result.next_ready_client()) {
196  auto entity_iter = collection.clients.find(client->get_client_handle().get());
197  if (entity_iter != collection.clients.end()) {
198  execute_client(client);
199  any_ready_executable = true;
200  if (spin_once) {return any_ready_executable;}
201  }
202  }
203 
204  while (auto service = wait_result.next_ready_service()) {
205  auto entity_iter = collection.services.find(service->get_service_handle().get());
206  if (entity_iter != collection.services.end()) {
207  execute_service(service);
208  any_ready_executable = true;
209  if (spin_once) {return any_ready_executable;}
210  }
211  }
212 
213  while (auto waitable = wait_result.next_ready_waitable()) {
214  auto entity_iter = collection.waitables.find(waitable.get());
215  if (entity_iter != collection.waitables.end()) {
216  const auto data = waitable->take_data();
217  waitable->execute(data);
218  any_ready_executable = true;
219  if (spin_once) {return any_ready_executable;}
220  }
221  }
222  return any_ready_executable;
223 }
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:65
static RCLCPP_PUBLIC void execute_service(rclcpp::ServiceBase::SharedPtr service)
Run service server executable.
Definition: executor.cpp:650
virtual RCLCPP_PUBLIC void spin_once(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Collect work once and execute the next available work, optionally within a duration.
Definition: executor.cpp:446
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:559
static RCLCPP_PUBLIC void execute_timer(rclcpp::TimerBase::SharedPtr timer, const std::shared_ptr< void > &data_ptr)
Run timer executable.
Definition: executor.cpp:644
static RCLCPP_PUBLIC void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
Run subscription executable.
Definition: executor.cpp:543
RCLCPP_PUBLIC void collect_entities()
Gather all of the waitable entities from associated nodes and callback groups.
Definition: executor.cpp:674
static RCLCPP_PUBLIC void execute_client(rclcpp::ClientBase::SharedPtr client)
Run service client executable.
Definition: executor.cpp:662
rclcpp::executors::ExecutorEntitiesCollector collector_
Collector used to associate executable entities from nodes and guard conditions.
Definition: executor.hpp:579
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:548
Interface for introspecting a wait set after waiting on it.
Definition: wait_result.hpp:63
std::shared_ptr< rclcpp::SubscriptionBase > next_ready_subscription()
Get the next ready subscription, clearing it from the wait result.
std::pair< std::shared_ptr< rclcpp::TimerBase >, size_t > peek_next_ready_timer(size_t start_index=0)
Get the next ready timer and its index in the wait result, but do not clear it.
std::shared_ptr< rclcpp::ServiceBase > next_ready_service()
Get the next ready service, clearing it from the wait result.
WaitResultKind kind() const
Return the kind of the WaitResult.
Definition: wait_result.hpp:96
void clear_timer_with_index(size_t index)
Clear the timer at the given index.
std::shared_ptr< rclcpp::ClientBase > next_ready_client()
Get the next ready client, clearing it from the wait result.
std::shared_ptr< rclcpp::Waitable > next_ready_waitable()
Get the next ready waitable, clearing it from the wait result.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() const
Get all callback groups known to this entity collector.
RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0)) override
Static executor implementation of spin some.
RCLCPP_PUBLIC void spin() override
Static executor implementation of spin.
virtual RCLCPP_PUBLIC ~StaticSingleThreadedExecutor()
Default destructor.
RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration) override
Static executor implementation of spin all.
bool execute_ready_executables(const rclcpp::executors::ExecutorEntitiesCollection &collection, rclcpp::WaitResult< rclcpp::WaitSet > &wait_result, bool spin_once)
Executes ready executables from wait set.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
Options to be passed to the executor constructor.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
ClientCollection clients
Collection of clients currently in use by the executor.