ROS 2 rclcpp + rcl - kilted  kilted
ROS 2 C++ Client Library with ROS Client Library
subscription_intra_process_base.hpp
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
17 
18 #include <algorithm>
19 #include <memory>
20 #include <mutex>
21 #include <string>
22 #include <vector>
23 
24 #include "rcl/wait.h"
25 #include "rmw/impl/cpp/demangle.hpp"
26 
27 #include "rclcpp/guard_condition.hpp"
28 #include "rclcpp/logging.hpp"
29 #include "rclcpp/qos.hpp"
30 #include "rclcpp/waitable.hpp"
31 
32 namespace rclcpp
33 {
34 namespace experimental
35 {
36 
38 {
39 public:
40  RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)
41 
42  enum class EntityType : std::size_t
43  {
45  };
46 
47  RCLCPP_PUBLIC
49  rclcpp::Context::SharedPtr context,
50  const std::string & topic_name,
51  const rclcpp::QoS & qos_profile)
52  : gc_(context), topic_name_(topic_name), qos_profile_(qos_profile)
53  {}
54 
55  RCLCPP_PUBLIC
56  virtual ~SubscriptionIntraProcessBase() = default;
57 
58  RCLCPP_PUBLIC
59  size_t
60  get_number_of_ready_guard_conditions() override {return 1;}
61 
62  RCLCPP_PUBLIC
63  void
64  add_to_wait_set(rcl_wait_set_t & wait_set) override;
65 
66  RCLCPP_PUBLIC
67  virtual
68  size_t
69  available_capacity() const = 0;
70 
71  RCLCPP_PUBLIC
72  bool
73  is_durability_transient_local() const;
74 
75  bool
76  is_ready(const rcl_wait_set_t & wait_set) override = 0;
77 
78  std::shared_ptr<void>
79  take_data() override = 0;
80 
81  std::shared_ptr<void>
82  take_data_by_entity_id([[maybe_unused]] size_t id) override
83  {
84  return take_data();
85  }
86 
87  void
88  execute(const std::shared_ptr<void> & data) override = 0;
89 
90  virtual
91  bool
92  use_take_shared_method() const = 0;
93 
94  RCLCPP_PUBLIC
95  const char *
96  get_topic_name() const;
97 
98  RCLCPP_PUBLIC
99  QoS
100  get_actual_qos() const;
101 
103 
130  void
131  set_on_ready_callback(std::function<void(size_t, int)> callback) override
132  {
133  if (!callback) {
134  throw std::invalid_argument(
135  "The callback passed to set_on_ready_callback "
136  "is not callable.");
137  }
138 
139  // Note: we bind the int identifier argument to this waitable's entity types
140  auto new_callback =
141  [callback, this](size_t number_of_events) {
142  try {
143  callback(number_of_events, static_cast<int>(EntityType::Subscription));
144  } catch (const std::exception & exception) {
145  RCLCPP_ERROR_STREAM(
146  // TODO(wjwwood): get this class access to the node logger it is associated with
147  rclcpp::get_logger("rclcpp"),
148  "rclcpp::SubscriptionIntraProcessBase@" << this <<
149  " caught " << rmw::impl::cpp::demangle(exception) <<
150  " exception in user-provided callback for the 'on ready' callback: " <<
151  exception.what());
152  } catch (...) {
153  RCLCPP_ERROR_STREAM(
154  rclcpp::get_logger("rclcpp"),
155  "rclcpp::SubscriptionIntraProcessBase@" << this <<
156  " caught unhandled exception in user-provided callback " <<
157  "for the 'on ready' callback");
158  }
159  };
160 
161  std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
162  on_new_message_callback_ = new_callback;
163 
164  if (unread_count_ > 0) {
165  if (qos_profile_.history() == HistoryPolicy::KeepAll) {
166  on_new_message_callback_(unread_count_);
167  } else {
168  // Use qos profile depth as upper bound for unread_count_
169  on_new_message_callback_(std::min(unread_count_, qos_profile_.depth()));
170  }
171  unread_count_ = 0;
172  }
173  }
174 
176  void
178  {
179  std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
180  on_new_message_callback_ = nullptr;
181  }
182 
183  RCLCPP_PUBLIC
184  std::vector<std::shared_ptr<rclcpp::TimerBase>>
185  get_timers() const override
186  {
187  return {};
188  }
189 
190 protected:
191  std::recursive_mutex callback_mutex_;
192  std::function<void(size_t)> on_new_message_callback_ {nullptr};
193  size_t unread_count_{0};
195 
196  virtual void
197  trigger_guard_condition() = 0;
198 
199  void
200  invoke_on_new_message()
201  {
202  std::lock_guard<std::recursive_mutex> lock(this->callback_mutex_);
203  if (this->on_new_message_callback_) {
204  this->on_new_message_callback_(1);
205  } else {
206  this->unread_count_++;
207  }
208  }
209 
210 private:
211  std::string topic_name_;
212  QoS qos_profile_;
213 };
214 
215 } // namespace experimental
216 } // namespace rclcpp
217 
218 #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
A condition that can be waited on in a single wait set and asynchronously triggered.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:116
QoS & history(HistoryPolicy history)
Set the history policy.
Definition: qos.cpp:121
size_t depth() const
Get the history depth.
Definition: qos.cpp:283
Subscription implementation, templated on the type of message this subscription receives.
void execute(const std::shared_ptr< void > &data) override=0
Execute data that is passed in.
void clear_on_ready_callback() override
Unset the callback registered for new messages, if any.
bool is_ready(const rcl_wait_set_t &wait_set) override=0
Check if the Waitable is ready.
void set_on_ready_callback(std::function< void(size_t, int)> callback) override
Set a callback to be called when each new message arrives.
RCLCPP_PUBLIC std::vector< std::shared_ptr< rclcpp::TimerBase > > get_timers() const override
Returns all timers used by this waitable.
RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add the Waitable to a wait set.
std::shared_ptr< void > take_data() override=0
Take the data so that it can be consumed with execute.
RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override
Get the number of ready guard_conditions.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:34
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42