15 #include "rclcpp/publisher_base.hpp"
17 #include <rmw/error_handling.h>
25 #include <unordered_map>
28 #include "rcutils/logging_macros.h"
29 #include "rmw/impl/cpp/demangle.hpp"
31 #include "rclcpp/allocator/allocator_common.hpp"
32 #include "rclcpp/allocator/allocator_deleter.hpp"
33 #include "rclcpp/exceptions.hpp"
34 #include "rclcpp/expand_topic_or_service_name.hpp"
35 #include "rclcpp/experimental/intra_process_manager.hpp"
36 #include "rclcpp/logging.hpp"
37 #include "rclcpp/macros.hpp"
38 #include "rclcpp/network_flow_endpoint.hpp"
39 #include "rclcpp/node.hpp"
40 #include "rclcpp/event_handler.hpp"
44 PublisherBase::PublisherBase(
46 const std::string & topic,
47 const rosidl_message_type_support_t & type_support,
50 bool use_default_callbacks)
51 : rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
52 intra_process_is_enabled_(false),
53 intra_process_publisher_id_(0),
54 type_support_(type_support),
55 event_callbacks_(event_callbacks)
57 auto custom_deleter = [node_handle = this->rcl_node_handle_](
rcl_publisher_t * rcl_pub)
62 "Error in destruction of rcl publisher handle: %s",
63 rcl_get_error_string().str);
69 publisher_handle_ = std::shared_ptr<rcl_publisher_t>(
74 publisher_handle_.get(),
75 rcl_node_handle_.get(),
81 auto rcl_node_handle = rcl_node_handle_.get();
90 rclcpp::exceptions::throw_from_rcl_error(ret,
"could not create publisher");
94 if (!publisher_rmw_handle) {
95 auto msg = std::string(
"failed to get rmw handle: ") + rcl_get_error_string().str;
97 throw std::runtime_error(msg);
99 if (rmw_get_gid_for_publisher(publisher_rmw_handle, &rmw_gid_) != RMW_RET_OK) {
100 auto msg = std::string(
"failed to get publisher gid: ") + rmw_get_error_string().str;
102 throw std::runtime_error(msg);
108 PublisherBase::~PublisherBase()
111 event_handlers_.clear();
113 auto ipm = weak_ipm_.lock();
115 if (!intra_process_is_enabled_) {
122 "Intra process manager died before a publisher.");
125 ipm->remove_publisher(intra_process_publisher_id_);
138 if (event_callbacks.deadline_callback) {
139 this->add_event_handler(
140 event_callbacks.deadline_callback,
141 RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
143 if (event_callbacks.liveliness_callback) {
144 this->add_event_handler(
145 event_callbacks.liveliness_callback,
146 RCL_PUBLISHER_LIVELINESS_LOST);
149 QOSOfferedIncompatibleQoSCallbackType incompatible_qos_cb;
150 if (event_callbacks.incompatible_qos_callback) {
151 incompatible_qos_cb = event_callbacks.incompatible_qos_callback;
152 }
else if (use_default_callbacks) {
154 incompatible_qos_cb = [
this](QOSOfferedIncompatibleQoSInfo & info) {
155 this->default_incompatible_qos_callback(info);
159 if (incompatible_qos_cb) {
160 this->add_event_handler(incompatible_qos_cb, RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
165 "Failed to add event handler for incompatible qos; wrong callback type");
168 IncompatibleTypeCallbackType incompatible_type_cb;
169 if (event_callbacks.incompatible_type_callback) {
170 incompatible_type_cb = event_callbacks.incompatible_type_callback;
171 }
else if (use_default_callbacks) {
173 incompatible_type_cb = [
this](IncompatibleTypeInfo & info) {
174 this->default_incompatible_type_callback(info);
178 if (incompatible_type_cb) {
179 this->add_event_handler(incompatible_type_cb, RCL_PUBLISHER_INCOMPATIBLE_TYPE);
184 "Failed to add event handler for incompatible type; wrong callback type");
186 if (event_callbacks.matched_callback) {
187 this->add_event_handler(
188 event_callbacks.matched_callback,
189 RCL_PUBLISHER_MATCHED);
197 publisher_handle_.get());
198 if (!publisher_options) {
199 auto msg = std::string(
"failed to get publisher options: ") + rcl_get_error_string().str;
201 throw std::runtime_error(msg);
203 return publisher_options->
qos.depth;
212 std::shared_ptr<rcl_publisher_t>
215 return publisher_handle_;
218 std::shared_ptr<const rcl_publisher_t>
221 return publisher_handle_;
225 std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::EventHandlerBase>> &
228 return event_handlers_;
234 size_t inter_process_subscription_count = 0;
237 publisher_handle_.get(),
238 &inter_process_subscription_count);
251 rclcpp::exceptions::throw_from_rcl_error(status,
"failed to get get subscription count");
253 return inter_process_subscription_count;
259 auto ipm = weak_ipm_.lock();
260 if (!intra_process_is_enabled_) {
266 throw std::runtime_error(
267 "intra process subscriber count called after "
268 "destruction of intra process manager");
270 return ipm->get_subscription_count(intra_process_publisher_id_);
277 RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
285 auto msg = std::string(
"failed to get qos settings: ") + rcl_get_error_string().str;
287 throw std::runtime_error(msg);
308 return *
this == &gid;
315 auto ret = rmw_compare_gids_equal(gid, &this->
get_gid(), &result);
316 if (ret != RMW_RET_OK) {
317 auto msg = std::string(
"failed to compare gids: ") + rmw_get_error_string().str;
319 throw std::runtime_error(msg);
326 uint64_t intra_process_publisher_id,
327 IntraProcessManagerSharedPtr ipm)
329 intra_process_publisher_id_ = intra_process_publisher_id;
331 intra_process_is_enabled_ =
true;
335 PublisherBase::default_incompatible_qos_callback(
336 rclcpp::QOSOfferedIncompatibleQoSInfo & event)
const
338 std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
341 "New subscription discovered on topic '%s', requesting incompatible QoS. "
342 "No messages will be sent to it. "
343 "Last incompatible policy: %s",
345 policy_name.c_str());
349 PublisherBase::default_incompatible_type_callback(
350 rclcpp::IncompatibleTypeInfo & event)
const
356 "Incompatible type on topic '%s', no messages will be sent to it.",
get_topic_name());
361 rcutils_allocator_t allocator = rcutils_get_default_allocator();
362 rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
363 rcl_get_zero_initialized_network_flow_endpoint_array();
364 rcl_ret_t ret = rcl_publisher_get_network_flow_endpoints(
365 publisher_handle_.get(), &allocator, &network_flow_endpoint_array);
367 auto error_msg = std::string(
"error obtaining network flows of publisher: ") +
368 rcl_get_error_string().str;
371 rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array))
373 error_msg += std::string(
", also error cleaning up network flow array: ") +
374 rcl_get_error_string().str;
377 rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
380 std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
381 for (
size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
382 network_flow_endpoint_vector.push_back(
384 network_flow_endpoint_array.network_flow_endpoint[i]));
387 ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array);
389 rclcpp::exceptions::throw_from_rcl_error(ret,
"error cleaning up network flow array");
392 return network_flow_endpoint_vector;
397 if (!intra_process_is_enabled_) {
401 auto ipm = weak_ipm_.lock();
407 "Intra process manager died for a publisher.");
411 return ipm->lowest_available_capacity(intra_process_publisher_id_);
RCLCPP_PUBLIC Logger get_child(const std::string &suffix)
Return a logger that is a descendant of this logger.
RCLCPP_PUBLIC const rmw_gid_t & get_gid() const
Get the global identifier for this publisher (used in rmw and by DDS).
RCLCPP_PUBLIC std::shared_ptr< rcl_publisher_t > get_publisher_handle()
Get the rcl publisher handle.
RCLCPP_PUBLIC size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
RCLCPP_PUBLIC void bind_event_callbacks(const PublisherEventCallbacks &event_callbacks, bool use_default_callbacks)
Add event handlers for passed in event_callbacks.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this publisher publishes on.
RCLCPP_PUBLIC bool operator==(const rmw_gid_t &gid) const
Compare this publisher to a gid.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_publisher_id, IntraProcessManagerSharedPtr ipm)
Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC size_t get_queue_size() const
Get the queue size for this publisher.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if publisher instance can loan messages.
RCLCPP_PUBLIC bool is_durability_transient_local() const
Get if durability is transient local.
RCLCPP_PUBLIC size_t get_subscription_count() const
Get subscription count.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC RCUTILS_WARN_UNUSED bool assert_liveliness() const
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
RCLCPP_PUBLIC const std::unordered_map< rcl_publisher_event_type_t, std::shared_ptr< rclcpp::EventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this publisher.
RCLCPP_PUBLIC size_t lowest_available_ipm_capacity() const
Return the lowest available capacity for all subscription buffers.
Encapsulation of Quality of Service settings.
Pure virtual interface class for the NodeBase part of the Node API.
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_context_is_valid(const rcl_context_t *context)
Return true if the given context is currently valid, otherwise false.
RCLCPP_PUBLIC std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
RCLCPP_PUBLIC Logger get_node_logger(const rcl_node_t *node)
Return a named logger using an rcl_node_t.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_name(const rcl_node_t *node)
Return the name of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_namespace(const rcl_node_t *node)
Return the namespace of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_logger_name(const rcl_node_t *node)
Return the logger name of the node.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_init(rcl_publisher_t *publisher, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_publisher_options_t *options)
Initialize a rcl publisher.
RCL_PUBLIC RCL_WARN_UNUSED rcl_context_t * rcl_publisher_get_context(const rcl_publisher_t *publisher)
Return the context associated with this publisher.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_get_subscription_count(const rcl_publisher_t *publisher, size_t *subscription_count)
Get the number of subscriptions matched to a publisher.
RCL_PUBLIC RCL_WARN_UNUSED rmw_publisher_t * rcl_publisher_get_rmw_handle(const rcl_publisher_t *publisher)
Return the rmw publisher handle.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_publisher_get_topic_name(const rcl_publisher_t *publisher)
Get the topic name for the publisher.
RCL_PUBLIC RCL_WARN_UNUSED const rmw_qos_profile_t * rcl_publisher_get_actual_qos(const rcl_publisher_t *publisher)
Get the actual qos settings of the publisher.
RCL_PUBLIC bool rcl_publisher_is_valid_except_context(const rcl_publisher_t *publisher)
Return true if the publisher is valid except the context, otherwise false.
RCL_PUBLIC RCL_WARN_UNUSED const rcl_publisher_options_t * rcl_publisher_get_options(const rcl_publisher_t *publisher)
Return the rcl publisher options.
RCL_PUBLIC RCL_WARN_UNUSED rcl_publisher_t rcl_get_zero_initialized_publisher(void)
Return a rcl_publisher_t struct with members set to NULL.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_fini(rcl_publisher_t *publisher, rcl_node_t *node)
Finalize a rcl_publisher_t.
RCL_PUBLIC bool rcl_publisher_can_loan_messages(const rcl_publisher_t *publisher)
Check if publisher instance can loan messages.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_assert_liveliness(const rcl_publisher_t *publisher)
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC)
Encapsulates the non-global state of an init/shutdown cycle.
Options available for a rcl publisher.
rmw_qos_profile_t qos
Middleware quality of service settings for the publisher.
Structure which encapsulates a ROS Publisher.
Contains callbacks for various types of events a Publisher can receive from the middleware.
static QoSInitialization from_rmw(const rmw_qos_profile_t &rmw_qos)
Create a QoSInitialization from an existing rmw_qos_profile_t, using its history and depth.
#define RCL_RET_OK
Success return code.
#define RCL_RET_TOPIC_NAME_INVALID
Topic name does not pass validation.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
#define RCL_RET_PUBLISHER_INVALID
Invalid rcl_publisher_t given return code.