ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
static_single_threaded_executor.cpp
1 // Copyright 2019 Nobleo Technology
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/executors/static_single_threaded_executor.hpp"
16 
17 #include <chrono>
18 #include <memory>
19 #include <utility>
20 #include <vector>
21 
22 #include "rcpputils/scope_exit.hpp"
23 
26 
27 StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(
28  const rclcpp::ExecutorOptions & options)
29 : rclcpp::Executor(options)
30 {
31  entities_collector_ = std::make_shared<StaticExecutorEntitiesCollector>();
32 }
33 
35 {
36  if (entities_collector_->is_init()) {
37  entities_collector_->fini();
38  }
39 }
40 
41 void
43 {
44  if (spinning.exchange(true)) {
45  throw std::runtime_error("spin() called while already spinning");
46  }
47  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
48 
49  // Set memory_strategy_ and exec_list_ based on weak_nodes_
50  // Prepare wait_set_ based on memory_strategy_
51  entities_collector_->init(&wait_set_, memory_strategy_);
52 
53  while (rclcpp::ok(this->context_) && spinning.load()) {
54  // Refresh wait set and wait for work
55  entities_collector_->refresh_wait_set();
57  }
58 }
59 
60 void
61 StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration)
62 {
63  // In this context a 0 input max_duration means no duration limit
64  if (std::chrono::nanoseconds(0) == max_duration) {
65  max_duration = std::chrono::nanoseconds::max();
66  }
67 
68  return this->spin_some_impl(max_duration, false);
69 }
70 
71 void
72 StaticSingleThreadedExecutor::spin_all(std::chrono::nanoseconds max_duration)
73 {
74  if (max_duration < std::chrono::nanoseconds(0)) {
75  throw std::invalid_argument("max_duration must be greater than or equal to 0");
76  }
77  return this->spin_some_impl(max_duration, true);
78 }
79 
80 void
81 StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
82 {
83  // Make sure the entities collector has been initialized
84  if (!entities_collector_->is_init()) {
85  entities_collector_->init(&wait_set_, memory_strategy_);
86  }
87 
88  auto start = std::chrono::steady_clock::now();
89  auto max_duration_not_elapsed = [max_duration, start]() {
90  if (std::chrono::nanoseconds(0) == max_duration) {
91  // told to spin forever if need be
92  return true;
93  } else if (std::chrono::steady_clock::now() - start < max_duration) {
94  // told to spin only for some maximum amount of time
95  return true;
96  }
97  // spun too long
98  return false;
99  };
100 
101  if (spinning.exchange(true)) {
102  throw std::runtime_error("spin_some() called while already spinning");
103  }
104  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
105 
106  while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
107  // Get executables that are ready now
108  entities_collector_->refresh_wait_set(std::chrono::milliseconds::zero());
109  // Execute ready executables
110  bool work_available = execute_ready_executables();
111  if (!work_available || !exhaustive) {
112  break;
113  }
114  }
115 }
116 
117 void
118 StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
119 {
120  // Make sure the entities collector has been initialized
121  if (!entities_collector_->is_init()) {
122  entities_collector_->init(&wait_set_, memory_strategy_);
123  }
124 
125  if (rclcpp::ok(context_) && spinning.load()) {
126  // Wait until we have a ready entity or timeout expired
127  entities_collector_->refresh_wait_set(timeout);
128  // Execute ready executables
130  }
131 }
132 
133 void
135  rclcpp::CallbackGroup::SharedPtr group_ptr,
136  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
137  bool notify)
138 {
139  bool is_new_node = entities_collector_->add_callback_group(group_ptr, node_ptr);
140  if (is_new_node && notify) {
141  // Interrupt waiting to handle new node
143  }
144 }
145 
146 void
148  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
149 {
150  bool is_new_node = entities_collector_->add_node(node_ptr);
151  if (is_new_node && notify) {
152  // Interrupt waiting to handle new node
154  }
155 }
156 
157 void
158 StaticSingleThreadedExecutor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
159 {
160  this->add_node(node_ptr->get_node_base_interface(), notify);
161 }
162 
163 void
165  rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify)
166 {
167  bool node_removed = entities_collector_->remove_callback_group(group_ptr);
168  // If the node was matched and removed, interrupt waiting
169  if (node_removed && notify) {
171  }
172 }
173 
174 void
176  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
177 {
178  bool node_removed = entities_collector_->remove_node(node_ptr);
179  if (!node_removed) {
180  throw std::runtime_error("Node needs to be associated with this executor.");
181  }
182  // If the node was matched and removed, interrupt waiting
183  if (notify) {
185  }
186 }
187 
188 std::vector<rclcpp::CallbackGroup::WeakPtr>
190 {
191  return entities_collector_->get_all_callback_groups();
192 }
193 
194 std::vector<rclcpp::CallbackGroup::WeakPtr>
196 {
197  return entities_collector_->get_manually_added_callback_groups();
198 }
199 
200 std::vector<rclcpp::CallbackGroup::WeakPtr>
202 {
203  return entities_collector_->get_automatically_added_callback_groups_from_nodes();
204 }
205 
206 void
207 StaticSingleThreadedExecutor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
208 {
209  this->remove_node(node_ptr->get_node_base_interface(), notify);
210 }
211 
212 bool
214 {
215  bool any_ready_executable = false;
216 
217  // Execute all the ready subscriptions
218  for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) {
219  if (i < entities_collector_->get_number_of_subscriptions()) {
220  if (wait_set_.subscriptions[i]) {
221  execute_subscription(entities_collector_->get_subscription(i));
222  if (spin_once) {
223  return true;
224  }
225  any_ready_executable = true;
226  }
227  }
228  }
229  // Execute all the ready timers
230  for (size_t i = 0; i < wait_set_.size_of_timers; ++i) {
231  if (i < entities_collector_->get_number_of_timers()) {
232  if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) {
233  auto timer = entities_collector_->get_timer(i);
234  timer->call();
235  execute_timer(std::move(timer));
236  if (spin_once) {
237  return true;
238  }
239  any_ready_executable = true;
240  }
241  }
242  }
243  // Execute all the ready services
244  for (size_t i = 0; i < wait_set_.size_of_services; ++i) {
245  if (i < entities_collector_->get_number_of_services()) {
246  if (wait_set_.services[i]) {
247  execute_service(entities_collector_->get_service(i));
248  if (spin_once) {
249  return true;
250  }
251  any_ready_executable = true;
252  }
253  }
254  }
255  // Execute all the ready clients
256  for (size_t i = 0; i < wait_set_.size_of_clients; ++i) {
257  if (i < entities_collector_->get_number_of_clients()) {
258  if (wait_set_.clients[i]) {
259  execute_client(entities_collector_->get_client(i));
260  if (spin_once) {
261  return true;
262  }
263  any_ready_executable = true;
264  }
265  }
266  }
267  // Execute all the ready waitables
268  for (size_t i = 0; i < entities_collector_->get_number_of_waitables(); ++i) {
269  auto waitable = entities_collector_->get_waitable(i);
270  if (waitable->is_ready(&wait_set_)) {
271  auto data = waitable->take_data();
272  waitable->execute(data);
273  if (spin_once) {
274  return true;
275  }
276  any_ready_executable = true;
277  }
278  }
279  return any_ready_executable;
280 }
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:66
rcl_wait_set_t wait_set_
Wait set for managing entities that the rmw layer waits on.
Definition: executor.hpp:545
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:555
rclcpp::GuardCondition interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
Definition: executor.hpp:540
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:537
RCLCPP_PUBLIC void trigger()
Notify the wait set waiting on this condition, if any, that the condition had been met.
RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0)) override
Static executor implementation of spin some.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() override
Get callback groups that belong to executor.
RCLCPP_PUBLIC void spin() override
Static executor implementation of spin.
RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true) override
Add a callback group to an executor.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups() override
Get callback groups that belong to executor.
virtual RCLCPP_PUBLIC ~StaticSingleThreadedExecutor()
Default destrcutor.
RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration) override
Static executor implementation of spin all.
RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true) override
Remove a node from the executor.
RCLCPP_PUBLIC bool execute_ready_executables(bool spin_once=false)
Executes ready executables from wait set.
RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true) override
Add a node to the executor.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes() override
Get callback groups that belong to executor.
RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true) override
Remove callback group from the executor.
This class contains subscriptionbase, timerbase, etc. which can be used to run callbacks.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
const rcl_timer_t ** timers
Storage for timer pointers.
Definition: wait.h:52
size_t size_of_timers
Number of timers.
Definition: wait.h:54
size_t size_of_subscriptions
Number of subscriptions.
Definition: wait.h:46
const rcl_service_t ** services
Storage for service pointers.
Definition: wait.h:60
const rcl_client_t ** clients
Storage for client pointers.
Definition: wait.h:56
const rcl_subscription_t ** subscriptions
Storage for subscription pointers.
Definition: wait.h:44
size_t size_of_clients
Number of clients.
Definition: wait.h:58
size_t size_of_services
Number of services.
Definition: wait.h:62
Options to be passed to the executor constructor.