15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
25 #include "rmw/impl/cpp/demangle.hpp"
27 #include "rclcpp/guard_condition.hpp"
28 #include "rclcpp/logging.hpp"
29 #include "rclcpp/qos.hpp"
30 #include "rclcpp/waitable.hpp"
34 namespace experimental
42 enum class EntityType : std::size_t
49 rclcpp::Context::SharedPtr context,
50 const std::string & topic_name,
52 : gc_(context), topic_name_(topic_name), qos_profile_(qos_profile)
69 available_capacity()
const = 0;
73 is_durability_transient_local()
const;
82 take_data_by_entity_id([[maybe_unused]]
size_t id)
override
88 execute(
const std::shared_ptr<void> & data)
override = 0;
92 use_take_shared_method()
const = 0;
96 get_topic_name()
const;
100 get_actual_qos()
const;
134 throw std::invalid_argument(
135 "The callback passed to set_on_ready_callback "
141 [callback,
this](
size_t number_of_events) {
143 callback(number_of_events,
static_cast<int>(EntityType::Subscription));
144 }
catch (
const std::exception & exception) {
148 "rclcpp::SubscriptionIntraProcessBase@" <<
this <<
149 " caught " << rmw::impl::cpp::demangle(exception) <<
150 " exception in user-provided callback for the 'on ready' callback: " <<
155 "rclcpp::SubscriptionIntraProcessBase@" <<
this <<
156 " caught unhandled exception in user-provided callback " <<
157 "for the 'on ready' callback");
161 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
162 on_new_message_callback_ = new_callback;
164 if (unread_count_ > 0) {
165 if (qos_profile_.
history() == HistoryPolicy::KeepAll) {
166 on_new_message_callback_(unread_count_);
169 on_new_message_callback_(std::min(unread_count_, qos_profile_.
depth()));
179 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
180 on_new_message_callback_ =
nullptr;
184 std::vector<std::shared_ptr<rclcpp::TimerBase>>
191 std::recursive_mutex callback_mutex_;
192 std::function<void(
size_t)> on_new_message_callback_ {
nullptr};
193 size_t unread_count_{0};
197 trigger_guard_condition() = 0;
200 invoke_on_new_message()
202 std::lock_guard<std::recursive_mutex> lock(this->callback_mutex_);
203 if (this->on_new_message_callback_) {
204 this->on_new_message_callback_(1);
206 this->unread_count_++;
211 std::string topic_name_;
A condition that can be waited on in a single wait set and asynchronously triggered.
Encapsulation of Quality of Service settings.
QoS & history(HistoryPolicy history)
Set the history policy.
size_t depth() const
Get the history depth.
Subscription implementation, templated on the type of message this subscription receives.
void execute(const std::shared_ptr< void > &data) override=0
Execute data that is passed in.
void clear_on_ready_callback() override
Unset the callback registered for new messages, if any.
bool is_ready(const rcl_wait_set_t &wait_set) override=0
Check if the Waitable is ready.
void set_on_ready_callback(std::function< void(size_t, int)> callback) override
Set a callback to be called when each new message arrives.
RCLCPP_PUBLIC std::vector< std::shared_ptr< rclcpp::TimerBase > > get_timers() const override
Returns all timers used by this waitable.
RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t &wait_set) override
Add the Waitable to a wait set.
std::shared_ptr< void > take_data() override=0
Take the data so that it can be consumed with execute.
RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override
Get the number of ready guard_conditions.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Container for subscription's, guard condition's, etc to be waited on.