ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
graph_listener.cpp
1 // Copyright 2016 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/graph_listener.hpp"
16 
17 #include <cstdio>
18 #include <exception>
19 #include <memory>
20 #include <string>
21 #include <vector>
22 
23 #include "rcl/error_handling.h"
24 #include "rcl/types.h"
25 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
26 #include "rclcpp/exceptions.hpp"
27 #include "rclcpp/logging.hpp"
28 #include "rclcpp/node.hpp"
29 #include "rmw/impl/cpp/demangle.hpp"
30 
31 #include "rcutils/logging_macros.h"
32 
33 using rclcpp::exceptions::throw_from_rcl_error;
34 
35 namespace rclcpp
36 {
37 namespace graph_listener
38 {
39 
40 GraphListener::GraphListener(const std::shared_ptr<Context> & parent_context)
41 : weak_parent_context_(parent_context),
42  rcl_parent_context_(parent_context->get_rcl_context()),
43  is_started_(false),
44  is_shutdown_(false),
45  interrupt_guard_condition_(parent_context)
46 {
47 }
48 
49 GraphListener::~GraphListener()
50 {
51  this->shutdown(std::nothrow);
52 }
53 
54 void GraphListener::init_wait_set()
55 {
57  &wait_set_,
58  0, // number_of_subscriptions
59  2, // number_of_guard_conditions
60  0, // number_of_timers
61  0, // number_of_clients
62  0, // number_of_services
63  0, // number_of_events
64  rcl_parent_context_.get(),
66  if (RCL_RET_OK != ret) {
67  throw_from_rcl_error(ret, "failed to initialize wait set");
68  }
69 }
70 
71 void
72 GraphListener::start_if_not_started()
73 {
74  std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
75  if (is_shutdown_.load()) {
77  }
78  auto parent_context = weak_parent_context_.lock();
79  if (!is_started_ && parent_context) {
80  // Register an on_shutdown hook to shtudown the graph listener.
81  // This is important to ensure that the wait set is finalized before
82  // destruction of static objects occurs.
83  std::weak_ptr<GraphListener> weak_this = shared_from_this();
84  parent_context->on_shutdown(
85  [weak_this]() {
86  auto shared_this = weak_this.lock();
87  if (shared_this) {
88  // should not throw from on_shutdown if it can be avoided
89  shared_this->shutdown(std::nothrow);
90  }
91  });
92  // Initialize the wait set before starting.
93  init_wait_set();
94  // Start the listener thread.
95  listener_thread_ = std::thread(&GraphListener::run, this);
96  is_started_ = true;
97  }
98 }
99 
100 void
101 GraphListener::run()
102 {
103  try {
104  run_loop();
105  } catch (const std::exception & exc) {
106  RCUTILS_LOG_ERROR_NAMED(
107  "rclcpp",
108  "caught %s exception in GraphListener thread: %s",
109  rmw::impl::cpp::demangle(exc).c_str(),
110  exc.what());
111  std::rethrow_exception(std::current_exception());
112  } catch (...) {
113  RCUTILS_LOG_ERROR_NAMED(
114  "rclcpp",
115  "unknown error in GraphListener thread");
116  std::rethrow_exception(std::current_exception());
117  }
118 }
119 
120 void
121 GraphListener::run_loop()
122 {
123  while (true) {
124  // If shutdown() was called, exit.
125  if (is_shutdown_.load()) {
126  return;
127  }
128  rcl_ret_t ret;
129  {
130  // This "barrier" lock ensures that other functions can acquire the
131  // node_graph_interfaces_mutex_ after waking up rcl_wait.
132  std::lock_guard<std::mutex> nodes_barrier_lock(node_graph_interfaces_barrier_mutex_);
133  // This is ownership is passed to nodes_lock in the next line.
134  node_graph_interfaces_mutex_.lock();
135  }
136  // This lock is released when the loop continues or exits.
137  std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
138  // Resize the wait set if necessary.
139  const size_t node_graph_interfaces_size = node_graph_interfaces_.size();
140  // Add 2 for the interrupt and shutdown guard conditions
141  if (wait_set_.size_of_guard_conditions < (node_graph_interfaces_size + 2)) {
142  ret = rcl_wait_set_resize(&wait_set_, 0, node_graph_interfaces_size + 2, 0, 0, 0, 0);
143  if (RCL_RET_OK != ret) {
144  throw_from_rcl_error(ret, "failed to resize wait set");
145  }
146  }
147  // Clear the wait set.
148  ret = rcl_wait_set_clear(&wait_set_);
149  if (RCL_RET_OK != ret) {
150  throw_from_rcl_error(ret, "failed to clear wait set");
151  }
152  // Put the interrupt guard condition in the wait set.
153  detail::add_guard_condition_to_rcl_wait_set(wait_set_, interrupt_guard_condition_);
154 
155  // Put graph guard conditions for each node into the wait set.
156  std::vector<size_t> graph_gc_indexes(node_graph_interfaces_size, 0u);
157  for (size_t i = 0u; i < node_graph_interfaces_size; ++i) {
158  auto node_ptr = node_graph_interfaces_[i];
159  // Only wait on graph changes if some user of the node is watching.
160  if (node_ptr->count_graph_users() == 0) {
161  continue;
162  }
163  // Add the graph guard condition for the node to the wait set.
164  auto graph_gc = node_ptr->get_graph_guard_condition();
165  if (!graph_gc) {
166  throw_from_rcl_error(RCL_RET_ERROR, "failed to get graph guard condition");
167  }
168  ret = rcl_wait_set_add_guard_condition(&wait_set_, graph_gc, &graph_gc_indexes[i]);
169  if (RCL_RET_OK != ret) {
170  throw_from_rcl_error(ret, "failed to add graph guard condition to wait set");
171  }
172  }
173 
174  // Wait for: graph changes, interrupt, or shutdown/SIGINT
175  ret = rcl_wait(&wait_set_, -1); // block for ever until a guard condition is triggered
176  if (RCL_RET_TIMEOUT == ret) {
177  throw std::runtime_error("rcl_wait unexpectedly timed out");
178  }
179  if (RCL_RET_OK != ret) {
180  throw_from_rcl_error(ret, "failed to wait on wait set");
181  }
182 
183  // Notify nodes who's guard conditions are set (triggered).
184  for (size_t i = 0u; i < node_graph_interfaces_size; ++i) {
185  const auto node_ptr = node_graph_interfaces_[i];
186  auto graph_gc = node_ptr->get_graph_guard_condition();
187  if (!graph_gc) {
188  throw_from_rcl_error(RCL_RET_ERROR, "failed to get graph guard condition");
189  }
190  if (graph_gc == wait_set_.guard_conditions[graph_gc_indexes[i]]) {
191  node_ptr->notify_graph_change();
192  }
193  if (is_shutdown_) {
194  // If shutdown, then notify the node of this as well.
195  node_ptr->notify_shutdown();
196  }
197  }
198  } // while (true)
199 }
200 
201 static void
202 interrupt_(GuardCondition * interrupt_guard_condition)
203 {
204  interrupt_guard_condition->trigger();
205 }
206 
207 static void
208 acquire_nodes_lock_(
209  std::mutex * node_graph_interfaces_barrier_mutex,
210  std::mutex * node_graph_interfaces_mutex,
211  GuardCondition * interrupt_guard_condition)
212 {
213  {
214  // Acquire this lock to prevent the run loop from re-locking the
215  // nodes_mutext_ after being woken up.
216  std::lock_guard<std::mutex> nodes_barrier_lock(*node_graph_interfaces_barrier_mutex);
217  // Trigger the interrupt guard condition to wake up rcl_wait.
218  interrupt_(interrupt_guard_condition);
219  node_graph_interfaces_mutex->lock();
220  }
221 }
222 
223 static bool
224 has_node_(
225  std::vector<rclcpp::node_interfaces::NodeGraphInterface *> * node_graph_interfaces,
227 {
228  for (const auto node_ptr : (*node_graph_interfaces)) {
229  if (node_graph == node_ptr) {
230  return true;
231  }
232  }
233  return false;
234 }
235 
236 bool
237 GraphListener::has_node(rclcpp::node_interfaces::NodeGraphInterface * node_graph)
238 {
239  if (!node_graph) {
240  return false;
241  }
242  // Acquire the nodes mutex using the barrier to prevent the run loop from
243  // re-locking the nodes mutex after being interrupted.
244  acquire_nodes_lock_(
245  &node_graph_interfaces_barrier_mutex_,
246  &node_graph_interfaces_mutex_,
247  &interrupt_guard_condition_);
248  // Store the now acquired node_graph_interfaces_mutex_ in the scoped lock using adopt_lock.
249  std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
250  return has_node_(&node_graph_interfaces_, node_graph);
251 }
252 
253 void
254 GraphListener::add_node(rclcpp::node_interfaces::NodeGraphInterface * node_graph)
255 {
256  if (!node_graph) {
257  throw std::invalid_argument("node is nullptr");
258  }
259  std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
260  if (is_shutdown_.load()) {
262  }
263 
264  // Acquire the nodes mutex using the barrier to prevent the run loop from
265  // re-locking the nodes mutex after being interrupted.
266  acquire_nodes_lock_(
267  &node_graph_interfaces_barrier_mutex_,
268  &node_graph_interfaces_mutex_,
269  &interrupt_guard_condition_);
270  // Store the now acquired node_graph_interfaces_mutex_ in the scoped lock using adopt_lock.
271  std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
272  if (has_node_(&node_graph_interfaces_, node_graph)) {
273  throw NodeAlreadyAddedError();
274  }
275  node_graph_interfaces_.push_back(node_graph);
276  // The run loop has already been interrupted by acquire_nodes_lock_() and
277  // will evaluate the new node when nodes_lock releases the node_graph_interfaces_mutex_.
278 }
279 
280 static void
281 remove_node_(
282  std::vector<rclcpp::node_interfaces::NodeGraphInterface *> * node_graph_interfaces,
284 {
285  // Remove the node if it is found.
286  for (auto it = node_graph_interfaces->begin(); it != node_graph_interfaces->end(); ++it) {
287  if (node_graph == *it) {
288  // Found the node, remove it.
289  node_graph_interfaces->erase(it);
290  // Now trigger the interrupt guard condition to make sure
291  return;
292  }
293  }
294  // Not found in the loop.
295  throw NodeNotFoundError();
296 }
297 
298 void
299 GraphListener::remove_node(rclcpp::node_interfaces::NodeGraphInterface * node_graph)
300 {
301  if (!node_graph) {
302  throw std::invalid_argument("node is nullptr");
303  }
304  std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
305  if (is_shutdown()) {
306  // If shutdown, then the run loop has been joined, so we can remove them directly.
307  return remove_node_(&node_graph_interfaces_, node_graph);
308  }
309  // Otherwise, first interrupt and lock against the run loop to safely remove the node.
310  // Acquire the nodes mutex using the barrier to prevent the run loop from
311  // re-locking the nodes mutex after being interrupted.
312  acquire_nodes_lock_(
313  &node_graph_interfaces_barrier_mutex_,
314  &node_graph_interfaces_mutex_,
315  &interrupt_guard_condition_);
316  // Store the now acquired node_graph_interfaces_mutex_ in the scoped lock using adopt_lock.
317  std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
318  remove_node_(&node_graph_interfaces_, node_graph);
319 }
320 
321 void
322 GraphListener::cleanup_wait_set()
323 {
324  rcl_ret_t ret = rcl_wait_set_fini(&wait_set_);
325  if (RCL_RET_OK != ret) {
326  throw_from_rcl_error(ret, "failed to finalize wait set");
327  }
328 }
329 
330 void
331 GraphListener::__shutdown()
332 {
333  std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
334  if (!is_shutdown_.exchange(true)) {
335  if (is_started_) {
336  interrupt_(&interrupt_guard_condition_);
337  listener_thread_.join();
338  }
339  if (is_started_) {
340  cleanup_wait_set();
341  }
342  }
343 }
344 
345 void
346 GraphListener::shutdown()
347 {
348  this->__shutdown();
349 }
350 
351 void
352 GraphListener::shutdown(const std::nothrow_t &) noexcept
353 {
354  try {
355  this->__shutdown();
356  } catch (const std::exception & exc) {
357  RCLCPP_ERROR(
358  rclcpp::get_logger("rclcpp"),
359  "caught %s exception when shutting down GraphListener: %s",
360  rmw::impl::cpp::demangle(exc).c_str(), exc.what());
361  } catch (...) {
362  RCLCPP_ERROR(
363  rclcpp::get_logger("rclcpp"),
364  "caught unknown exception when shutting down GraphListener");
365  }
366 }
367 
368 bool
369 GraphListener::is_shutdown()
370 {
371  return is_shutdown_.load();
372 }
373 
374 } // namespace graph_listener
375 } // namespace rclcpp
#define rcl_get_default_allocator
Return a properly initialized rcl_allocator_t with default values.
Definition: allocator.h:37
Thrown when a function is called on a GraphListener that is already shutdown.
Thrown when a node has already been added to the GraphListener.
Pure virtual interface class for the NodeGraph part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC bool shutdown(rclcpp::Context::SharedPtr context=nullptr, const std::string &reason="user called rclcpp::shutdown()")
Shutdown rclcpp context, invalidating it for derived entities.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:27
#define RCL_RET_OK
Success return code.
Definition: types.h:26
#define RCL_RET_ERROR
Unspecified error return code.
Definition: types.h:28
#define RCL_RET_TIMEOUT
Timeout occurred return code.
Definition: types.h:30
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:23
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_init(rcl_wait_set_t *wait_set, size_t number_of_subscriptions, size_t number_of_guard_conditions, size_t number_of_timers, size_t number_of_clients, size_t number_of_services, size_t number_of_events, rcl_context_t *context, rcl_allocator_t allocator)
Initialize a rcl wait set with space for items to be waited on.
Definition: wait.c:103
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_clear(rcl_wait_set_t *wait_set)
Remove (sets to NULL) all entities in the wait set.
Definition: wait.c:334
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_fini(rcl_wait_set_t *wait_set)
Finalize a rcl wait set.
Definition: wait.c:189
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait(rcl_wait_set_t *wait_set, int64_t timeout)
Block until the wait set is ready or until the timeout has been exceeded.
Definition: wait.c:522
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_resize(rcl_wait_set_t *wait_set, size_t subscriptions_size, size_t guard_conditions_size, size_t timers_size, size_t clients_size, size_t services_size, size_t events_size)
Reallocate space for entities in the wait set.
Definition: wait.c:376
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_guard_condition(rcl_wait_set_t *wait_set, const rcl_guard_condition_t *guard_condition, size_t *index)
Store a pointer to the guard condition in the next empty spot in the set.
Definition: wait.c:454