ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
publisher_base.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/publisher_base.hpp"
16 
17 #include <rmw/error_handling.h>
18 #include <rmw/rmw.h>
19 
20 #include <iostream>
21 #include <memory>
22 #include <mutex>
23 #include <sstream>
24 #include <string>
25 #include <unordered_map>
26 #include <vector>
27 
28 #include "rcutils/logging_macros.h"
29 #include "rmw/impl/cpp/demangle.hpp"
30 
31 #include "rclcpp/allocator/allocator_common.hpp"
32 #include "rclcpp/allocator/allocator_deleter.hpp"
33 #include "rclcpp/exceptions.hpp"
34 #include "rclcpp/expand_topic_or_service_name.hpp"
35 #include "rclcpp/experimental/intra_process_manager.hpp"
36 #include "rclcpp/logging.hpp"
37 #include "rclcpp/macros.hpp"
38 #include "rclcpp/network_flow_endpoint.hpp"
39 #include "rclcpp/node.hpp"
40 #include "rclcpp/qos_event.hpp"
41 
43 
44 PublisherBase::PublisherBase(
46  const std::string & topic,
47  const rosidl_message_type_support_t & type_support,
48  const rcl_publisher_options_t & publisher_options)
49 : rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
50  intra_process_is_enabled_(false),
51  intra_process_publisher_id_(0),
52  type_support_(type_support)
53 {
54  auto custom_deleter = [node_handle = this->rcl_node_handle_](rcl_publisher_t * rcl_pub)
55  {
56  if (rcl_publisher_fini(rcl_pub, node_handle.get()) != RCL_RET_OK) {
57  RCLCPP_ERROR(
58  rclcpp::get_node_logger(node_handle.get()).get_child("rclcpp"),
59  "Error in destruction of rcl publisher handle: %s",
60  rcl_get_error_string().str);
61  rcl_reset_error();
62  }
63  delete rcl_pub;
64  };
65 
66  publisher_handle_ = std::shared_ptr<rcl_publisher_t>(
67  new rcl_publisher_t, custom_deleter);
68  *publisher_handle_.get() = rcl_get_zero_initialized_publisher();
69 
71  publisher_handle_.get(),
72  rcl_node_handle_.get(),
73  &type_support,
74  topic.c_str(),
75  &publisher_options);
76  if (ret != RCL_RET_OK) {
77  if (ret == RCL_RET_TOPIC_NAME_INVALID) {
78  auto rcl_node_handle = rcl_node_handle_.get();
79  // this will throw on any validation problem
80  rcl_reset_error();
82  topic,
83  rcl_node_get_name(rcl_node_handle),
84  rcl_node_get_namespace(rcl_node_handle));
85  }
86 
87  rclcpp::exceptions::throw_from_rcl_error(ret, "could not create publisher");
88  }
89  // Life time of this object is tied to the publisher handle.
90  rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(publisher_handle_.get());
91  if (!publisher_rmw_handle) {
92  auto msg = std::string("failed to get rmw handle: ") + rcl_get_error_string().str;
93  rcl_reset_error();
94  throw std::runtime_error(msg);
95  }
96  if (rmw_get_gid_for_publisher(publisher_rmw_handle, &rmw_gid_) != RMW_RET_OK) {
97  auto msg = std::string("failed to get publisher gid: ") + rmw_get_error_string().str;
98  rmw_reset_error();
99  throw std::runtime_error(msg);
100  }
101 }
102 
103 PublisherBase::~PublisherBase()
104 {
105  for (const auto & pair : event_handlers_) {
106  rcl_publisher_event_type_t event_type = pair.first;
108  }
109 
110  // must fini the events before fini-ing the publisher
111  event_handlers_.clear();
112 
113  auto ipm = weak_ipm_.lock();
114 
115  if (!intra_process_is_enabled_) {
116  return;
117  }
118  if (!ipm) {
119  // TODO(ivanpauno): should this raise an error?
120  RCLCPP_WARN(
121  rclcpp::get_logger("rclcpp"),
122  "Intra process manager died before a publisher.");
123  return;
124  }
125  ipm->remove_publisher(intra_process_publisher_id_);
126 }
127 
128 const char *
130 {
131  return rcl_publisher_get_topic_name(publisher_handle_.get());
132 }
133 
134 size_t
136 {
137  const rcl_publisher_options_t * publisher_options = rcl_publisher_get_options(
138  publisher_handle_.get());
139  if (!publisher_options) {
140  auto msg = std::string("failed to get publisher options: ") + rcl_get_error_string().str;
141  rcl_reset_error();
142  throw std::runtime_error(msg);
143  }
144  return publisher_options->qos.depth;
145 }
146 
147 const rmw_gid_t &
149 {
150  return rmw_gid_;
151 }
152 
153 std::shared_ptr<rcl_publisher_t>
155 {
156  return publisher_handle_;
157 }
158 
159 std::shared_ptr<const rcl_publisher_t>
161 {
162  return publisher_handle_;
163 }
164 
165 const
166 std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
168 {
169  return event_handlers_;
170 }
171 
172 size_t
174 {
175  size_t inter_process_subscription_count = 0;
176 
178  publisher_handle_.get(),
179  &inter_process_subscription_count);
180 
181  if (RCL_RET_PUBLISHER_INVALID == status) {
182  rcl_reset_error(); /* next call will reset error message if not context */
183  if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
184  rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
185  if (nullptr != context && !rcl_context_is_valid(context)) {
186  /* publisher is invalid due to context being shutdown */
187  return 0;
188  }
189  }
190  }
191  if (RCL_RET_OK != status) {
192  rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get subscription count");
193  }
194  return inter_process_subscription_count;
195 }
196 
197 size_t
199 {
200  auto ipm = weak_ipm_.lock();
201  if (!intra_process_is_enabled_) {
202  return 0;
203  }
204  if (!ipm) {
205  // TODO(ivanpauno): should this just return silently? Or maybe return with a warning?
206  // Same as wjwwood comment in publisher_factory create_shared_publish_callback.
207  throw std::runtime_error(
208  "intra process subscriber count called after "
209  "destruction of intra process manager");
210  }
211  return ipm->get_subscription_count(intra_process_publisher_id_);
212 }
213 
216 {
217  const rmw_qos_profile_t * qos = rcl_publisher_get_actual_qos(publisher_handle_.get());
218  if (!qos) {
219  auto msg = std::string("failed to get qos settings: ") + rcl_get_error_string().str;
220  rcl_reset_error();
221  throw std::runtime_error(msg);
222  }
223 
225 }
226 
227 bool
229 {
230  return RCL_RET_OK == rcl_publisher_assert_liveliness(publisher_handle_.get());
231 }
232 
233 bool
235 {
236  return rcl_publisher_can_loan_messages(publisher_handle_.get());
237 }
238 
239 bool
240 PublisherBase::operator==(const rmw_gid_t & gid) const
241 {
242  return *this == &gid;
243 }
244 
245 bool
246 PublisherBase::operator==(const rmw_gid_t * gid) const
247 {
248  bool result = false;
249  auto ret = rmw_compare_gids_equal(gid, &this->get_gid(), &result);
250  if (ret != RMW_RET_OK) {
251  auto msg = std::string("failed to compare gids: ") + rmw_get_error_string().str;
252  rmw_reset_error();
253  throw std::runtime_error(msg);
254  }
255  return result;
256 }
257 
258 void
260  uint64_t intra_process_publisher_id,
261  IntraProcessManagerSharedPtr ipm)
262 {
263  intra_process_publisher_id_ = intra_process_publisher_id;
264  weak_ipm_ = ipm;
265  intra_process_is_enabled_ = true;
266 }
267 
268 void
269 PublisherBase::default_incompatible_qos_callback(
270  rclcpp::QOSOfferedIncompatibleQoSInfo & event) const
271 {
272  std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
273  RCLCPP_WARN(
274  rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
275  "New subscription discovered on topic '%s', requesting incompatible QoS. "
276  "No messages will be sent to it. "
277  "Last incompatible policy: %s",
278  get_topic_name(),
279  policy_name.c_str());
280 }
281 
282 std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoints() const
283 {
284  rcutils_allocator_t allocator = rcutils_get_default_allocator();
285  rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
286  rcl_get_zero_initialized_network_flow_endpoint_array();
287  rcl_ret_t ret = rcl_publisher_get_network_flow_endpoints(
288  publisher_handle_.get(), &allocator, &network_flow_endpoint_array);
289  if (RCL_RET_OK != ret) {
290  auto error_msg = std::string("error obtaining network flows of publisher: ") +
291  rcl_get_error_string().str;
292  rcl_reset_error();
293  if (RCL_RET_OK !=
294  rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array))
295  {
296  error_msg += std::string(", also error cleaning up network flow array: ") +
297  rcl_get_error_string().str;
298  rcl_reset_error();
299  }
300  rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
301  }
302 
303  std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
304  for (size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
305  network_flow_endpoint_vector.push_back(
307  network_flow_endpoint_array.network_flow_endpoint[i]));
308  }
309 
310  ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array);
311  if (RCL_RET_OK != ret) {
312  rclcpp::exceptions::throw_from_rcl_error(ret, "error cleaning up network flow array");
313  }
314 
315  return network_flow_endpoint_vector;
316 }
RCLCPP_PUBLIC Logger get_child(const std::string &suffix)
Return a logger that is a descendant of this logger.
Definition: logger.hpp:160
RCLCPP_PUBLIC const rmw_gid_t & get_gid() const
Get the global identifier for this publisher (used in rmw and by DDS).
RCLCPP_PUBLIC std::shared_ptr< rcl_publisher_t > get_publisher_handle()
Get the rcl publisher handle.
RCLCPP_PUBLIC size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this publisher publishes on.
RCLCPP_PUBLIC bool operator==(const rmw_gid_t &gid) const
Compare this publisher to a gid.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_publisher_id, IntraProcessManagerSharedPtr ipm)
Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC size_t get_queue_size() const
Get the queue size for this publisher.
void clear_on_new_qos_event_callback(rcl_publisher_event_type_t event_type)
Unset the callback registered for new qos events, if any.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if publisher instance can loan messages.
RCLCPP_PUBLIC size_t get_subscription_count() const
Get subscription count.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC RCUTILS_WARN_UNUSED bool assert_liveliness() const
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
RCLCPP_PUBLIC const std::unordered_map< rcl_publisher_event_type_t, std::shared_ptr< rclcpp::QOSEventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this publisher.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:111
Pure virtual interface class for the NodeBase part of the Node API.
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_context_is_valid(const rcl_context_t *context)
Return true if the given context is currently valid, otherwise false.
Definition: context.c:94
enum rcl_publisher_event_type_e rcl_publisher_event_type_t
Enumeration of all of the publisher events that may fire.
RCLCPP_PUBLIC std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
RCLCPP_PUBLIC Logger get_node_logger(const rcl_node_t *node)
Return a named logger using an rcl_node_t.
Definition: logger.cpp:38
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:27
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_name(const rcl_node_t *node)
Return the name of the node.
Definition: node.c:435
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_namespace(const rcl_node_t *node)
Return the namespace of the node.
Definition: node.c:444
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_logger_name(const rcl_node_t *node)
Return the logger name of the node.
Definition: node.c:512
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_init(rcl_publisher_t *publisher, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_publisher_options_t *options)
Initialize a rcl publisher.
Definition: publisher.c:46
RCL_PUBLIC RCL_WARN_UNUSED rcl_context_t * rcl_publisher_get_context(const rcl_publisher_t *publisher)
Return the context associated with this publisher.
Definition: publisher.c:374
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_get_subscription_count(const rcl_publisher_t *publisher, size_t *subscription_count)
Get the number of subscriptions matched to a publisher.
Definition: publisher.c:409
RCL_PUBLIC RCL_WARN_UNUSED rmw_publisher_t * rcl_publisher_get_rmw_handle(const rcl_publisher_t *publisher)
Return the rmw publisher handle.
Definition: publisher.c:365
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_publisher_get_topic_name(const rcl_publisher_t *publisher)
Get the topic name for the publisher.
Definition: publisher.c:345
RCL_PUBLIC RCL_WARN_UNUSED const rmw_qos_profile_t * rcl_publisher_get_actual_qos(const rcl_publisher_t *publisher)
Get the actual qos settings of the publisher.
Definition: publisher.c:429
RCL_PUBLIC bool rcl_publisher_is_valid_except_context(const rcl_publisher_t *publisher)
Return true if the publisher is valid except the context, otherwise false.
Definition: publisher.c:398
RCL_PUBLIC RCL_WARN_UNUSED const rcl_publisher_options_t * rcl_publisher_get_options(const rcl_publisher_t *publisher)
Return the rcl publisher options.
Definition: publisher.c:356
RCL_PUBLIC RCL_WARN_UNUSED rcl_publisher_t rcl_get_zero_initialized_publisher(void)
Return a rcl_publisher_t struct with members set to NULL.
Definition: publisher.c:39
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_fini(rcl_publisher_t *publisher, rcl_node_t *node)
Finalize a rcl_publisher_t.
Definition: publisher.c:164
RCL_PUBLIC bool rcl_publisher_can_loan_messages(const rcl_publisher_t *publisher)
Check if publisher instance can loan messages.
Definition: publisher.c:438
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_assert_liveliness(const rcl_publisher_t *publisher)
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC)
Definition: publisher.c:297
Encapsulates the non-global state of an init/shutdown cycle.
Definition: context.h:114
Options available for a rcl publisher.
Definition: publisher.h:44
rmw_qos_profile_t qos
Middleware quality of service settings for the publisher.
Definition: publisher.h:46
Structure which encapsulates a ROS Publisher.
Definition: publisher.h:37
static QoSInitialization from_rmw(const rmw_qos_profile_t &rmw_qos)
Create a QoSInitialization from an existing rmw_qos_profile_t, using its history and depth.
Definition: qos.cpp:51
#define RCL_RET_OK
Success return code.
Definition: types.h:26
#define RCL_RET_TOPIC_NAME_INVALID
Topic name does not pass validation.
Definition: types.h:46
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:23
#define RCL_RET_PUBLISHER_INVALID
Invalid rcl_publisher_t given return code.
Definition: types.h:66