15 #include "rclcpp/graph_listener.hpp"
23 #include "rcl/error_handling.h"
25 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
26 #include "rclcpp/exceptions.hpp"
27 #include "rclcpp/logging.hpp"
28 #include "rclcpp/node.hpp"
29 #include "rmw/impl/cpp/demangle.hpp"
31 #include "rcutils/logging_macros.h"
33 using rclcpp::exceptions::throw_from_rcl_error;
37 namespace graph_listener
40 GraphListener::GraphListener(
const std::shared_ptr<Context> & parent_context)
41 : weak_parent_context_(parent_context),
42 rcl_parent_context_(parent_context->get_rcl_context()),
45 interrupt_guard_condition_(parent_context)
49 GraphListener::~GraphListener()
54 void GraphListener::init_wait_set()
64 rcl_parent_context_.get(),
67 throw_from_rcl_error(ret,
"failed to initialize wait set");
72 GraphListener::start_if_not_started()
74 std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
75 if (is_shutdown_.load()) {
78 auto parent_context = weak_parent_context_.lock();
79 if (!is_started_ && parent_context) {
83 std::weak_ptr<GraphListener> weak_this = shared_from_this();
84 parent_context->on_shutdown(
86 auto shared_this = weak_this.lock();
89 shared_this->shutdown(std::nothrow);
95 listener_thread_ = std::thread(&GraphListener::run,
this);
105 }
catch (
const std::exception & exc) {
106 RCUTILS_LOG_ERROR_NAMED(
108 "caught %s exception in GraphListener thread: %s",
109 rmw::impl::cpp::demangle(exc).c_str(),
111 std::rethrow_exception(std::current_exception());
113 RCUTILS_LOG_ERROR_NAMED(
115 "unknown error in GraphListener thread");
116 std::rethrow_exception(std::current_exception());
121 GraphListener::run_loop()
125 if (is_shutdown_.load()) {
132 std::lock_guard<std::mutex> nodes_barrier_lock(node_graph_interfaces_barrier_mutex_);
134 node_graph_interfaces_mutex_.lock();
137 std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
139 const size_t node_graph_interfaces_size = node_graph_interfaces_.size();
141 if (wait_set_.size_of_guard_conditions < (node_graph_interfaces_size + 2)) {
144 throw_from_rcl_error(ret,
"failed to resize wait set");
150 throw_from_rcl_error(ret,
"failed to clear wait set");
153 detail::add_guard_condition_to_rcl_wait_set(wait_set_, interrupt_guard_condition_);
156 std::vector<size_t> graph_gc_indexes(node_graph_interfaces_size, 0u);
157 for (
size_t i = 0u; i < node_graph_interfaces_size; ++i) {
158 auto node_ptr = node_graph_interfaces_[i];
160 if (node_ptr->count_graph_users() == 0) {
164 auto graph_gc = node_ptr->get_graph_guard_condition();
166 throw_from_rcl_error(
RCL_RET_ERROR,
"failed to get graph guard condition");
170 throw_from_rcl_error(ret,
"failed to add graph guard condition to wait set");
177 throw std::runtime_error(
"rcl_wait unexpectedly timed out");
180 throw_from_rcl_error(ret,
"failed to wait on wait set");
184 for (
size_t i = 0u; i < node_graph_interfaces_size; ++i) {
185 const auto node_ptr = node_graph_interfaces_[i];
186 auto graph_gc = node_ptr->get_graph_guard_condition();
188 throw_from_rcl_error(
RCL_RET_ERROR,
"failed to get graph guard condition");
190 if (graph_gc == wait_set_.guard_conditions[graph_gc_indexes[i]]) {
191 node_ptr->notify_graph_change();
195 node_ptr->notify_shutdown();
202 interrupt_(GuardCondition * interrupt_guard_condition)
204 interrupt_guard_condition->trigger();
209 std::mutex * node_graph_interfaces_barrier_mutex,
210 std::mutex * node_graph_interfaces_mutex,
211 GuardCondition * interrupt_guard_condition)
216 std::lock_guard<std::mutex> nodes_barrier_lock(*node_graph_interfaces_barrier_mutex);
218 interrupt_(interrupt_guard_condition);
219 node_graph_interfaces_mutex->lock();
225 std::vector<rclcpp::node_interfaces::NodeGraphInterface *> * node_graph_interfaces,
228 for (
const auto node_ptr : (*node_graph_interfaces)) {
229 if (node_graph == node_ptr) {
245 &node_graph_interfaces_barrier_mutex_,
246 &node_graph_interfaces_mutex_,
247 &interrupt_guard_condition_);
249 std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
250 return has_node_(&node_graph_interfaces_, node_graph);
257 throw std::invalid_argument(
"node is nullptr");
259 std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
260 if (is_shutdown_.load()) {
267 &node_graph_interfaces_barrier_mutex_,
268 &node_graph_interfaces_mutex_,
269 &interrupt_guard_condition_);
271 std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
272 if (has_node_(&node_graph_interfaces_, node_graph)) {
275 node_graph_interfaces_.push_back(node_graph);
282 std::vector<rclcpp::node_interfaces::NodeGraphInterface *> * node_graph_interfaces,
286 for (
auto it = node_graph_interfaces->begin(); it != node_graph_interfaces->end(); ++it) {
287 if (node_graph == *it) {
289 node_graph_interfaces->erase(it);
295 throw NodeNotFoundError();
302 throw std::invalid_argument(
"node is nullptr");
304 std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
307 return remove_node_(&node_graph_interfaces_, node_graph);
313 &node_graph_interfaces_barrier_mutex_,
314 &node_graph_interfaces_mutex_,
315 &interrupt_guard_condition_);
317 std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
318 remove_node_(&node_graph_interfaces_, node_graph);
322 GraphListener::cleanup_wait_set()
326 throw_from_rcl_error(ret,
"failed to finalize wait set");
331 GraphListener::__shutdown()
333 std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
334 if (!is_shutdown_.exchange(
true)) {
336 interrupt_(&interrupt_guard_condition_);
337 listener_thread_.join();
346 GraphListener::shutdown()
352 GraphListener::shutdown(
const std::nothrow_t &) noexcept
356 }
catch (
const std::exception & exc) {
359 "caught %s exception when shutting down GraphListener: %s",
360 rmw::impl::cpp::demangle(exc).c_str(), exc.what());
364 "caught unknown exception when shutting down GraphListener");
369 GraphListener::is_shutdown()
371 return is_shutdown_.load();
#define rcl_get_default_allocator
Return a properly initialized rcl_allocator_t with default values.
Thrown when a function is called on a GraphListener that has already been shut down.
Thrown when a node has already been added to the GraphListener.
Pure virtual interface class for the NodeGraph part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool shutdown(rclcpp::Context::SharedPtr context=nullptr, const std::string &reason="user called rclcpp::shutdown()")
Shut down the rclcpp context, invalidating it for derived entities.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
#define RCL_RET_OK
Success return code.
#define RCL_RET_ERROR
Unspecified error return code.
#define RCL_RET_TIMEOUT
Timeout occurred return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_init(rcl_wait_set_t *wait_set, size_t number_of_subscriptions, size_t number_of_guard_conditions, size_t number_of_timers, size_t number_of_clients, size_t number_of_services, size_t number_of_events, rcl_context_t *context, rcl_allocator_t allocator)
Initialize a rcl wait set with space for items to be waited on.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_clear(rcl_wait_set_t *wait_set)
Remove (set to NULL) all entities in the wait set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_fini(rcl_wait_set_t *wait_set)
Finalize a rcl wait set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait(rcl_wait_set_t *wait_set, int64_t timeout)
Block until the wait set is ready or until the timeout has been exceeded.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_resize(rcl_wait_set_t *wait_set, size_t subscriptions_size, size_t guard_conditions_size, size_t timers_size, size_t clients_size, size_t services_size, size_t events_size)
Reallocate space for entities in the wait set.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_guard_condition(rcl_wait_set_t *wait_set, const rcl_guard_condition_t *guard_condition, size_t *index)
Store a pointer to the guard condition in the next empty spot in the set.