15 #include "rclcpp/publisher_base.hpp"
17 #include <rmw/error_handling.h>
25 #include <unordered_map>
28 #include "rcutils/logging_macros.h"
29 #include "rmw/impl/cpp/demangle.hpp"
31 #include "rclcpp/allocator/allocator_common.hpp"
32 #include "rclcpp/allocator/allocator_deleter.hpp"
33 #include "rclcpp/exceptions.hpp"
34 #include "rclcpp/expand_topic_or_service_name.hpp"
35 #include "rclcpp/experimental/intra_process_manager.hpp"
36 #include "rclcpp/logging.hpp"
37 #include "rclcpp/macros.hpp"
38 #include "rclcpp/network_flow_endpoint.hpp"
39 #include "rclcpp/node.hpp"
40 #include "rclcpp/event_handler.hpp"
// Constructor: takes the node's shared rcl handle, starts with intra-process
// communication disabled (publisher id 0), and stores the supplied type
// support and event callbacks.
// NOTE(review): several parameter and body lines are absent from this
// excerpt; the comments below describe only the statements that are visible.
44 PublisherBase::PublisherBase(
46 const std::string & topic,
47 const rosidl_message_type_support_t & type_support,
50 bool use_default_callbacks)
51 : rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
52 intra_process_is_enabled_(false),
53 intra_process_publisher_id_(0),
54 type_support_(type_support),
55 event_callbacks_(event_callbacks)
// The deleter init-captures its own copy of rcl_node_handle_ and does not
// capture `this`, so it cannot touch the PublisherBase object when it runs.
57 auto custom_deleter = [node_handle = this->rcl_node_handle_](
rcl_publisher_t * rcl_pub)
// Format string for a publisher-handle teardown failure; the current rcl
// error string fills the %s. Presumably logged from inside the deleter.
62 "Error in destruction of rcl publisher handle: %s",
63 rcl_get_error_string().str);
// publisher_handle_ owns the rcl_publisher_t through a shared_ptr
// (presumably constructed with custom_deleter - confirm against full source).
69 publisher_handle_ = std::shared_ptr<rcl_publisher_t>(
// Arguments of the publisher initialization call (call line not shown;
// presumably rcl_publisher_init).
74 publisher_handle_.get(),
75 rcl_node_handle_.get(),
81 auto rcl_node_handle = rcl_node_handle_.get();
// Converts the rcl return code `ret` into an rclcpp exception.
90 rclcpp::exceptions::throw_from_rcl_error(ret,
"could not create publisher");
// A null rmw handle is reported as std::runtime_error carrying the rcl
// error string.
94 if (!publisher_rmw_handle) {
95 auto msg = std::string(
"failed to get rmw handle: ") + rcl_get_error_string().str;
97 throw std::runtime_error(msg);
// Fill rmw_gid_ with this publisher's GID; any result other than RMW_RET_OK
// is reported as std::runtime_error carrying the rmw error string.
99 if (rmw_get_gid_for_publisher(publisher_rmw_handle, &rmw_gid_) != RMW_RET_OK) {
100 auto msg = std::string(
"failed to get publisher gid: ") + rmw_get_error_string().str;
102 throw std::runtime_error(msg);
// Destructor: clears the stored event handlers, then deals with the
// intra-process manager this publisher may be registered with.
108 PublisherBase::~PublisherBase()
111 event_handlers_.clear();
// Promote the weak reference; the result is empty if the manager is gone.
113 auto ipm = weak_ipm_.lock();
// Branch taken when intra-process was never enabled (its body is not shown;
// presumably an early return - confirm).
115 if (!intra_process_is_enabled_) {
// Message for the case where the manager no longer exists (the guarding
// condition and logging call are not shown in this excerpt).
122 "Intra process manager died before a publisher.");
// Unregister this publisher from the manager using its intra-process id.
125 ipm->remove_publisher(intra_process_publisher_id_);
// Registers an event handler for each callback that is set in
// `event_callbacks`. For the incompatible-QoS and incompatible-type events a
// default callback is substituted when none is supplied and
// `use_default_callbacks` is true.
// NOTE(review): the signature line is not in this excerpt (presumably
// bind_event_callbacks). Each "Failed to add event handler ..." string
// presumably belongs to an error path around the preceding add_event_handler
// call - confirm against the full source.
139 if (event_callbacks.deadline_callback) {
140 this->add_event_handler(
141 event_callbacks.deadline_callback,
142 RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
147 "Failed to add event handler for deadline; not supported");
151 if (event_callbacks.liveliness_callback) {
152 this->add_event_handler(
153 event_callbacks.liveliness_callback,
154 RCL_PUBLISHER_LIVELINESS_LOST);
159 "Failed to add event handler for liveliness; not supported");
// Incompatible QoS: a user-supplied callback wins; otherwise fall back to
// default_incompatible_qos_callback when defaults are requested. The
// callable stays empty if neither applies, and then no handler is added.
162 QOSOfferedIncompatibleQoSCallbackType incompatible_qos_cb;
163 if (event_callbacks.incompatible_qos_callback) {
164 incompatible_qos_cb = event_callbacks.incompatible_qos_callback;
165 }
else if (use_default_callbacks) {
// The default lambda captures `this` in order to call the member function.
167 incompatible_qos_cb = [
this](QOSOfferedIncompatibleQoSInfo & info) {
168 this->default_incompatible_qos_callback(info);
172 if (incompatible_qos_cb) {
173 this->add_event_handler(incompatible_qos_cb, RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
178 "Failed to add event handler for incompatible qos; not supported");
// Incompatible type: same selection scheme as incompatible QoS above.
181 IncompatibleTypeCallbackType incompatible_type_cb;
182 if (event_callbacks.incompatible_type_callback) {
183 incompatible_type_cb = event_callbacks.incompatible_type_callback;
184 }
else if (use_default_callbacks) {
186 incompatible_type_cb = [
this](IncompatibleTypeInfo & info) {
187 this->default_incompatible_type_callback(info);
191 if (incompatible_type_cb) {
192 this->add_event_handler(incompatible_type_cb, RCL_PUBLISHER_INCOMPATIBLE_TYPE);
197 "Failed to add event handler for incompatible type; not supported");
// Matched event: registered only when the user supplied a callback.
201 if (event_callbacks.matched_callback) {
202 this->add_event_handler(
203 event_callbacks.matched_callback,
204 RCL_PUBLISHER_MATCHED);
209 "Failed to add event handler for matched; not supported");
// Returns the QoS depth stored in the publisher options. A null options
// pointer is reported as std::runtime_error carrying the rcl error string.
// NOTE(review): the signature and the start of the options lookup are not in
// this excerpt (presumably get_queue_size calling rcl_publisher_get_options).
217 publisher_handle_.get());
218 if (!publisher_options) {
219 auto msg = std::string(
"failed to get publisher options: ") + rcl_get_error_string().str;
221 throw std::runtime_error(msg);
223 return publisher_options->
qos.depth;
// Accessor returning the shared rcl publisher handle held by this object.
// NOTE(review): the function name line is not shown (presumably the
// non-const get_publisher_handle).
232 std::shared_ptr<rcl_publisher_t>
235 return publisher_handle_;
// Accessor returning the same publisher handle as a pointer-to-const.
// NOTE(review): the function name line is not shown (presumably the const
// overload of get_publisher_handle).
238 std::shared_ptr<const rcl_publisher_t>
241 return publisher_handle_;
// Accessor returning a reference to the map from publisher event type to
// its registered event handler.
// NOTE(review): the function name line and any leading qualifier are not
// shown (presumably get_event_handlers).
245 std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::EventHandlerBase>> &
248 return event_handlers_;
// Returns the inter-process subscription count for this publisher. The count
// starts at zero and is filled in through an out-parameter.
// NOTE(review): the signature and the call that receives these arguments are
// not in this excerpt (presumably rcl_publisher_get_subscription_count), nor
// is the condition guarding the throw.
254 size_t inter_process_subscription_count = 0;
257 publisher_handle_.get(),
258 &inter_process_subscription_count);
// Convert the rcl status into an rclcpp exception. The message previously
// read "failed to get get subscription count" (duplicated word).
271 rclcpp::exceptions::throw_from_rcl_error(status,
"failed to get subscription count");
273 return inter_process_subscription_count;
// Returns the subscription count reported by the intra-process manager for
// this publisher's intra-process id.
// NOTE(review): the signature is not shown (presumably
// get_intra_process_subscription_count), and the bodies of the guards below
// are missing from this excerpt.
279 auto ipm = weak_ipm_.lock();
// Branch for publishers without intra-process enabled (body not shown).
280 if (!intra_process_is_enabled_) {
// Thrown when the manager has already been destroyed (the guarding condition
// is not shown; presumably a check that `ipm` is empty).
286 throw std::runtime_error(
287 "intra process subscriber count called after "
288 "destruction of intra process manager");
290 return ipm->get_subscription_count(intra_process_publisher_id_);
// Tail of a statement whose beginning is not in this excerpt; presumably the
// right-hand side of a comparison against the publisher's durability policy
// inside is_durability_transient_local - confirm against the full source.
297 RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
// Error path for a failed QoS lookup: throws std::runtime_error whose message
// includes the current rcl error string.
// NOTE(review): the lookup itself and the enclosing signature are not shown
// (presumably get_actual_qos).
305 auto msg = std::string(
"failed to get qos settings: ") + rcl_get_error_string().str;
307 throw std::runtime_error(msg);
// Compares this publisher with `gid` by forwarding its address to the
// pointer-taking comparison.
// NOTE(review): the signature is not shown (presumably
// operator==(const rmw_gid_t & gid) const).
328 return *
this == &gid;
// Compares `gid` with this publisher's own GID using rmw_compare_gids_equal,
// which writes the outcome into `result`. Any return other than RMW_RET_OK is
// reported as std::runtime_error carrying the rmw error string.
// NOTE(review): the signature, the declaration of `result` and the final
// return are not in this excerpt.
335 auto ret = rmw_compare_gids_equal(gid, &this->
get_gid(), &result);
336 if (ret != RMW_RET_OK) {
337 auto msg = std::string(
"failed to compare gids: ") + rmw_get_error_string().str;
339 throw std::runtime_error(msg);
// Records the intra-process publisher id and marks intra-process
// communication as enabled for this publisher.
// NOTE(review): the function name line and at least one body statement are
// not in this excerpt (presumably setup_intra_process, which would also store
// `ipm` - confirm).
346 uint64_t intra_process_publisher_id,
347 IntraProcessManagerSharedPtr ipm)
349 intra_process_publisher_id_ = intra_process_publisher_id;
351 intra_process_is_enabled_ =
true;
// Default handler for the offered-incompatible-QoS event: turns the last
// incompatible policy kind into a readable name and reports it together with
// the topic.
// NOTE(review): the logging call that consumes the format string is not in
// this excerpt, and neither is the argument for the first %s.
355 PublisherBase::default_incompatible_qos_callback(
356 rclcpp::QOSOfferedIncompatibleQoSInfo & event)
const
// Readable policy name for event.last_policy_kind; fills the second %s.
358 std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
361 "New subscription discovered on topic '%s', requesting incompatible QoS. "
362 "No messages will be sent to it. "
363 "Last incompatible policy: %s",
365 policy_name.c_str());
// Default handler for the incompatible-type event: reports the topic name.
// The `event` argument is marked [[maybe_unused]] and is not read in the
// visible lines.
// NOTE(review): the logging call that consumes the format string is not in
// this excerpt.
369 PublisherBase::default_incompatible_type_callback(
370 [[maybe_unused]] rclcpp::IncompatibleTypeInfo & event)
const
374 "Incompatible type on topic '%s', no messages will be sent to it.",
get_topic_name());
// Fetches the publisher's network flow endpoints from rcl, copies them into a
// std::vector<rclcpp::NetworkFlowEndpoint>, finalizes the rcl array and
// returns the vector.
// NOTE(review): the signature and the conditions guarding the error paths
// are not in this excerpt (presumably get_network_flow_endpoints, with checks
// against RCL_RET_OK).
379 rcutils_allocator_t allocator = rcutils_get_default_allocator();
// Start from a zero-initialized array for rcl to populate.
380 rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
381 rcl_get_zero_initialized_network_flow_endpoint_array();
382 rcl_ret_t ret = rcl_publisher_get_network_flow_endpoints(
383 publisher_handle_.get(), &allocator, &network_flow_endpoint_array);
// Failure path: start the message from the current rcl error string.
385 auto error_msg = std::string(
"error obtaining network flows of publisher: ") +
386 rcl_get_error_string().str;
// The array is finalized on the failure path as well; a cleanup error is
// appended to the same message rather than replacing it.
389 rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array))
391 error_msg += std::string(
", also error cleaning up network flow array: ") +
392 rcl_get_error_string().str;
395 rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
// Convert each rcl entry into an element of the result vector.
398 std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
399 for (
size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
400 network_flow_endpoint_vector.push_back(
402 network_flow_endpoint_array.network_flow_endpoint[i]));
// Finalize the rcl array after copying; the result is passed to
// throw_from_rcl_error (guarding condition not shown).
405 ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array);
407 rclcpp::exceptions::throw_from_rcl_error(ret,
"error cleaning up network flow array");
410 return network_flow_endpoint_vector;
// Returns the lowest available capacity reported by the intra-process manager
// for this publisher's intra-process id.
// NOTE(review): the signature is not shown (presumably
// lowest_available_ipm_capacity). The body of the disabled branch and the
// handling around the "died" message are missing from this excerpt.
415 if (!intra_process_is_enabled_) {
// Promote the weak reference; the result is empty if the manager is gone.
419 auto ipm = weak_ipm_.lock();
425 "Intra process manager died for a publisher.");
429 return ipm->lowest_available_capacity(intra_process_publisher_id_);
RCLCPP_PUBLIC Logger get_child(const std::string &suffix)
Return a logger that is a descendant of this logger.
RCLCPP_PUBLIC const rmw_gid_t & get_gid() const
Get the global identifier for this publisher (used in rmw and by DDS).
RCLCPP_PUBLIC std::shared_ptr< rcl_publisher_t > get_publisher_handle()
Get the rcl publisher handle.
RCLCPP_PUBLIC size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
RCLCPP_PUBLIC void bind_event_callbacks(const PublisherEventCallbacks &event_callbacks, bool use_default_callbacks)
Add event handlers for the passed-in event_callbacks.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this publisher publishes on.
RCLCPP_PUBLIC bool operator==(const rmw_gid_t &gid) const
Compare this publisher to a gid.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_publisher_id, IntraProcessManagerSharedPtr ipm)
Implementation utility function used to set up intra-process publishing after creation.
RCLCPP_PUBLIC size_t get_queue_size() const
Get the queue size for this publisher.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if publisher instance can loan messages.
RCLCPP_PUBLIC bool is_durability_transient_local() const
Get whether durability is transient local.
RCLCPP_PUBLIC size_t get_subscription_count() const
Get subscription count.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC RCUTILS_WARN_UNUSED bool assert_liveliness() const
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
RCLCPP_PUBLIC const std::unordered_map< rcl_publisher_event_type_t, std::shared_ptr< rclcpp::EventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this publisher.
RCLCPP_PUBLIC size_t lowest_available_ipm_capacity() const
Return the lowest available capacity for all subscription buffers.
Encapsulation of Quality of Service settings.
Pure virtual interface class for the NodeBase part of the Node API.
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_context_is_valid(const rcl_context_t *context)
Return true if the given context is currently valid, otherwise false.
RCLCPP_PUBLIC std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
RCLCPP_PUBLIC Logger get_node_logger(const rcl_node_t *node)
Return a named logger using an rcl_node_t.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_name(const rcl_node_t *node)
Return the name of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_namespace(const rcl_node_t *node)
Return the namespace of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_logger_name(const rcl_node_t *node)
Return the logger name of the node.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_init(rcl_publisher_t *publisher, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_publisher_options_t *options)
Initialize a rcl publisher.
RCL_PUBLIC RCL_WARN_UNUSED rcl_context_t * rcl_publisher_get_context(const rcl_publisher_t *publisher)
Return the context associated with this publisher.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_get_subscription_count(const rcl_publisher_t *publisher, size_t *subscription_count)
Get the number of subscriptions matched to a publisher.
RCL_PUBLIC RCL_WARN_UNUSED rmw_publisher_t * rcl_publisher_get_rmw_handle(const rcl_publisher_t *publisher)
Return the rmw publisher handle.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_publisher_get_topic_name(const rcl_publisher_t *publisher)
Get the topic name for the publisher.
RCL_PUBLIC RCL_WARN_UNUSED const rmw_qos_profile_t * rcl_publisher_get_actual_qos(const rcl_publisher_t *publisher)
Get the actual qos settings of the publisher.
RCL_PUBLIC bool rcl_publisher_is_valid_except_context(const rcl_publisher_t *publisher)
Return true if the publisher is valid except the context, otherwise false.
RCL_PUBLIC RCL_WARN_UNUSED const rcl_publisher_options_t * rcl_publisher_get_options(const rcl_publisher_t *publisher)
Return the rcl publisher options.
RCL_PUBLIC RCL_WARN_UNUSED rcl_publisher_t rcl_get_zero_initialized_publisher(void)
Return a rcl_publisher_t struct with members set to NULL.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_fini(rcl_publisher_t *publisher, rcl_node_t *node)
Finalize a rcl_publisher_t.
RCL_PUBLIC bool rcl_publisher_can_loan_messages(const rcl_publisher_t *publisher)
Check if publisher instance can loan messages.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_assert_liveliness(const rcl_publisher_t *publisher)
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
Encapsulates the non-global state of an init/shutdown cycle.
Options available for a rcl publisher.
rmw_qos_profile_t qos
Middleware quality of service settings for the publisher.
Structure which encapsulates a ROS Publisher.
Contains callbacks for various types of events a Publisher can receive from the middleware.
static QoSInitialization from_rmw(const rmw_qos_profile_t &rmw_qos)
Create a QoSInitialization from an existing rmw_qos_profile_t, using its history and depth.
#define RCL_RET_OK
Success return code.
#define RCL_RET_TOPIC_NAME_INVALID
Topic name does not pass validation.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
#define RCL_RET_PUBLISHER_INVALID
Invalid rcl_publisher_t given return code.