ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
executor.hpp
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXECUTOR_HPP_
16 #define RCLCPP__EXECUTOR_HPP_
17 
18 #include <algorithm>
19 #include <cassert>
20 #include <chrono>
21 #include <cstdlib>
22 #include <iostream>
23 #include <list>
24 #include <map>
25 #include <memory>
26 #include <mutex>
27 #include <string>
28 #include <vector>
29 
30 #include "rcl/guard_condition.h"
31 #include "rcl/wait.h"
32 #include "rcpputils/scope_exit.hpp"
33 
34 #include "rclcpp/context.hpp"
35 #include "rclcpp/contexts/default_context.hpp"
36 #include "rclcpp/guard_condition.hpp"
37 #include "rclcpp/executor_options.hpp"
38 #include "rclcpp/future_return_code.hpp"
39 #include "rclcpp/memory_strategies.hpp"
40 #include "rclcpp/memory_strategy.hpp"
41 #include "rclcpp/node_interfaces/node_base_interface.hpp"
42 #include "rclcpp/utilities.hpp"
43 #include "rclcpp/visibility_control.hpp"
44 
45 namespace rclcpp
46 {
47 
48 typedef std::map<rclcpp::CallbackGroup::WeakPtr,
49  rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
50  std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
51 
52 // Forward declaration is used in convenience method signature.
53 class Node;
54 
56 
65 class Executor
66 {
67 public:
68  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Executor)
69 
70 
74  RCLCPP_PUBLIC
75  explicit Executor(const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions());
76 
78  RCLCPP_PUBLIC
79  virtual ~Executor();
80 
82  // It is up to the implementation of Executor to implement spin.
83  virtual void
84  spin() = 0;
85 
87 
103  RCLCPP_PUBLIC
104  virtual void
106  rclcpp::CallbackGroup::SharedPtr group_ptr,
107  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
108  bool notify = true);
109 
111 
122  RCLCPP_PUBLIC
123  virtual std::vector<rclcpp::CallbackGroup::WeakPtr>
125 
127 
136  RCLCPP_PUBLIC
137  virtual std::vector<rclcpp::CallbackGroup::WeakPtr>
139 
141 
151  RCLCPP_PUBLIC
152  virtual std::vector<rclcpp::CallbackGroup::WeakPtr>
154 
156 
174  RCLCPP_PUBLIC
175  virtual void
177  rclcpp::CallbackGroup::SharedPtr group_ptr,
178  bool notify = true);
179 
181 
199  RCLCPP_PUBLIC
200  virtual void
201  add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);
202 
204 
207  RCLCPP_PUBLIC
208  virtual void
209  add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true);
210 
212 
227  RCLCPP_PUBLIC
228  virtual void
229  remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);
230 
232 
235  RCLCPP_PUBLIC
236  virtual void
237  remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true);
238 
240 
246  template<typename RepT = int64_t, typename T = std::milli>
247  void
249  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
250  std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
251  {
252  return spin_node_once_nanoseconds(
253  node,
254  std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
255  );
256  }
257 
259  template<typename NodeT = rclcpp::Node, typename RepT = int64_t, typename T = std::milli>
260  void
262  std::shared_ptr<NodeT> node,
263  std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
264  {
265  return spin_node_once_nanoseconds(
266  node->get_node_base_interface(),
267  std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
268  );
269  }
270 
272 
275  RCLCPP_PUBLIC
276  void
277  spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);
278 
280  RCLCPP_PUBLIC
281  void
282  spin_node_some(std::shared_ptr<rclcpp::Node> node);
283 
285 
295  RCLCPP_PUBLIC
296  virtual void
297  spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0));
298 
300 
314  RCLCPP_PUBLIC
315  virtual void
316  spin_all(std::chrono::nanoseconds max_duration);
317 
318  RCLCPP_PUBLIC
319  virtual void
320  spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
321 
323 
332  template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
335  const FutureT & future,
336  std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
337  {
338  // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
339  // inside a callback executed by an executor.
340 
341  // Check the future before entering the while loop.
342  // If the future is already complete, don't try to spin.
343  std::future_status status = future.wait_for(std::chrono::seconds(0));
344  if (status == std::future_status::ready) {
345  return FutureReturnCode::SUCCESS;
346  }
347 
348  auto end_time = std::chrono::steady_clock::now();
349  std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
350  timeout);
351  if (timeout_ns > std::chrono::nanoseconds::zero()) {
352  end_time += timeout_ns;
353  }
354  std::chrono::nanoseconds timeout_left = timeout_ns;
355 
356  if (spinning.exchange(true)) {
357  throw std::runtime_error("spin_until_future_complete() called while already spinning");
358  }
359  RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
360  while (rclcpp::ok(this->context_) && spinning.load()) {
361  // Do one item of work.
362  spin_once_impl(timeout_left);
363 
364  // Check if the future is set, return SUCCESS if it is.
365  status = future.wait_for(std::chrono::seconds(0));
366  if (status == std::future_status::ready) {
367  return FutureReturnCode::SUCCESS;
368  }
369  // If the original timeout is < 0, then this is blocking, never TIMEOUT.
370  if (timeout_ns < std::chrono::nanoseconds::zero()) {
371  continue;
372  }
373  // Otherwise check if we still have time to wait, return TIMEOUT if not.
374  auto now = std::chrono::steady_clock::now();
375  if (now >= end_time) {
376  return FutureReturnCode::TIMEOUT;
377  }
378  // Subtract the elapsed time from the original timeout.
379  timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
380  }
381 
382  // The future did not complete before ok() returned false, return INTERRUPTED.
383  return FutureReturnCode::INTERRUPTED;
384  }
385 
387 
391  RCLCPP_PUBLIC
392  void
393  cancel();
394 
396 
402  RCLCPP_PUBLIC
403  void
404  set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
405 
407 
411  RCLCPP_PUBLIC
412  bool
413  is_spinning();
414 
415 protected:
416  RCLCPP_PUBLIC
417  void
418  spin_node_once_nanoseconds(
419  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
420  std::chrono::nanoseconds timeout);
421 
422  RCLCPP_PUBLIC
423  void
424  spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);
425 
427 
432  RCLCPP_PUBLIC
433  void
435 
436  RCLCPP_PUBLIC
437  static void
438  execute_subscription(
439  rclcpp::SubscriptionBase::SharedPtr subscription);
440 
441  RCLCPP_PUBLIC
442  static void
443  execute_timer(rclcpp::TimerBase::SharedPtr timer);
444 
445  RCLCPP_PUBLIC
446  static void
447  execute_service(rclcpp::ServiceBase::SharedPtr service);
448 
449  RCLCPP_PUBLIC
450  static void
451  execute_client(rclcpp::ClientBase::SharedPtr client);
452 
456  RCLCPP_PUBLIC
457  void
458  wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
459 
460  RCLCPP_PUBLIC
461  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
462  get_node_by_group(
463  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
464  rclcpp::CallbackGroup::SharedPtr group);
465 
467 
472  RCLCPP_PUBLIC
473  bool
474  has_node(
475  const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
476  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
477 
478  RCLCPP_PUBLIC
479  rclcpp::CallbackGroup::SharedPtr
480  get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);
481 
483 
486  RCLCPP_PUBLIC
487  virtual void
489  rclcpp::CallbackGroup::SharedPtr group_ptr,
490  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
491  WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
492  bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
493 
495 
498  RCLCPP_PUBLIC
499  virtual void
501  rclcpp::CallbackGroup::SharedPtr group_ptr,
502  WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
503  bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
504 
505  RCLCPP_PUBLIC
506  bool
507  get_next_ready_executable(AnyExecutable & any_executable);
508 
509  RCLCPP_PUBLIC
510  bool
511  get_next_ready_executable_from_map(
512  AnyExecutable & any_executable,
513  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
514 
515  RCLCPP_PUBLIC
516  bool
517  get_next_executable(
518  AnyExecutable & any_executable,
519  std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
520 
522 
532  RCLCPP_PUBLIC
533  virtual void
534  add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);
535 
537  std::atomic_bool spinning;
538 
541 
542  std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;
543 
546 
547  // Mutex to protect the subsequent memory_strategy_.
548  mutable std::mutex mutex_;
549 
551  memory_strategy::MemoryStrategy::SharedPtr
552  memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);
553 
555  std::shared_ptr<rclcpp::Context> context_;
556 
557  RCLCPP_DISABLE_COPY(Executor)
558 
559  RCLCPP_PUBLIC
560  virtual void
561  spin_once_impl(std::chrono::nanoseconds timeout);
562 
563  typedef std::map<rclcpp::CallbackGroup::WeakPtr,
564  const rclcpp::GuardCondition *,
565  std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
566  WeakCallbackGroupsToGuardConditionsMap;
567 
569  WeakCallbackGroupsToGuardConditionsMap
570  weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
571 
573  WeakCallbackGroupsToNodesMap
574  weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
575 
577  WeakCallbackGroupsToNodesMap
578  weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
579 
581  WeakCallbackGroupsToNodesMap
582  weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
583 
585  std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
586  weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
587 
590 };
591 
592 } // namespace rclcpp
593 
594 #endif // RCLCPP__EXECUTOR_HPP_
Context which encapsulates shared state between nodes and other similar entities.
Definition: context.hpp:73
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:66
RCLCPP_PUBLIC void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
Add a node, complete all immediately available work, and remove the node.
Definition: executor.cpp:403
rcl_wait_set_t wait_set_
Wait set for managing entities that the rmw layer waits on.
Definition: executor.hpp:545
RCLCPP_PUBLIC void set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
Support dynamic switching of the memory strategy.
Definition: executor.cpp:499
virtual RCLCPP_PUBLIC ~Executor()
Default destructor.
Definition: executor.cpp:85
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
Definition: executor.cpp:150
virtual RCLCPP_PUBLIC void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true)
Remove a callback group from the executor.
Definition: executor.cpp:324
FutureReturnCode spin_until_future_complete(const FutureT &future, std::chrono::duration< TimeRepT, TimeT > timeout=std::chrono::duration< TimeRepT, TimeT >(-1))
Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
Definition: executor.hpp:334
virtual RCLCPP_PUBLIC void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a callback group to an executor.
Definition: executor.cpp:239
RCLCPP_PUBLIC void wait_for_work(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
Definition: executor.cpp:690
virtual RCLCPP_PUBLIC void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Remove a node from the executor.
Definition: executor.cpp:342
RCLCPP_PUBLIC Executor(const rclcpp::ExecutorOptions &options=rclcpp::ExecutorOptions())
Default constructor.
Definition: executor.cpp:46
virtual RCLCPP_PUBLIC void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0))
Collect work once and execute all available work, optionally within a duration.
Definition: executor.cpp:416
RCLCPP_PUBLIC void cancel()
Cancel any running spin* function, causing it to return.
Definition: executor.cpp:487
WeakCallbackGroupsToGuardConditionsMap weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_)
maps callback groups to guard conditions
RCLCPP_PUBLIC bool is_spinning()
Returns true if the executor is currently spinning.
Definition: executor.cpp:944
void spin_node_once(std::shared_ptr< NodeT > node, std::chrono::duration< RepT, T > timeout=std::chrono::duration< RepT, T >(-1))
Convenience function which takes Node and forwards NodeBaseInterface.
Definition: executor.hpp:261
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:555
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
Definition: executor.cpp:161
rclcpp::GuardCondition interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
Definition: executor.hpp:540
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_
shutdown callback handle registered to Context
Definition: executor.hpp:589
virtual RCLCPP_PUBLIC void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a node to the executor.
Definition: executor.cpp:253
RCLCPP_PUBLIC void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
Definition: executor.cpp:509
virtual RCLCPP_PUBLIC void add_callback_group_to_map(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, WeakCallbackGroupsToNodesMap &weak_groups_to_nodes, bool notify=true) RCPPUTILS_TSA_REQUIRES(mutex_)
Add a callback group to an executor.
Definition: executor.cpp:196
virtual RCLCPP_PUBLIC void add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_)
Add all callback groups that can be automatically added from associated nodes.
Definition: executor.cpp:172
virtual void spin()=0
Do work periodically as it becomes available to us. Blocking call, may block indefinitely.
virtual RCLCPP_PUBLIC void remove_callback_group_from_map(rclcpp::CallbackGroup::SharedPtr group_ptr, WeakCallbackGroupsToNodesMap &weak_groups_to_nodes, bool notify=true) RCPPUTILS_TSA_REQUIRES(mutex_)
Remove a callback group from the executor.
Definition: executor.cpp:281
virtual RCLCPP_PUBLIC std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups()
Get callback groups that belong to executor.
Definition: executor.cpp:136
RCLCPP_PUBLIC bool has_node(const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, const WeakCallbackGroupsToNodesMap &weak_groups_to_nodes) const
Return true if the node has been added to this executor.
Definition: executor.cpp:929
virtual RCLCPP_PUBLIC void spin_all(std::chrono::nanoseconds max_duration)
Collect and execute work repeatedly within a duration or until no more work is available.
Definition: executor.cpp:421
void spin_node_once(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::duration< RepT, T > timeout=std::chrono::duration< RepT, T >(-1))
Add a node to executor, execute the next available unit of work, and remove the node.
Definition: executor.hpp:248
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_)
The memory strategy: an interface for handling user-defined memory allocation strategies.
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:537
A condition that can be waited on in a single wait set and asynchronously triggered.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
FutureReturnCode
Return codes to be used with spin_until_future_complete.
RCLCPP_PUBLIC bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42
Options to be passed to the executor constructor.
RCL_PUBLIC RCL_WARN_UNUSED rcl_wait_set_t rcl_get_zero_initialized_wait_set(void)
Return a rcl_wait_set_t struct with members set to NULL.
Definition: wait.c:64