ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
subscription_intra_process_base.hpp
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
17 
18 #include <algorithm>
19 #include <memory>
20 #include <mutex>
21 #include <string>
22 
23 #include "rcl/wait.h"
24 #include "rmw/impl/cpp/demangle.hpp"
25 
26 #include "rclcpp/guard_condition.hpp"
27 #include "rclcpp/logging.hpp"
28 #include "rclcpp/qos.hpp"
29 #include "rclcpp/waitable.hpp"
30 
31 namespace rclcpp
32 {
33 namespace experimental
34 {
35 
37 {
38 public:
39  RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase)
40 
41  enum class EntityType : std::size_t
42  {
44  };
45 
46  RCLCPP_PUBLIC
48  rclcpp::Context::SharedPtr context,
49  const std::string & topic_name,
50  const rclcpp::QoS & qos_profile)
51  : gc_(context), topic_name_(topic_name), qos_profile_(qos_profile)
52  {}
53 
54  RCLCPP_PUBLIC
56 
57  RCLCPP_PUBLIC
58  size_t
59  get_number_of_ready_guard_conditions() override {return 1;}
60 
61  RCLCPP_PUBLIC
62  void
63  add_to_wait_set(rcl_wait_set_t * wait_set) override;
64 
65  bool
66  is_ready(rcl_wait_set_t * wait_set) override = 0;
67 
68  std::shared_ptr<void>
69  take_data() override = 0;
70 
71  std::shared_ptr<void>
72  take_data_by_entity_id(size_t id) override
73  {
74  (void)id;
75  return take_data();
76  }
77 
78  void
79  execute(std::shared_ptr<void> & data) override = 0;
80 
81  virtual
82  bool
83  use_take_shared_method() const = 0;
84 
85  RCLCPP_PUBLIC
86  const char *
87  get_topic_name() const;
88 
89  RCLCPP_PUBLIC
90  QoS
91  get_actual_qos() const;
92 
94 
121  void
122  set_on_ready_callback(std::function<void(size_t, int)> callback) override
123  {
124  if (!callback) {
125  throw std::invalid_argument(
126  "The callback passed to set_on_ready_callback "
127  "is not callable.");
128  }
129 
130  // Note: we bind the int identifier argument to this waitable's entity types
131  auto new_callback =
132  [callback, this](size_t number_of_events) {
133  try {
134  callback(number_of_events, static_cast<int>(EntityType::Subscription));
135  } catch (const std::exception & exception) {
136  RCLCPP_ERROR_STREAM(
137  // TODO(wjwwood): get this class access to the node logger it is associated with
138  rclcpp::get_logger("rclcpp"),
139  "rclcpp::SubscriptionIntraProcessBase@" << this <<
140  " caught " << rmw::impl::cpp::demangle(exception) <<
141  " exception in user-provided callback for the 'on ready' callback: " <<
142  exception.what());
143  } catch (...) {
144  RCLCPP_ERROR_STREAM(
145  rclcpp::get_logger("rclcpp"),
146  "rclcpp::SubscriptionIntraProcessBase@" << this <<
147  " caught unhandled exception in user-provided callback " <<
148  "for the 'on ready' callback");
149  }
150  };
151 
152  std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
153  on_new_message_callback_ = new_callback;
154 
155  if (unread_count_ > 0) {
156  if (qos_profile_.history() == HistoryPolicy::KeepAll) {
157  on_new_message_callback_(unread_count_);
158  } else {
159  // Use qos profile depth as upper bound for unread_count_
160  on_new_message_callback_(std::min(unread_count_, qos_profile_.depth()));
161  }
162  unread_count_ = 0;
163  }
164  }
165 
167  void
169  {
170  std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
171  on_new_message_callback_ = nullptr;
172  }
173 
174 protected:
175  std::recursive_mutex callback_mutex_;
176  std::function<void(size_t)> on_new_message_callback_ {nullptr};
177  size_t unread_count_{0};
179 
180  virtual void
181  trigger_guard_condition() = 0;
182 
183  void
184  invoke_on_new_message()
185  {
186  std::lock_guard<std::recursive_mutex> lock(this->callback_mutex_);
187  if (this->on_new_message_callback_) {
188  this->on_new_message_callback_(1);
189  } else {
190  this->unread_count_++;
191  }
192  }
193 
194 private:
195  std::string topic_name_;
196  QoS qos_profile_;
197 };
198 
199 } // namespace experimental
200 } // namespace rclcpp
201 
202 #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
A condition that can be waited on in a single wait set and asynchronously triggered.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:111
QoS & history(HistoryPolicy history)
Set the history policy.
Definition: qos.cpp:107
size_t depth() const
Get the history depth.
Definition: qos.cpp:249
Subscription implementation, templated on the type of message this subscription receives.
void clear_on_ready_callback() override
Unset the callback registered for new messages, if any.
std::shared_ptr< void > take_data_by_entity_id(size_t id) override
Take the data so that it can be consumed with execute.
RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t *wait_set) override
Add the Waitable to a wait set.
void execute(std::shared_ptr< void > &data) override=0
Execute data that is passed in.
void set_on_ready_callback(std::function< void(size_t, int)> callback) override
Set a callback to be called when each new message arrives.
bool is_ready(rcl_wait_set_t *wait_set) override=0
Check if the Waitable is ready.
std::shared_ptr< void > take_data() override=0
Take the data so that it can be consumed with execute.
RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override
Get the number of ready guard_conditions.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:27
Container for subscription's, guard condition's, etc to be waited on.
Definition: wait.h:42