15 #include "rclcpp/graph_listener.hpp"
22 #include "rcl/error_handling.h"
24 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
25 #include "rclcpp/exceptions.hpp"
26 #include "rclcpp/logging.hpp"
27 #include "rclcpp/node.hpp"
28 #include "rmw/impl/cpp/demangle.hpp"
30 #include "rcutils/logging_macros.h"
32 using rclcpp::exceptions::throw_from_rcl_error;
36 namespace graph_listener
39 GraphListener::GraphListener(
const std::shared_ptr<Context> & parent_context)
40 : rcl_parent_context_(parent_context->get_rcl_context()),
43 interrupt_guard_condition_(parent_context)
47 GraphListener::~GraphListener()
49 GraphListener::shutdown(std::nothrow);
52 void GraphListener::init_wait_set()
62 rcl_parent_context_.get(),
65 throw_from_rcl_error(ret,
"failed to initialize wait set");
70 GraphListener::start_if_not_started()
72 std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
81 listener_thread_ = std::thread(&GraphListener::run,
this);
91 }
catch (
const std::exception & exc) {
92 RCUTILS_LOG_ERROR_NAMED(
94 "caught %s exception in GraphListener thread: %s",
95 rmw::impl::cpp::demangle(exc).c_str(),
97 std::rethrow_exception(std::current_exception());
99 RCUTILS_LOG_ERROR_NAMED(
101 "unknown error in GraphListener thread");
102 std::rethrow_exception(std::current_exception());
107 GraphListener::run_loop()
118 std::lock_guard<std::mutex> nodes_barrier_lock(node_graph_interfaces_barrier_mutex_);
120 node_graph_interfaces_mutex_.lock();
123 std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
125 const size_t node_graph_interfaces_size = node_graph_interfaces_.size();
127 if (wait_set_.size_of_guard_conditions < (node_graph_interfaces_size + 2)) {
130 throw_from_rcl_error(ret,
"failed to resize wait set");
136 throw_from_rcl_error(ret,
"failed to clear wait set");
139 detail::add_guard_condition_to_rcl_wait_set(wait_set_, interrupt_guard_condition_);
142 std::vector<size_t> graph_gc_indexes(node_graph_interfaces_size, 0u);
143 for (
size_t i = 0u; i < node_graph_interfaces_size; ++i) {
144 auto node_ptr = node_graph_interfaces_[i];
146 if (node_ptr->count_graph_users() == 0) {
150 auto graph_gc = node_ptr->get_graph_guard_condition();
152 throw_from_rcl_error(
RCL_RET_ERROR,
"failed to get graph guard condition");
156 throw_from_rcl_error(ret,
"failed to add graph guard condition to wait set");
163 throw std::runtime_error(
"rcl_wait unexpectedly timed out");
166 throw_from_rcl_error(ret,
"failed to wait on wait set");
170 for (
size_t i = 0u; i < node_graph_interfaces_size; ++i) {
171 const auto node_ptr = node_graph_interfaces_[i];
172 auto graph_gc = node_ptr->get_graph_guard_condition();
174 throw_from_rcl_error(
RCL_RET_ERROR,
"failed to get graph guard condition");
176 if (graph_gc == wait_set_.guard_conditions[graph_gc_indexes[i]]) {
177 node_ptr->notify_graph_change();
181 node_ptr->notify_shutdown();
188 interrupt_(GuardCondition * interrupt_guard_condition)
190 interrupt_guard_condition->trigger();
195 std::mutex * node_graph_interfaces_barrier_mutex,
196 std::mutex * node_graph_interfaces_mutex,
197 GuardCondition * interrupt_guard_condition)
202 std::lock_guard<std::mutex> nodes_barrier_lock(*node_graph_interfaces_barrier_mutex);
204 interrupt_(interrupt_guard_condition);
205 node_graph_interfaces_mutex->lock();
211 std::vector<rclcpp::node_interfaces::NodeGraphInterface *> * node_graph_interfaces,
214 for (
const auto node_ptr : (*node_graph_interfaces)) {
215 if (node_graph == node_ptr) {
231 &node_graph_interfaces_barrier_mutex_,
232 &node_graph_interfaces_mutex_,
233 &interrupt_guard_condition_);
235 std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
236 return has_node_(&node_graph_interfaces_, node_graph);
243 throw std::invalid_argument(
"node is nullptr");
245 std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
253 &node_graph_interfaces_barrier_mutex_,
254 &node_graph_interfaces_mutex_,
255 &interrupt_guard_condition_);
257 std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
258 if (has_node_(&node_graph_interfaces_, node_graph)) {
261 node_graph_interfaces_.push_back(node_graph);
268 std::vector<rclcpp::node_interfaces::NodeGraphInterface *> * node_graph_interfaces,
272 for (
auto it = node_graph_interfaces->begin(); it != node_graph_interfaces->end(); ++it) {
273 if (node_graph == *it) {
275 node_graph_interfaces->erase(it);
281 throw NodeNotFoundError();
288 throw std::invalid_argument(
"node is nullptr");
290 std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
293 return remove_node_(&node_graph_interfaces_, node_graph);
299 &node_graph_interfaces_barrier_mutex_,
300 &node_graph_interfaces_mutex_,
301 &interrupt_guard_condition_);
303 std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
304 remove_node_(&node_graph_interfaces_, node_graph);
308 GraphListener::cleanup_wait_set()
312 throw_from_rcl_error(ret,
"failed to finalize wait set");
317 GraphListener::__shutdown()
319 std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
320 if (!is_shutdown_.exchange(
true)) {
322 interrupt_(&interrupt_guard_condition_);
323 listener_thread_.join();
332 GraphListener::shutdown()
338 GraphListener::shutdown(
const std::nothrow_t &) noexcept
342 }
catch (
const std::exception & exc) {
345 "caught %s exception when shutting down GraphListener: %s",
346 rmw::impl::cpp::demangle(exc).c_str(), exc.what());
350 "caught unknown exception when shutting down GraphListener");
355 GraphListener::is_started()
361 GraphListener::is_shutdown()
363 return is_shutdown_.load();
#define rcl_get_default_allocator
Return a properly initialized rcl_allocator_t with default values.
Thrown when a function is called on a GraphListener that has already been shut down.
Thrown when a node has already been added to the GraphListener.
Pure virtual interface class for the NodeGraph part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
#define RCL_RET_OK
Success return code.
#define RCL_RET_ERROR
Unspecified error return code.
#define RCL_RET_TIMEOUT
Timeout occurred return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_init(rcl_wait_set_t *wait_set, size_t number_of_subscriptions, size_t number_of_guard_conditions, size_t number_of_timers, size_t number_of_clients, size_t number_of_services, size_t number_of_events, rcl_context_t *context, rcl_allocator_t allocator)
Initialize a rcl wait set with space for items to be waited on.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_clear(rcl_wait_set_t *wait_set)
Remove (set to NULL) all entities in the wait set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_fini(rcl_wait_set_t *wait_set)
Finalize a rcl wait set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait(rcl_wait_set_t *wait_set, int64_t timeout)
Block until the wait set is ready or until the timeout has been exceeded.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_resize(rcl_wait_set_t *wait_set, size_t subscriptions_size, size_t guard_conditions_size, size_t timers_size, size_t clients_size, size_t services_size, size_t events_size)
Reallocate space for entities in the wait set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_guard_condition(rcl_wait_set_t *wait_set, const rcl_guard_condition_t *guard_condition, size_t *index)
Store a pointer to the guard condition in the next empty spot in the set.