15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
24 #include "rmw/impl/cpp/demangle.hpp"
26 #include "rclcpp/guard_condition.hpp"
27 #include "rclcpp/logging.hpp"
28 #include "rclcpp/qos.hpp"
29 #include "rclcpp/waitable.hpp"
33 namespace experimental
41 enum class EntityType : std::size_t
48 rclcpp::Context::SharedPtr context,
49 const std::string & topic_name,
51 : gc_(context), topic_name_(topic_name), qos_profile_(qos_profile)
68 available_capacity()
const = 0;
72 is_durability_transient_local()
const;
88 execute(
const std::shared_ptr<void> & data)
override = 0;
92 use_take_shared_method()
const = 0;
96 get_topic_name()
const;
100 get_actual_qos()
const;
134 throw std::invalid_argument(
135 "The callback passed to set_on_ready_callback "
141 [callback,
this](
size_t number_of_events) {
143 callback(number_of_events,
static_cast<int>(EntityType::Subscription));
144 }
catch (
const std::exception & exception) {
148 "rclcpp::SubscriptionIntraProcessBase@" <<
this <<
149 " caught " << rmw::impl::cpp::demangle(exception) <<
150 " exception in user-provided callback for the 'on ready' callback: " <<
155 "rclcpp::SubscriptionIntraProcessBase@" <<
this <<
156 " caught unhandled exception in user-provided callback " <<
157 "for the 'on ready' callback");
161 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
162 on_new_message_callback_ = new_callback;
164 if (unread_count_ > 0) {
165 if (qos_profile_.
history() == HistoryPolicy::KeepAll) {
166 on_new_message_callback_(unread_count_);
169 on_new_message_callback_(std::min(unread_count_, qos_profile_.
depth()));
179 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
180 on_new_message_callback_ =
nullptr;
184 std::recursive_mutex callback_mutex_;
185 std::function<void(
size_t)> on_new_message_callback_ {
nullptr};
186 size_t unread_count_{0};
190 trigger_guard_condition() = 0;
193 invoke_on_new_message()
195 std::lock_guard<std::recursive_mutex> lock(this->callback_mutex_);
196 if (this->on_new_message_callback_) {
197 this->on_new_message_callback_(1);
199 this->unread_count_++;
204 std::string topic_name_;
A condition that can be waited on in a single wait set and asynchronously triggered.
Encapsulation of Quality of Service settings.
QoS & history(HistoryPolicy history)
Set the history policy.
size_t depth() const
Get the history depth.
Subscription implementation, templated on the type of message this subscription receives.
void execute(const std::shared_ptr< void > &data) override=0
Execute data that is passed in.
void clear_on_ready_callback() override
Unset the callback registered for new messages, if any.
std::shared_ptr< void > take_data_by_entity_id(size_t id) override
Take the data so that it can be consumed with execute.
bool is_ready(const rcl_wait_set_t &wait_set) override=0
Check if the Waitable is ready.
void set_on_ready_callback(std::function< void(size_t, int)> callback) override
Set a callback to be called when each new message arrives.
RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add the Waitable to a wait set.
std::shared_ptr< void > take_data() override=0
Take the data so that it can be consumed with execute.
RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override
Get the number of ready guard_conditions.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Container for subscription's, guard condition's, etc to be waited on.