15 #include "rclcpp/experimental/executors/events_executor/events_executor.hpp"
21 #include "rcpputils/scope_exit.hpp"
23 using namespace std::chrono_literals;
27 EventsExecutor::EventsExecutor(
28 rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue,
29 bool execute_timers_separate_thread,
35 throw std::invalid_argument(
"events_queue can't be a null pointer");
37 events_queue_ = std::move(events_queue);
44 const std::shared_ptr<void> &)> timer_on_ready_cb =
nullptr;
45 if (!execute_timers_separate_thread) {
48 ExecutorEvent event = {timer_id, data, -1, ExecutorEventType::TIMER_EVENT, 1};
49 this->events_queue_->enqueue(event);
53 std::make_shared<rclcpp::experimental::TimersManager>(
context_, timer_on_ready_cb);
55 entities_need_rebuild_ =
false;
57 this->setup_notify_waitable();
61 this->current_collection_.clear();
65 this->add_notify_waitable_to_collection(current_collection_.waitables);
69 EventsExecutor::setup_notify_waitable()
73 assert(
notify_waitable_ &&
"The notify waitable should have already been constructed");
87 [
this, notify_waitable_entity_id](
size_t num_events,
int waitable_data) {
94 if (entities_need_rebuild_.exchange(
true)) {
99 {notify_waitable_entity_id,
nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, 1};
100 this->events_queue_->enqueue(event);
108 this->refresh_current_collection({});
115 throw std::runtime_error(
"spin() called while already spinning");
117 RCPPUTILS_SCOPE_EXIT(this->
spinning.store(
false); );
119 timers_manager_->start();
120 RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); );
125 bool has_event = events_queue_->dequeue(event);
127 this->execute_event(event);
141 if (max_duration <= 0ns) {
142 throw std::invalid_argument(
"max_duration must be positive");
151 throw std::runtime_error(
"spin_some() called while already spinning");
154 RCPPUTILS_SCOPE_EXIT(this->
spinning.store(
false); );
156 auto start = std::chrono::steady_clock::now();
158 auto max_duration_not_elapsed = [max_duration, start]() {
159 if (std::chrono::nanoseconds(0) == max_duration) {
162 }
else if (std::chrono::steady_clock::now() - start < max_duration) {
179 const size_t ready_events_at_start = events_queue_->size();
180 size_t executed_events = 0;
181 const size_t ready_timers_at_start = timers_manager_->get_number_ready_timers();
182 size_t executed_timers = 0;
186 if (exhaustive || (executed_events < ready_events_at_start)) {
187 bool has_event = !events_queue_->empty();
191 bool ret = events_queue_->dequeue(event, std::chrono::nanoseconds(0));
193 this->execute_event(event);
201 if (exhaustive || (executed_timers < ready_timers_at_start)) {
202 bool timer_executed = timers_manager_->execute_head_timer();
203 if (timer_executed) {
219 timeout = std::chrono::nanoseconds::max();
224 bool is_timer_timeout =
false;
225 auto next_timer_timeout = timers_manager_->get_head_timeout();
226 if (next_timer_timeout.has_value() && next_timer_timeout.value() < timeout) {
227 timeout = next_timer_timeout.value();
228 is_timer_timeout =
true;
232 bool has_event = events_queue_->dequeue(event, timeout);
237 this->execute_event(event);
238 }
else if (is_timer_timeout) {
239 timers_manager_->execute_head_timer();
247 switch (event.type) {
248 case ExecutorEventType::CLIENT_EVENT:
250 rclcpp::ClientBase::SharedPtr client;
252 client = this->retrieve_entity(
254 current_collection_.clients);
257 for (
size_t i = 0; i <
event.num_events; i++) {
264 case ExecutorEventType::SUBSCRIPTION_EVENT:
266 rclcpp::SubscriptionBase::SharedPtr subscription;
268 subscription = this->retrieve_entity(
270 current_collection_.subscriptions);
273 for (
size_t i = 0; i <
event.num_events; i++) {
279 case ExecutorEventType::SERVICE_EVENT:
281 rclcpp::ServiceBase::SharedPtr service;
283 service = this->retrieve_entity(
285 current_collection_.services);
288 for (
size_t i = 0; i <
event.num_events; i++) {
295 case ExecutorEventType::TIMER_EVENT:
297 timers_manager_->execute_ready_timer(
301 case ExecutorEventType::WAITABLE_EVENT:
303 rclcpp::Waitable::SharedPtr waitable;
305 waitable = this->retrieve_entity(
307 current_collection_.waitables);
310 for (
size_t i = 0; i <
event.num_events; i++) {
311 const auto data = waitable->take_data_by_entity_id(event.waitable_data);
312 waitable->execute(data);
328 const bool notify_waitable_triggered = entities_need_rebuild_.exchange(
false);
337 rclcpp::executors::build_entities_collection(callback_groups, new_collection);
348 this->add_notify_waitable_to_collection(new_collection.
waitables);
350 this->refresh_current_collection(new_collection);
354 EventsExecutor::refresh_current_collection(
358 std::lock_guard<std::mutex> guard(mutex_);
363 current_collection_.remove_expired_entities();
365 current_collection_.timers.update(
367 [
this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
368 [
this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->remove_timer(timer);});
370 current_collection_.subscriptions.update(
372 [
this](
auto subscription) {
373 subscription->set_on_new_message_callback(
374 this->create_entity_callback(
375 subscription->get_subscription_handle().get(), ExecutorEventType::SUBSCRIPTION_EVENT));
377 [](
auto subscription) {subscription->clear_on_new_message_callback();});
379 current_collection_.clients.update(
381 [
this](
auto client) {
382 client->set_on_new_response_callback(
383 this->create_entity_callback(
384 client->get_client_handle().get(), ExecutorEventType::CLIENT_EVENT));
386 [](
auto client) {client->clear_on_new_response_callback();});
388 current_collection_.services.update(
390 [
this](
auto service) {
391 service->set_on_new_request_callback(
392 this->create_entity_callback(
393 service->get_service_handle().get(), ExecutorEventType::SERVICE_EVENT));
395 [](
auto service) {service->clear_on_new_request_callback();});
404 current_collection_.waitables.update(
406 [
this](
auto waitable) {
407 waitable->set_on_ready_callback(
408 this->create_waitable_callback(waitable.get()));
409 for (const auto & t : waitable->get_timers()) {
410 timers_manager_->add_timer(t);
413 [
this](
auto waitable) {
414 waitable->clear_on_ready_callback();
415 for (
const auto & t : waitable->get_timers()) {
416 timers_manager_->remove_timer(t);
421 std::function<void(
size_t)>
422 EventsExecutor::create_entity_callback(
423 void * entity_key, ExecutorEventType event_type)
425 std::function<void(
size_t)>
426 callback = [
this, entity_key, event_type](
size_t num_events) {
427 ExecutorEvent
event = {entity_key,
nullptr, -1, event_type, num_events};
428 this->events_queue_->enqueue(event);
433 std::function<void(
size_t,
int)>
434 EventsExecutor::create_waitable_callback(
const rclcpp::Waitable * entity_key)
436 std::function<void(
size_t,
int)>
437 callback = [
this, entity_key](
size_t num_events,
int waitable_data) {
438 ExecutorEvent
event =
439 {entity_key,
nullptr, waitable_data, ExecutorEventType::WAITABLE_EVENT, num_events};
440 this->events_queue_->enqueue(event);
446 EventsExecutor::add_notify_waitable_to_collection(
450 rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
453 this->notify_waitable_.get(),
454 {this->notify_waitable_, weak_group_ptr}
Coordinate the order and timing of available communication tasks.
static RCLCPP_PUBLIC void execute_service(rclcpp::ServiceBase::SharedPtr service)
Run service server executable.
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
static RCLCPP_PUBLIC void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
Run subscription executable.
std::shared_ptr< rclcpp::executors::ExecutorNotifyWaitable > notify_waitable_
Waitable containing guard conditions controlling the executor flow.
static RCLCPP_PUBLIC void execute_client(rclcpp::ClientBase::SharedPtr client)
Run service client executable.
rclcpp::executors::ExecutorEntitiesCollector collector_
Collector used to associate executable entities from nodes and guard conditions.
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
RCLCPP_PUBLIC void update_collections()
Update the underlying collections.
RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups() const
Get all callback groups known to this entity collector.
bool has_pending() const
Indicate if the entities collector has pending additions or removals.
Events executor implementation.
RCLCPP_PUBLIC void spin_once_impl(std::chrono::nanoseconds timeout) override
Internal implementation of spin_once.
RCLCPP_PUBLIC void spin() override
Events executor implementation of spin.
virtual RCLCPP_PUBLIC ~EventsExecutor()
Default destructor.
RCLCPP_PUBLIC void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
Internal implementation of spin_some.
RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration) override
Events executor implementation of spin all.
RCLCPP_PUBLIC void handle_updated_entities(bool notify) override
Collect entities from callback groups and refresh the current collection with them.
RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0)) override
Events executor implementation of spin some.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
Structure which encapsulates a ROS Client.
Structure which encapsulates a ROS Service.
Structure which encapsulates a ROS Subscription.
Options to be passed to the executor constructor.
Represent the total set of entities for a single executor.
TimerCollection timers
Collection of timers currently in use by the executor.
ServiceCollection services
Collection of services currently in use by the executor.
SubscriptionCollection subscriptions
Collection of subscriptions currently in use by the executor.
WaitableCollection waitables
Collection of waitables currently in use by the executor.
ClientCollection clients
Collection of clients currently in use by the executor.