15 #include "rclcpp/experimental/executors/events_executor/events_executor.hpp"
21 #include "rcpputils/scope_exit.hpp"
23 using namespace std::chrono_literals;
27 EventsExecutor::EventsExecutor(
28 rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue,
29 bool execute_timers_separate_thread,
35 throw std::invalid_argument(
"events_queue can't be a null pointer");
37 events_queue_ = std::move(events_queue);
44 const std::shared_ptr<void> &)> timer_on_ready_cb =
nullptr;
45 if (!execute_timers_separate_thread) {
48 ExecutorEvent event = {timer_id, data, -1, ExecutorEventType::TIMER_EVENT, 1};
49 this->events_queue_->enqueue(event);
53 std::make_shared<rclcpp::experimental::TimersManager>(
context_, timer_on_ready_cb);
55 this->current_entities_collection_ =
56 std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
58 notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
65 this->refresh_current_collection_from_callback_groups();
70 this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
75 notify_waitable_->set_on_ready_callback(
76 this->create_waitable_callback(notify_waitable_.get()));
78 auto notify_waitable_entity_id = notify_waitable_.get();
79 notify_waitable_->set_on_ready_callback(
80 [
this, notify_waitable_entity_id](
size_t num_events,
int waitable_data) {
87 if (notify_waitable_event_pushed_.exchange(
true)) {
92 {notify_waitable_entity_id,
nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, 1};
93 this->events_queue_->enqueue(event);
96 this->entities_collector_ =
97 std::make_shared<rclcpp::executors::ExecutorEntitiesCollector>(notify_waitable_);
103 notify_waitable_->clear_on_ready_callback();
104 this->refresh_current_collection({});
111 throw std::runtime_error(
"spin() called while already spinning");
113 RCPPUTILS_SCOPE_EXIT(this->
spinning.store(
false); );
115 timers_manager_->start();
116 RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); );
121 bool has_event = events_queue_->dequeue(event);
123 this->execute_event(event);
137 if (max_duration <= 0ns) {
138 throw std::invalid_argument(
"max_duration must be positive");
147 throw std::runtime_error(
"spin_some() called while already spinning");
150 RCPPUTILS_SCOPE_EXIT(this->
spinning.store(
false); );
152 auto start = std::chrono::steady_clock::now();
154 auto max_duration_not_elapsed = [max_duration, start]() {
155 if (std::chrono::nanoseconds(0) == max_duration) {
158 }
else if (std::chrono::steady_clock::now() - start < max_duration) {
171 this->refresh_current_collection_from_callback_groups();
175 const size_t ready_events_at_start = events_queue_->size();
176 size_t executed_events = 0;
177 const size_t ready_timers_at_start = timers_manager_->get_number_ready_timers();
178 size_t executed_timers = 0;
182 if (exhaustive || (executed_events < ready_events_at_start)) {
183 bool has_event = !events_queue_->empty();
187 bool ret = events_queue_->dequeue(event, std::chrono::nanoseconds(0));
189 this->execute_event(event);
197 if (exhaustive || (executed_timers < ready_timers_at_start)) {
198 bool timer_executed = timers_manager_->execute_head_timer();
199 if (timer_executed) {
215 timeout = std::chrono::nanoseconds::max();
220 bool is_timer_timeout =
false;
221 auto next_timer_timeout = timers_manager_->get_head_timeout();
222 if (next_timer_timeout.has_value() && next_timer_timeout.value() < timeout) {
223 timeout = next_timer_timeout.value();
224 is_timer_timeout =
true;
228 bool has_event = events_queue_->dequeue(event, timeout);
233 this->execute_event(event);
234 }
else if (is_timer_timeout) {
235 timers_manager_->execute_head_timer();
241 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify)
247 this->entities_collector_->add_node(node_ptr);
249 this->refresh_current_collection_from_callback_groups();
255 this->
add_node(node_ptr->get_node_base_interface(), notify);
260 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify)
269 this->entities_collector_->remove_node(node_ptr);
271 this->refresh_current_collection_from_callback_groups();
277 this->
remove_node(node_ptr->get_node_base_interface(), notify);
283 switch (event.type) {
284 case ExecutorEventType::CLIENT_EVENT:
286 rclcpp::ClientBase::SharedPtr client;
288 std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
289 client = this->retrieve_entity(
291 current_entities_collection_->clients);
294 for (
size_t i = 0; i <
event.num_events; i++) {
301 case ExecutorEventType::SUBSCRIPTION_EVENT:
303 rclcpp::SubscriptionBase::SharedPtr subscription;
305 std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
306 subscription = this->retrieve_entity(
308 current_entities_collection_->subscriptions);
311 for (
size_t i = 0; i <
event.num_events; i++) {
317 case ExecutorEventType::SERVICE_EVENT:
319 rclcpp::ServiceBase::SharedPtr service;
321 std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
322 service = this->retrieve_entity(
324 current_entities_collection_->services);
327 for (
size_t i = 0; i <
event.num_events; i++) {
334 case ExecutorEventType::TIMER_EVENT:
336 timers_manager_->execute_ready_timer(
340 case ExecutorEventType::WAITABLE_EVENT:
342 rclcpp::Waitable::SharedPtr waitable;
344 std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
345 waitable = this->retrieve_entity(
347 current_entities_collection_->waitables);
350 for (
size_t i = 0; i <
event.num_events; i++) {
351 const auto data = waitable->take_data_by_entity_id(event.waitable_data);
352 waitable->execute(data);
362 rclcpp::CallbackGroup::SharedPtr group_ptr,
363 rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
371 this->entities_collector_->add_callback_group(group_ptr);
373 this->refresh_current_collection_from_callback_groups();
378 rclcpp::CallbackGroup::SharedPtr group_ptr,
bool notify)
384 this->entities_collector_->remove_callback_group(group_ptr);
386 this->refresh_current_collection_from_callback_groups();
389 std::vector<rclcpp::CallbackGroup::WeakPtr>
392 this->entities_collector_->update_collections();
393 return this->entities_collector_->get_all_callback_groups();
396 std::vector<rclcpp::CallbackGroup::WeakPtr>
399 this->entities_collector_->update_collections();
400 return this->entities_collector_->get_manually_added_callback_groups();
403 std::vector<rclcpp::CallbackGroup::WeakPtr>
406 this->entities_collector_->update_collections();
407 return this->entities_collector_->get_automatically_added_callback_groups();
411 EventsExecutor::refresh_current_collection_from_callback_groups()
418 const bool notify_waitable_triggered = notify_waitable_event_pushed_.exchange(
false);
419 if (!notify_waitable_triggered && !this->entities_collector_->has_pending()) {
424 this->entities_collector_->update_collections();
425 auto callback_groups = this->entities_collector_->get_all_callback_groups();
427 rclcpp::executors::build_entities_collection(callback_groups, new_collection);
437 this->add_notify_waitable_to_collection(new_collection.
waitables);
440 std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
441 this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
443 this->refresh_current_collection(new_collection);
447 EventsExecutor::refresh_current_collection(
451 std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
453 current_entities_collection_->timers.update(
455 [
this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
456 [
this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->remove_timer(timer);});
458 current_entities_collection_->subscriptions.update(
460 [
this](
auto subscription) {
461 subscription->set_on_new_message_callback(
462 this->create_entity_callback(
463 subscription->get_subscription_handle().get(), ExecutorEventType::SUBSCRIPTION_EVENT));
465 [](
auto subscription) {subscription->clear_on_new_message_callback();});
467 current_entities_collection_->clients.update(
469 [
this](
auto client) {
470 client->set_on_new_response_callback(
471 this->create_entity_callback(
472 client->get_client_handle().get(), ExecutorEventType::CLIENT_EVENT));
474 [](
auto client) {client->clear_on_new_response_callback();});
476 current_entities_collection_->services.update(
478 [
this](
auto service) {
479 service->set_on_new_request_callback(
480 this->create_entity_callback(
481 service->get_service_handle().get(), ExecutorEventType::SERVICE_EVENT));
483 [](
auto service) {service->clear_on_new_request_callback();});
492 current_entities_collection_->waitables.update(
494 [
this](
auto waitable) {
495 waitable->set_on_ready_callback(
496 this->create_waitable_callback(waitable.get()));
498 [](
auto waitable) {waitable->clear_on_ready_callback();});
501 std::function<void(
size_t)>
502 EventsExecutor::create_entity_callback(
503 void * entity_key, ExecutorEventType event_type)
505 std::function<void(
size_t)>
506 callback = [
this, entity_key, event_type](
size_t num_events) {
507 ExecutorEvent
event = {entity_key,
nullptr, -1, event_type, num_events};
508 this->events_queue_->enqueue(event);
513 std::function<void(
size_t,
int)>
514 EventsExecutor::create_waitable_callback(
const rclcpp::Waitable * entity_key)
516 std::function<void(
size_t,
int)>
517 callback = [
this, entity_key](
size_t num_events,
int waitable_data) {
518 ExecutorEvent
event =
519 {entity_key,
nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, num_events};
520 this->events_queue_->enqueue(event);
526 EventsExecutor::add_notify_waitable_to_collection(
530 rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
533 this->notify_waitable_.get(),
534 {this->notify_waitable_, weak_group_ptr}
Coordinate the order and timing of available communication tasks.
std::shared_ptr< rclcpp::GuardCondition > interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
static RCLCPP_PUBLIC void execute_service(rclcpp::ServiceBase::SharedPtr service)
Run service server executable.
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
static RCLCPP_PUBLIC void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
Run subscription executable.
static RCLCPP_PUBLIC void execute_client(rclcpp::ClientBase::SharedPtr client)
Run service client executable.
std::shared_ptr< rclcpp::GuardCondition > shutdown_guard_condition_
Guard condition for signaling the rmw layer to wake up for system shutdown.
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Events executor implementation.
RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true) override
Remove callback group from the executor.
RCLCPP_PUBLIC void spin_once_impl(std::chrono::nanoseconds timeout) override
Internal implementation of spin_once.
RCLCPP_PUBLIC void spin() override
Events executor implementation of spin.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() override
Get callback groups that belong to executor.
virtual RCLCPP_PUBLIC ~EventsExecutor()
Default destructor.
RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true) override
Add a node to the executor.
RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true) override
Remove a node from the executor.
RCLCPP_PUBLIC void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
Internal implementation of spin_some.
RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true) override
Add a callback group to an executor.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes() override
Get callback groups that belong to executor.
RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration) override
Events executor implementation of spin all.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups() override
Get callback groups that belong to executor.
RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0)) override
Events executor implementation of spin some.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
Structure which encapsulates a ROS Client.
Structure which encapsulates a ROS Service.
Structure which encapsulates a ROS Subscription.
Options to be passed to the executor constructor.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
ClientCollection clients
Collection of clients currently in use by the executor.