15 #include "rclcpp/publisher_base.hpp"
17 #include <rmw/error_handling.h>
25 #include <unordered_map>
28 #include "rcutils/logging_macros.h"
29 #include "rmw/impl/cpp/demangle.hpp"
31 #include "rclcpp/allocator/allocator_common.hpp"
32 #include "rclcpp/allocator/allocator_deleter.hpp"
33 #include "rclcpp/exceptions.hpp"
34 #include "rclcpp/expand_topic_or_service_name.hpp"
35 #include "rclcpp/experimental/intra_process_manager.hpp"
36 #include "rclcpp/logging.hpp"
37 #include "rclcpp/macros.hpp"
38 #include "rclcpp/network_flow_endpoint.hpp"
39 #include "rclcpp/node.hpp"
40 #include "rclcpp/qos_event.hpp"
44 PublisherBase::PublisherBase(
46 const std::string & topic,
47 const rosidl_message_type_support_t & type_support,
49 : rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
50 intra_process_is_enabled_(false),
51 intra_process_publisher_id_(0),
52 type_support_(type_support)
54 auto custom_deleter = [node_handle = this->rcl_node_handle_](
rcl_publisher_t * rcl_pub)
59 "Error in destruction of rcl publisher handle: %s",
60 rcl_get_error_string().str);
66 publisher_handle_ = std::shared_ptr<rcl_publisher_t>(
71 publisher_handle_.get(),
72 rcl_node_handle_.get(),
78 auto rcl_node_handle = rcl_node_handle_.get();
87 rclcpp::exceptions::throw_from_rcl_error(ret,
"could not create publisher");
91 if (!publisher_rmw_handle) {
92 auto msg = std::string(
"failed to get rmw handle: ") + rcl_get_error_string().str;
94 throw std::runtime_error(msg);
96 if (rmw_get_gid_for_publisher(publisher_rmw_handle, &rmw_gid_) != RMW_RET_OK) {
97 auto msg = std::string(
"failed to get publisher gid: ") + rmw_get_error_string().str;
99 throw std::runtime_error(msg);
103 PublisherBase::~PublisherBase()
105 for (
const auto & pair : event_handlers_) {
111 event_handlers_.clear();
113 auto ipm = weak_ipm_.lock();
115 if (!intra_process_is_enabled_) {
122 "Intra process manager died before a publisher.");
125 ipm->remove_publisher(intra_process_publisher_id_);
138 publisher_handle_.get());
139 if (!publisher_options) {
140 auto msg = std::string(
"failed to get publisher options: ") + rcl_get_error_string().str;
142 throw std::runtime_error(msg);
144 return publisher_options->
qos.depth;
153 std::shared_ptr<rcl_publisher_t>
156 return publisher_handle_;
159 std::shared_ptr<const rcl_publisher_t>
162 return publisher_handle_;
166 std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
169 return event_handlers_;
175 size_t inter_process_subscription_count = 0;
178 publisher_handle_.get(),
179 &inter_process_subscription_count);
192 rclcpp::exceptions::throw_from_rcl_error(status,
"failed to get get subscription count");
194 return inter_process_subscription_count;
200 auto ipm = weak_ipm_.lock();
201 if (!intra_process_is_enabled_) {
207 throw std::runtime_error(
208 "intra process subscriber count called after "
209 "destruction of intra process manager");
211 return ipm->get_subscription_count(intra_process_publisher_id_);
219 auto msg = std::string(
"failed to get qos settings: ") + rcl_get_error_string().str;
221 throw std::runtime_error(msg);
242 return *
this == &gid;
249 auto ret = rmw_compare_gids_equal(gid, &this->
get_gid(), &result);
250 if (ret != RMW_RET_OK) {
251 auto msg = std::string(
"failed to compare gids: ") + rmw_get_error_string().str;
253 throw std::runtime_error(msg);
260 uint64_t intra_process_publisher_id,
261 IntraProcessManagerSharedPtr ipm)
263 intra_process_publisher_id_ = intra_process_publisher_id;
265 intra_process_is_enabled_ =
true;
269 PublisherBase::default_incompatible_qos_callback(
270 rclcpp::QOSOfferedIncompatibleQoSInfo & event)
const
272 std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
275 "New subscription discovered on topic '%s', requesting incompatible QoS. "
276 "No messages will be sent to it. "
277 "Last incompatible policy: %s",
279 policy_name.c_str());
284 rcutils_allocator_t allocator = rcutils_get_default_allocator();
285 rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
286 rcl_get_zero_initialized_network_flow_endpoint_array();
287 rcl_ret_t ret = rcl_publisher_get_network_flow_endpoints(
288 publisher_handle_.get(), &allocator, &network_flow_endpoint_array);
290 auto error_msg = std::string(
"error obtaining network flows of publisher: ") +
291 rcl_get_error_string().str;
294 rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array))
296 error_msg += std::string(
", also error cleaning up network flow array: ") +
297 rcl_get_error_string().str;
300 rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
303 std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
304 for (
size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
305 network_flow_endpoint_vector.push_back(
307 network_flow_endpoint_array.network_flow_endpoint[i]));
310 ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array);
312 rclcpp::exceptions::throw_from_rcl_error(ret,
"error cleaning up network flow array");
315 return network_flow_endpoint_vector;
RCLCPP_PUBLIC Logger get_child(const std::string &suffix)
Return a logger that is a descendant of this logger.
RCLCPP_PUBLIC const rmw_gid_t & get_gid() const
Get the global identifier for this publisher (used in rmw and by DDS).
RCLCPP_PUBLIC std::shared_ptr< rcl_publisher_t > get_publisher_handle()
Get the rcl publisher handle.
RCLCPP_PUBLIC size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this publisher publishes on.
RCLCPP_PUBLIC bool operator==(const rmw_gid_t &gid) const
Compare this publisher to a gid.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_publisher_id, IntraProcessManagerSharedPtr ipm)
Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC size_t get_queue_size() const
Get the queue size for this publisher.
void clear_on_new_qos_event_callback(rcl_publisher_event_type_t event_type)
Unset the callback registered for new qos events, if any.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if publisher instance can loan messages.
RCLCPP_PUBLIC size_t get_subscription_count() const
Get subscription count.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC RCUTILS_WARN_UNUSED bool assert_liveliness() const
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
RCLCPP_PUBLIC const std::unordered_map< rcl_publisher_event_type_t, std::shared_ptr< rclcpp::QOSEventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this publisher.
Encapsulation of Quality of Service settings.
Pure virtual interface class for the NodeBase part of the Node API.
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_context_is_valid(const rcl_context_t *context)
Return true if the given context is currently valid, otherwise false.
enum rcl_publisher_event_type_e rcl_publisher_event_type_t
Enumeration of all of the publisher events that may fire.
RCLCPP_PUBLIC std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
RCLCPP_PUBLIC Logger get_node_logger(const rcl_node_t *node)
Return a named logger using an rcl_node_t.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_name(const rcl_node_t *node)
Return the name of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_namespace(const rcl_node_t *node)
Return the namespace of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_logger_name(const rcl_node_t *node)
Return the logger name of the node.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_init(rcl_publisher_t *publisher, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_publisher_options_t *options)
Initialize an rcl publisher.
RCL_PUBLIC RCL_WARN_UNUSED rcl_context_t * rcl_publisher_get_context(const rcl_publisher_t *publisher)
Return the context associated with this publisher.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_get_subscription_count(const rcl_publisher_t *publisher, size_t *subscription_count)
Get the number of subscriptions matched to a publisher.
RCL_PUBLIC RCL_WARN_UNUSED rmw_publisher_t * rcl_publisher_get_rmw_handle(const rcl_publisher_t *publisher)
Return the rmw publisher handle.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_publisher_get_topic_name(const rcl_publisher_t *publisher)
Get the topic name for the publisher.
RCL_PUBLIC RCL_WARN_UNUSED const rmw_qos_profile_t * rcl_publisher_get_actual_qos(const rcl_publisher_t *publisher)
Get the actual qos settings of the publisher.
RCL_PUBLIC bool rcl_publisher_is_valid_except_context(const rcl_publisher_t *publisher)
Return true if the publisher is valid except the context, otherwise false.
RCL_PUBLIC RCL_WARN_UNUSED const rcl_publisher_options_t * rcl_publisher_get_options(const rcl_publisher_t *publisher)
Return the rcl publisher options.
RCL_PUBLIC RCL_WARN_UNUSED rcl_publisher_t rcl_get_zero_initialized_publisher(void)
Return an rcl_publisher_t struct with members set to NULL.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_fini(rcl_publisher_t *publisher, rcl_node_t *node)
Finalize a rcl_publisher_t.
RCL_PUBLIC bool rcl_publisher_can_loan_messages(const rcl_publisher_t *publisher)
Check if publisher instance can loan messages.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_assert_liveliness(const rcl_publisher_t *publisher)
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
Encapsulates the non-global state of an init/shutdown cycle.
Options available for an rcl publisher.
rmw_qos_profile_t qos
Middleware quality of service settings for the publisher.
Structure which encapsulates a ROS Publisher.
static QoSInitialization from_rmw(const rmw_qos_profile_t &rmw_qos)
Create a QoSInitialization from an existing rmw_qos_profile_t, using its history and depth.
#define RCL_RET_OK
Success return code.
#define RCL_RET_TOPIC_NAME_INVALID
Topic name does not pass validation.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
#define RCL_RET_PUBLISHER_INVALID
Invalid rcl_publisher_t given return code.