ROS 2 rclcpp + rcl - rolling  rolling-4d14414d
ROS 2 C++ Client Library with ROS Client Library
graph_listener.cpp
1 // Copyright 2016 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/graph_listener.hpp"
16 
17 #include <cstdio>
18 #include <exception>
19 #include <memory>
20 #include <string>
21 #include <vector>
22 #include "rcl/error_handling.h"
23 #include "rcl/types.h"
24 #include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
25 #include "rclcpp/exceptions.hpp"
26 #include "rclcpp/logging.hpp"
27 #include "rclcpp/node.hpp"
28 #include "rmw/impl/cpp/demangle.hpp"
29 
30 #include "rcutils/logging_macros.h"
31 
32 using rclcpp::exceptions::throw_from_rcl_error;
33 
34 namespace rclcpp
35 {
36 namespace graph_listener
37 {
38 
39 GraphListener::GraphListener(const std::shared_ptr<Context> & parent_context)
40 : rcl_parent_context_(parent_context->get_rcl_context()),
41  is_started_(false),
42  is_shutdown_(false),
43  interrupt_guard_condition_(parent_context)
44 {
45 }
46 
47 GraphListener::~GraphListener()
48 {
49  GraphListener::shutdown(std::nothrow);
50 }
51 
52 void GraphListener::init_wait_set()
53 {
55  &wait_set_,
56  0, // number_of_subscriptions
57  2, // number_of_guard_conditions
58  0, // number_of_timers
59  0, // number_of_clients
60  0, // number_of_services
61  0, // number_of_events
62  rcl_parent_context_.get(),
64  if (RCL_RET_OK != ret) {
65  throw_from_rcl_error(ret, "failed to initialize wait set");
66  }
67 }
68 
69 void
70 GraphListener::start_if_not_started()
71 {
72  std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
73  if (is_shutdown()) {
75  }
76 
77  if (!is_started()) {
78  // Initialize the wait set before starting.
79  init_wait_set();
80  // Start the listener thread.
81  listener_thread_ = std::thread(&GraphListener::run, this);
82  is_started_ = true;
83  }
84 }
85 
86 void
87 GraphListener::run()
88 {
89  try {
90  run_loop();
91  } catch (const std::exception & exc) {
92  RCUTILS_LOG_ERROR_NAMED(
93  "rclcpp",
94  "caught %s exception in GraphListener thread: %s",
95  rmw::impl::cpp::demangle(exc).c_str(),
96  exc.what());
97  std::rethrow_exception(std::current_exception());
98  } catch (...) {
99  RCUTILS_LOG_ERROR_NAMED(
100  "rclcpp",
101  "unknown error in GraphListener thread");
102  std::rethrow_exception(std::current_exception());
103  }
104 }
105 
106 void
107 GraphListener::run_loop()
108 {
109  while (true) {
110  // If shutdown() was called, exit.
111  if (is_shutdown()) {
112  return;
113  }
114  rcl_ret_t ret;
115  {
116  // This "barrier" lock ensures that other functions can acquire the
117  // node_graph_interfaces_mutex_ after waking up rcl_wait.
118  std::lock_guard<std::mutex> nodes_barrier_lock(node_graph_interfaces_barrier_mutex_);
119  // This is ownership is passed to nodes_lock in the next line.
120  node_graph_interfaces_mutex_.lock();
121  }
122  // This lock is released when the loop continues or exits.
123  std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
124  // Resize the wait set if necessary.
125  const size_t node_graph_interfaces_size = node_graph_interfaces_.size();
126  // Add 2 for the interrupt and shutdown guard conditions
127  if (wait_set_.size_of_guard_conditions < (node_graph_interfaces_size + 2)) {
128  ret = rcl_wait_set_resize(&wait_set_, 0, node_graph_interfaces_size + 2, 0, 0, 0, 0);
129  if (RCL_RET_OK != ret) {
130  throw_from_rcl_error(ret, "failed to resize wait set");
131  }
132  }
133  // Clear the wait set.
134  ret = rcl_wait_set_clear(&wait_set_);
135  if (RCL_RET_OK != ret) {
136  throw_from_rcl_error(ret, "failed to clear wait set");
137  }
138  // Put the interrupt guard condition in the wait set.
139  detail::add_guard_condition_to_rcl_wait_set(wait_set_, interrupt_guard_condition_);
140 
141  // Put graph guard conditions for each node into the wait set.
142  std::vector<size_t> graph_gc_indexes(node_graph_interfaces_size, 0u);
143  for (size_t i = 0u; i < node_graph_interfaces_size; ++i) {
144  auto node_ptr = node_graph_interfaces_[i];
145  // Only wait on graph changes if some user of the node is watching.
146  if (node_ptr->count_graph_users() == 0) {
147  continue;
148  }
149  // Add the graph guard condition for the node to the wait set.
150  auto graph_gc = node_ptr->get_graph_guard_condition();
151  if (!graph_gc) {
152  throw_from_rcl_error(RCL_RET_ERROR, "failed to get graph guard condition");
153  }
154  ret = rcl_wait_set_add_guard_condition(&wait_set_, graph_gc, &graph_gc_indexes[i]);
155  if (RCL_RET_OK != ret) {
156  throw_from_rcl_error(ret, "failed to add graph guard condition to wait set");
157  }
158  }
159 
160  // Wait for: graph changes, interrupt, or shutdown/SIGINT
161  ret = rcl_wait(&wait_set_, -1); // block for ever until a guard condition is triggered
162  if (RCL_RET_TIMEOUT == ret) {
163  throw std::runtime_error("rcl_wait unexpectedly timed out");
164  }
165  if (RCL_RET_OK != ret) {
166  throw_from_rcl_error(ret, "failed to wait on wait set");
167  }
168 
169  // Notify nodes who's guard conditions are set (triggered).
170  for (size_t i = 0u; i < node_graph_interfaces_size; ++i) {
171  const auto node_ptr = node_graph_interfaces_[i];
172  auto graph_gc = node_ptr->get_graph_guard_condition();
173  if (!graph_gc) {
174  throw_from_rcl_error(RCL_RET_ERROR, "failed to get graph guard condition");
175  }
176  if (graph_gc == wait_set_.guard_conditions[graph_gc_indexes[i]]) {
177  node_ptr->notify_graph_change();
178  }
179  if (is_shutdown()) {
180  // If shutdown, then notify the node of this as well.
181  node_ptr->notify_shutdown();
182  }
183  }
184  } // while (true)
185 }
186 
187 static void
188 interrupt_(GuardCondition * interrupt_guard_condition)
189 {
190  interrupt_guard_condition->trigger();
191 }
192 
193 static void
194 acquire_nodes_lock_(
195  std::mutex * node_graph_interfaces_barrier_mutex,
196  std::mutex * node_graph_interfaces_mutex,
197  GuardCondition * interrupt_guard_condition)
198 {
199  {
200  // Acquire this lock to prevent the run loop from re-locking the
201  // nodes_mutext_ after being woken up.
202  std::lock_guard<std::mutex> nodes_barrier_lock(*node_graph_interfaces_barrier_mutex);
203  // Trigger the interrupt guard condition to wake up rcl_wait.
204  interrupt_(interrupt_guard_condition);
205  node_graph_interfaces_mutex->lock();
206  }
207 }
208 
209 static bool
210 has_node_(
211  std::vector<rclcpp::node_interfaces::NodeGraphInterface *> * node_graph_interfaces,
213 {
214  for (const auto node_ptr : (*node_graph_interfaces)) {
215  if (node_graph == node_ptr) {
216  return true;
217  }
218  }
219  return false;
220 }
221 
222 bool
223 GraphListener::has_node(rclcpp::node_interfaces::NodeGraphInterface * node_graph)
224 {
225  if (!node_graph) {
226  return false;
227  }
228  // Acquire the nodes mutex using the barrier to prevent the run loop from
229  // re-locking the nodes mutex after being interrupted.
230  acquire_nodes_lock_(
231  &node_graph_interfaces_barrier_mutex_,
232  &node_graph_interfaces_mutex_,
233  &interrupt_guard_condition_);
234  // Store the now acquired node_graph_interfaces_mutex_ in the scoped lock using adopt_lock.
235  std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
236  return has_node_(&node_graph_interfaces_, node_graph);
237 }
238 
239 void
240 GraphListener::add_node(rclcpp::node_interfaces::NodeGraphInterface * node_graph)
241 {
242  if (!node_graph) {
243  throw std::invalid_argument("node is nullptr");
244  }
245  std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
246  if (is_shutdown()) {
248  }
249 
250  // Acquire the nodes mutex using the barrier to prevent the run loop from
251  // re-locking the nodes mutex after being interrupted.
252  acquire_nodes_lock_(
253  &node_graph_interfaces_barrier_mutex_,
254  &node_graph_interfaces_mutex_,
255  &interrupt_guard_condition_);
256  // Store the now acquired node_graph_interfaces_mutex_ in the scoped lock using adopt_lock.
257  std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
258  if (has_node_(&node_graph_interfaces_, node_graph)) {
259  throw NodeAlreadyAddedError();
260  }
261  node_graph_interfaces_.push_back(node_graph);
262  // The run loop has already been interrupted by acquire_nodes_lock_() and
263  // will evaluate the new node when nodes_lock releases the node_graph_interfaces_mutex_.
264 }
265 
266 static void
267 remove_node_(
268  std::vector<rclcpp::node_interfaces::NodeGraphInterface *> * node_graph_interfaces,
270 {
271  // Remove the node if it is found.
272  for (auto it = node_graph_interfaces->begin(); it != node_graph_interfaces->end(); ++it) {
273  if (node_graph == *it) {
274  // Found the node, remove it.
275  node_graph_interfaces->erase(it);
276  // Now trigger the interrupt guard condition to make sure
277  return;
278  }
279  }
280  // Not found in the loop.
281  throw NodeNotFoundError();
282 }
283 
284 void
285 GraphListener::remove_node(rclcpp::node_interfaces::NodeGraphInterface * node_graph)
286 {
287  if (!node_graph) {
288  throw std::invalid_argument("node is nullptr");
289  }
290  std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
291  if (is_shutdown()) {
292  // If shutdown, then the run loop has been joined, so we can remove them directly.
293  return remove_node_(&node_graph_interfaces_, node_graph);
294  }
295  // Otherwise, first interrupt and lock against the run loop to safely remove the node.
296  // Acquire the nodes mutex using the barrier to prevent the run loop from
297  // re-locking the nodes mutex after being interrupted.
298  acquire_nodes_lock_(
299  &node_graph_interfaces_barrier_mutex_,
300  &node_graph_interfaces_mutex_,
301  &interrupt_guard_condition_);
302  // Store the now acquired node_graph_interfaces_mutex_ in the scoped lock using adopt_lock.
303  std::lock_guard<std::mutex> nodes_lock(node_graph_interfaces_mutex_, std::adopt_lock);
304  remove_node_(&node_graph_interfaces_, node_graph);
305 }
306 
307 void
308 GraphListener::cleanup_wait_set()
309 {
310  rcl_ret_t ret = rcl_wait_set_fini(&wait_set_);
311  if (RCL_RET_OK != ret) {
312  throw_from_rcl_error(ret, "failed to finalize wait set");
313  }
314 }
315 
316 void
317 GraphListener::__shutdown()
318 {
319  std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
320  if (!is_shutdown_.exchange(true)) {
321  if (is_started()) {
322  interrupt_(&interrupt_guard_condition_);
323  listener_thread_.join();
324  }
325  if (is_started()) {
326  cleanup_wait_set();
327  }
328  }
329 }
330 
331 void
332 GraphListener::shutdown()
333 {
334  this->__shutdown();
335 }
336 
337 void
338 GraphListener::shutdown(const std::nothrow_t &) noexcept
339 {
340  try {
341  this->__shutdown();
342  } catch (const std::exception & exc) {
343  RCLCPP_ERROR(
344  rclcpp::get_logger("rclcpp"),
345  "caught %s exception when shutting down GraphListener: %s",
346  rmw::impl::cpp::demangle(exc).c_str(), exc.what());
347  } catch (...) {
348  RCLCPP_ERROR(
349  rclcpp::get_logger("rclcpp"),
350  "caught unknown exception when shutting down GraphListener");
351  }
352 }
353 
354 bool
355 GraphListener::is_started()
356 {
357  return is_started_;
358 }
359 
360 bool
361 GraphListener::is_shutdown()
362 {
363  return is_shutdown_.load();
364 }
365 
366 } // namespace graph_listener
367 } // namespace rclcpp
#define rcl_get_default_allocator
Return a properly initialized rcl_allocator_t with default values.
Definition: allocator.h:37
Thrown when a function is called on a GraphListener that is already shutdown.
Thrown when a node has already been added to the GraphListener.
Pure virtual interface class for the NodeGraph part of the Node API.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:34
#define RCL_RET_OK
Success return code.
Definition: types.h:27
#define RCL_RET_ERROR
Unspecified error return code.
Definition: types.h:29
#define RCL_RET_TIMEOUT
Timeout occurred return code.
Definition: types.h:31
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:24
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_init(rcl_wait_set_t *wait_set, size_t number_of_subscriptions, size_t number_of_guard_conditions, size_t number_of_timers, size_t number_of_clients, size_t number_of_services, size_t number_of_events, rcl_context_t *context, rcl_allocator_t allocator)
Initialize a rcl wait set with space for items to be waited on.
Definition: wait.c:78
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_clear(rcl_wait_set_t *wait_set)
Remove (sets to NULL) all entities in the wait set.
Definition: wait.c:321
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_fini(rcl_wait_set_t *wait_set)
Finalize a rcl wait set.
Definition: wait.c:167
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait(rcl_wait_set_t *wait_set, int64_t timeout)
Block until the wait set is ready or until the timeout has been exceeded.
Definition: wait.c:509
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_resize(rcl_wait_set_t *wait_set, size_t subscriptions_size, size_t guard_conditions_size, size_t timers_size, size_t clients_size, size_t services_size, size_t events_size)
Reallocate space for entities in the wait set.
Definition: wait.c:363
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_guard_condition(rcl_wait_set_t *wait_set, const rcl_guard_condition_t *guard_condition, size_t *index)
Store a pointer to the guard condition in the next empty spot in the set.
Definition: wait.c:441