ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
executor_notify_waitable.cpp
1 // Copyright 2023 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <iostream>
16 
17 #include "rclcpp/exceptions.hpp"
18 #include "rclcpp/executors/executor_notify_waitable.hpp"
19 
20 namespace rclcpp
21 {
22 namespace executors
23 {
24 
25 ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function<void(void)> on_execute_callback)
26 : execute_callback_(on_execute_callback)
27 {
28 }
29 
31 {
32  std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
33  this->execute_callback_ = other.execute_callback_;
34  this->notify_guard_conditions_ = other.notify_guard_conditions_;
35 }
36 
37 ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(ExecutorNotifyWaitable & other)
38 {
39  if (this != &other) {
40  std::lock_guard<std::mutex> lock(other.guard_condition_mutex_);
41  this->execute_callback_ = other.execute_callback_;
42  this->notify_guard_conditions_ = other.notify_guard_conditions_;
43  }
44  return *this;
45 }
46 
47 void
49 {
50  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
51 
52  // Note: no guard conditions need to be re-triggered, since the guard
53  // conditions in this class are not tracking a stateful condition, but instead
54  // only serve to interrupt the wait set when new information is available to
55  // consider.
56  for (auto weak_guard_condition : this->notify_guard_conditions_) {
57  auto guard_condition = weak_guard_condition.lock();
58  if (!guard_condition) {continue;}
59 
60  rcl_guard_condition_t * cond = &guard_condition->get_rcl_guard_condition();
61  rcl_ret_t ret = rcl_wait_set_add_guard_condition(&wait_set, cond, NULL);
62 
63  if (RCL_RET_OK != ret) {
64  rclcpp::exceptions::throw_from_rcl_error(
65  ret, "failed to add guard condition to wait set");
66  }
67  }
68 }
69 
70 bool
72 {
73  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
74 
75  bool any_ready = false;
76  for (size_t ii = 0; ii < wait_set.size_of_guard_conditions; ++ii) {
77  const auto * rcl_guard_condition = wait_set.guard_conditions[ii];
78 
79  if (nullptr == rcl_guard_condition) {
80  continue;
81  }
82  for (const auto & weak_guard_condition : this->notify_guard_conditions_) {
83  auto guard_condition = weak_guard_condition.lock();
84  if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) {
85  any_ready = true;
86  break;
87  }
88  }
89  }
90  return any_ready;
91 }
92 
93 void
94 ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & data)
95 {
96  (void) data;
97  this->execute_callback_();
98 }
99 
100 std::shared_ptr<void>
102 {
103  return nullptr;
104 }
105 
106 std::shared_ptr<void>
108 {
109  (void) id;
110  return nullptr;
111 }
112 
113 void
114 ExecutorNotifyWaitable::set_on_ready_callback(std::function<void(size_t, int)> callback)
115 {
116  // The second argument of the callback could be used to identify which guard condition
117  // triggered the event.
118  // We could indicate which of the guard conditions was triggered, but the executor
119  // is already going to check that.
120  auto gc_callback = [callback](size_t count) {
121  callback(count, 0);
122  };
123 
124  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
125 
126  on_ready_callback_ = gc_callback;
127  for (auto weak_gc : notify_guard_conditions_) {
128  auto gc = weak_gc.lock();
129  if (!gc) {
130  continue;
131  }
132  gc->set_on_trigger_callback(on_ready_callback_);
133  }
134 }
135 
136 RCLCPP_PUBLIC
137 void
139 {
140  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
141 
142  on_ready_callback_ = nullptr;
143  for (auto weak_gc : notify_guard_conditions_) {
144  auto gc = weak_gc.lock();
145  if (!gc) {
146  continue;
147  }
148  gc->set_on_trigger_callback(nullptr);
149  }
150 }
151 
152 void
153 ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
154 {
155  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
156  auto guard_condition = weak_guard_condition.lock();
157  if (guard_condition && notify_guard_conditions_.count(weak_guard_condition) == 0) {
158  notify_guard_conditions_.insert(weak_guard_condition);
159  if (on_ready_callback_) {
160  guard_condition->set_on_trigger_callback(on_ready_callback_);
161  }
162  }
163 }
164 
165 void
166 ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
167 {
168  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
169  if (notify_guard_conditions_.count(weak_guard_condition) != 0) {
170  notify_guard_conditions_.erase(weak_guard_condition);
171  auto guard_condition = weak_guard_condition.lock();
172  // If this notify waitable doesn't have an on_ready_callback, then there's nothing to unset
173  if (guard_condition && on_ready_callback_) {
174  guard_condition->set_on_trigger_callback(nullptr);
175  }
176  }
177 }
178 
179 size_t
181 {
182  std::lock_guard<std::mutex> lock(guard_condition_mutex_);
183  return notify_guard_conditions_.size();
184 }
185 
186 } // namespace executors
187 } // namespace rclcpp
RCLCPP_PUBLIC std::shared_ptr< void > take_data_by_entity_id(size_t id) override
Take the data from an entity ID so that it can be consumed with execute.
RCLCPP_PUBLIC void execute(const std::shared_ptr< void > &data) override
Perform work associated with the waitable.
RCLCPP_PUBLIC void add_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition)
Add a guard condition to be waited on.
RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override
Get the number of ready guard_conditions.
RCLCPP_PUBLIC std::shared_ptr< void > take_data() override
Retrieve data to be used in the next execute call.
RCLCPP_PUBLIC ExecutorNotifyWaitable(std::function< void(void)> on_execute_callback={})
RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add conditions to the wait set.
RCLCPP_PUBLIC void set_on_ready_callback(std::function< void(size_t, int)> callback) override
Set a callback to be called whenever the waitable becomes ready.
RCLCPP_PUBLIC void clear_on_ready_callback() override
Unset any callback registered via set_on_ready_callback.
RCLCPP_PUBLIC bool is_ready(const rcl_wait_set_t &wait_set) override
Check conditions against the wait set.
RCLCPP_PUBLIC void remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
Remove a guard condition from being waited on.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
Handle for a rcl guard condition.
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42
size_t size_of_guard_conditions
Number of guard_conditions.
Definition: wait.h:50
const rcl_guard_condition_t ** guard_conditions
Storage for guard condition pointers.
Definition: wait.h:48
#define RCL_RET_OK
Success return code.
Definition: types.h:27
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:24
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_wait_set_add_guard_condition(rcl_wait_set_t *wait_set, const rcl_guard_condition_t *guard_condition, size_t *index)
Store a pointer to the guard condition in the next empty spot in the set.
Definition: wait.c:454