15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_
24 #include "rmw/impl/cpp/demangle.hpp"
26 #include "rclcpp/guard_condition.hpp"
27 #include "rclcpp/logging.hpp"
28 #include "rclcpp/qos.hpp"
29 #include "rclcpp/waitable.hpp"
33 namespace experimental
41 enum class EntityType : std::size_t
48 rclcpp::Context::SharedPtr context,
49 const std::string & topic_name,
51 : gc_(context), topic_name_(topic_name), qos_profile_(qos_profile)
79 execute(std::shared_ptr<void> & data)
override = 0;
83 use_take_shared_method()
const = 0;
87 get_topic_name()
const;
91 get_actual_qos()
const;
125 throw std::invalid_argument(
126 "The callback passed to set_on_ready_callback "
132 [callback,
this](
size_t number_of_events) {
134 callback(number_of_events,
static_cast<int>(EntityType::Subscription));
135 }
catch (
const std::exception & exception) {
139 "rclcpp::SubscriptionIntraProcessBase@" <<
this <<
140 " caught " << rmw::impl::cpp::demangle(exception) <<
141 " exception in user-provided callback for the 'on ready' callback: " <<
146 "rclcpp::SubscriptionIntraProcessBase@" <<
this <<
147 " caught unhandled exception in user-provided callback " <<
148 "for the 'on ready' callback");
152 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
153 on_new_message_callback_ = new_callback;
155 if (unread_count_ > 0) {
156 if (qos_profile_.
history() == HistoryPolicy::KeepAll) {
157 on_new_message_callback_(unread_count_);
160 on_new_message_callback_(std::min(unread_count_, qos_profile_.
depth()));
170 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
171 on_new_message_callback_ =
nullptr;
175 std::recursive_mutex callback_mutex_;
176 std::function<void(
size_t)> on_new_message_callback_ {
nullptr};
177 size_t unread_count_{0};
181 trigger_guard_condition() = 0;
184 invoke_on_new_message()
186 std::lock_guard<std::recursive_mutex> lock(this->callback_mutex_);
187 if (this->on_new_message_callback_) {
188 this->on_new_message_callback_(1);
190 this->unread_count_++;
195 std::string topic_name_;
A condition that can be waited on in a single wait set and asynchronously triggered.
Encapsulation of Quality of Service settings.
QoS & history(HistoryPolicy history)
Set the history policy.
size_t depth() const
Get the history depth.
Subscription implementation, templated on the type of message this subscription receives.
void clear_on_ready_callback() override
Unset the callback registered for new messages, if any.
std::shared_ptr< void > take_data_by_entity_id(size_t id) override
Take the data so that it can be consumed with execute.
RCLCPP_PUBLIC void add_to_wait_set(rcl_wait_set_t *wait_set) override
Add the Waitable to a wait set.
void execute(std::shared_ptr< void > &data) override=0
Execute data that is passed in.
void set_on_ready_callback(std::function< void(size_t, int)> callback) override
Set a callback to be called when each new message arrives.
bool is_ready(rcl_wait_set_t *wait_set) override=0
Check if the Waitable is ready.
std::shared_ptr< void > take_data() override=0
Take the data so that it can be consumed with execute.
RCLCPP_PUBLIC size_t get_number_of_ready_guard_conditions() override
Get the number of ready guard_conditions.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Container for subscription's, guard condition's, etc to be waited on.