ROS 2 rclcpp + rcl - kilted  kilted
ROS 2 C++ Client Library with ROS Client Library
publisher_base.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/publisher_base.hpp"
16 
17 #include <rmw/error_handling.h>
18 #include <rmw/rmw.h>
19 
20 #include <iostream>
21 #include <memory>
22 #include <mutex>
23 #include <sstream>
24 #include <string>
25 #include <unordered_map>
26 #include <vector>
27 
28 #include "rcutils/logging_macros.h"
29 #include "rmw/impl/cpp/demangle.hpp"
30 
31 #include "rclcpp/allocator/allocator_common.hpp"
32 #include "rclcpp/allocator/allocator_deleter.hpp"
33 #include "rclcpp/exceptions.hpp"
34 #include "rclcpp/expand_topic_or_service_name.hpp"
35 #include "rclcpp/experimental/intra_process_manager.hpp"
36 #include "rclcpp/logging.hpp"
37 #include "rclcpp/macros.hpp"
38 #include "rclcpp/network_flow_endpoint.hpp"
39 #include "rclcpp/node.hpp"
40 #include "rclcpp/event_handler.hpp"
41 
43 
44 PublisherBase::PublisherBase(
46  const std::string & topic,
47  const rosidl_message_type_support_t & type_support,
48  const rcl_publisher_options_t & publisher_options,
49  const PublisherEventCallbacks & event_callbacks,
50  bool use_default_callbacks)
51 : rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
52  intra_process_is_enabled_(false),
53  intra_process_publisher_id_(0),
54  type_support_(type_support),
55  event_callbacks_(event_callbacks)
56 {
57  auto custom_deleter = [node_handle = this->rcl_node_handle_](rcl_publisher_t * rcl_pub)
58  {
59  if (rcl_publisher_fini(rcl_pub, node_handle.get()) != RCL_RET_OK) {
60  RCLCPP_ERROR(
61  rclcpp::get_node_logger(node_handle.get()).get_child("rclcpp"),
62  "Error in destruction of rcl publisher handle: %s",
63  rcl_get_error_string().str);
64  rcl_reset_error();
65  }
66  delete rcl_pub;
67  };
68 
69  publisher_handle_ = std::shared_ptr<rcl_publisher_t>(
70  new rcl_publisher_t, custom_deleter);
71  *publisher_handle_.get() = rcl_get_zero_initialized_publisher();
72 
74  publisher_handle_.get(),
75  rcl_node_handle_.get(),
76  &type_support,
77  topic.c_str(),
78  &publisher_options);
79  if (ret != RCL_RET_OK) {
80  if (ret == RCL_RET_TOPIC_NAME_INVALID) {
81  auto rcl_node_handle = rcl_node_handle_.get();
82  // this will throw on any validation problem
83  rcl_reset_error();
85  topic,
86  rcl_node_get_name(rcl_node_handle),
87  rcl_node_get_namespace(rcl_node_handle));
88  }
89 
90  rclcpp::exceptions::throw_from_rcl_error(ret, "could not create publisher");
91  }
92  // Life time of this object is tied to the publisher handle.
93  rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(publisher_handle_.get());
94  if (!publisher_rmw_handle) {
95  auto msg = std::string("failed to get rmw handle: ") + rcl_get_error_string().str;
96  rcl_reset_error();
97  throw std::runtime_error(msg);
98  }
99  if (rmw_get_gid_for_publisher(publisher_rmw_handle, &rmw_gid_) != RMW_RET_OK) {
100  auto msg = std::string("failed to get publisher gid: ") + rmw_get_error_string().str;
101  rmw_reset_error();
102  throw std::runtime_error(msg);
103  }
104 
105  bind_event_callbacks(event_callbacks_, use_default_callbacks);
106 }
107 
108 PublisherBase::~PublisherBase()
109 {
110  // must fini the events before fini-ing the publisher
111  event_handlers_.clear();
112 
113  auto ipm = weak_ipm_.lock();
114 
115  if (!intra_process_is_enabled_) {
116  return;
117  }
118  if (!ipm) {
119  // TODO(ivanpauno): should this raise an error?
120  RCLCPP_WARN(
121  rclcpp::get_logger("rclcpp"),
122  "Intra process manager died before a publisher.");
123  return;
124  }
125  ipm->remove_publisher(intra_process_publisher_id_);
126 }
127 
128 const char *
130 {
131  return rcl_publisher_get_topic_name(publisher_handle_.get());
132 }
133 
134 void
136  const PublisherEventCallbacks & event_callbacks, bool use_default_callbacks)
137 {
138  try {
139  if (event_callbacks.deadline_callback) {
140  this->add_event_handler(
141  event_callbacks.deadline_callback,
142  RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
143  }
144  } catch (const UnsupportedEventTypeException & /*exc*/) {
145  RCLCPP_WARN(
146  rclcpp::get_logger("rclcpp"),
147  "Failed to add event handler for deadline; not supported");
148  }
149 
150  try {
151  if (event_callbacks.liveliness_callback) {
152  this->add_event_handler(
153  event_callbacks.liveliness_callback,
154  RCL_PUBLISHER_LIVELINESS_LOST);
155  }
156  } catch (const UnsupportedEventTypeException & /*exc*/) {
157  RCLCPP_WARN(
158  rclcpp::get_logger("rclcpp"),
159  "Failed to add event handler for liveliness; not supported");
160  }
161 
162  QOSOfferedIncompatibleQoSCallbackType incompatible_qos_cb;
163  if (event_callbacks.incompatible_qos_callback) {
164  incompatible_qos_cb = event_callbacks.incompatible_qos_callback;
165  } else if (use_default_callbacks) {
166  // Register default callback when not specified
167  incompatible_qos_cb = [this](QOSOfferedIncompatibleQoSInfo & info) {
168  this->default_incompatible_qos_callback(info);
169  };
170  }
171  try {
172  if (incompatible_qos_cb) {
173  this->add_event_handler(incompatible_qos_cb, RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
174  }
175  } catch (const UnsupportedEventTypeException & /*exc*/) {
176  RCLCPP_WARN(
177  rclcpp::get_logger("rclcpp"),
178  "Failed to add event handler for incompatible qos; not supported");
179  }
180 
181  IncompatibleTypeCallbackType incompatible_type_cb;
182  if (event_callbacks.incompatible_type_callback) {
183  incompatible_type_cb = event_callbacks.incompatible_type_callback;
184  } else if (use_default_callbacks) {
185  // Register default callback when not specified
186  incompatible_type_cb = [this](IncompatibleTypeInfo & info) {
187  this->default_incompatible_type_callback(info);
188  };
189  }
190  try {
191  if (incompatible_type_cb) {
192  this->add_event_handler(incompatible_type_cb, RCL_PUBLISHER_INCOMPATIBLE_TYPE);
193  }
194  } catch (UnsupportedEventTypeException & /*exc*/) {
195  RCLCPP_WARN(
196  rclcpp::get_logger("rclcpp"),
197  "Failed to add event handler for incompatible type; not supported");
198  }
199 
200  try {
201  if (event_callbacks.matched_callback) {
202  this->add_event_handler(
203  event_callbacks.matched_callback,
204  RCL_PUBLISHER_MATCHED);
205  }
206  } catch (const UnsupportedEventTypeException & /*exc*/) {
207  RCLCPP_WARN(
208  rclcpp::get_logger("rclcpp"),
209  "Failed to add event handler for matched; not supported");
210  }
211 }
212 
213 size_t
215 {
216  const rcl_publisher_options_t * publisher_options = rcl_publisher_get_options(
217  publisher_handle_.get());
218  if (!publisher_options) {
219  auto msg = std::string("failed to get publisher options: ") + rcl_get_error_string().str;
220  rcl_reset_error();
221  throw std::runtime_error(msg);
222  }
223  return publisher_options->qos.depth;
224 }
225 
226 const rmw_gid_t &
228 {
229  return rmw_gid_;
230 }
231 
232 std::shared_ptr<rcl_publisher_t>
234 {
235  return publisher_handle_;
236 }
237 
238 std::shared_ptr<const rcl_publisher_t>
240 {
241  return publisher_handle_;
242 }
243 
244 const
245 std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::EventHandlerBase>> &
247 {
248  return event_handlers_;
249 }
250 
251 size_t
253 {
254  size_t inter_process_subscription_count = 0;
255 
257  publisher_handle_.get(),
258  &inter_process_subscription_count);
259 
260  if (RCL_RET_PUBLISHER_INVALID == status) {
261  rcl_reset_error(); /* next call will reset error message if not context */
262  if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
263  rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
264  if (nullptr != context && !rcl_context_is_valid(context)) {
265  /* publisher is invalid due to context being shutdown */
266  return 0;
267  }
268  }
269  }
270  if (RCL_RET_OK != status) {
271  rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get subscription count");
272  }
273  return inter_process_subscription_count;
274 }
275 
276 size_t
278 {
279  auto ipm = weak_ipm_.lock();
280  if (!intra_process_is_enabled_) {
281  return 0;
282  }
283  if (!ipm) {
284  // TODO(ivanpauno): should this just return silently? Or maybe return with a warning?
285  // Same as wjwwood comment in publisher_factory create_shared_publish_callback.
286  throw std::runtime_error(
287  "intra process subscriber count called after "
288  "destruction of intra process manager");
289  }
290  return ipm->get_subscription_count(intra_process_publisher_id_);
291 }
292 
293 bool
295 {
296  return rcl_publisher_get_actual_qos(publisher_handle_.get())->durability ==
297  RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
298 }
299 
302 {
303  const rmw_qos_profile_t * qos = rcl_publisher_get_actual_qos(publisher_handle_.get());
304  if (!qos) {
305  auto msg = std::string("failed to get qos settings: ") + rcl_get_error_string().str;
306  rcl_reset_error();
307  throw std::runtime_error(msg);
308  }
309 
311 }
312 
313 bool
315 {
316  return RCL_RET_OK == rcl_publisher_assert_liveliness(publisher_handle_.get());
317 }
318 
319 bool
321 {
322  return !intra_process_is_enabled_ && rcl_publisher_can_loan_messages(publisher_handle_.get());
323 }
324 
325 bool
326 PublisherBase::operator==(const rmw_gid_t & gid) const
327 {
328  return *this == &gid;
329 }
330 
331 bool
332 PublisherBase::operator==(const rmw_gid_t * gid) const
333 {
334  bool result = false;
335  auto ret = rmw_compare_gids_equal(gid, &this->get_gid(), &result);
336  if (ret != RMW_RET_OK) {
337  auto msg = std::string("failed to compare gids: ") + rmw_get_error_string().str;
338  rmw_reset_error();
339  throw std::runtime_error(msg);
340  }
341  return result;
342 }
343 
344 void
346  uint64_t intra_process_publisher_id,
347  IntraProcessManagerSharedPtr ipm)
348 {
349  intra_process_publisher_id_ = intra_process_publisher_id;
350  weak_ipm_ = ipm;
351  intra_process_is_enabled_ = true;
352 }
353 
354 void
355 PublisherBase::default_incompatible_qos_callback(
356  rclcpp::QOSOfferedIncompatibleQoSInfo & event) const
357 {
358  std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
359  RCLCPP_WARN(
360  rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
361  "New subscription discovered on topic '%s', requesting incompatible QoS. "
362  "No messages will be sent to it. "
363  "Last incompatible policy: %s",
364  get_topic_name(),
365  policy_name.c_str());
366 }
367 
368 void
369 PublisherBase::default_incompatible_type_callback(
370  [[maybe_unused]] rclcpp::IncompatibleTypeInfo & event) const
371 {
372  RCLCPP_WARN(
373  rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
374  "Incompatible type on topic '%s', no messages will be sent to it.", get_topic_name());
375 }
376 
377 std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoints() const
378 {
379  rcutils_allocator_t allocator = rcutils_get_default_allocator();
380  rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
381  rcl_get_zero_initialized_network_flow_endpoint_array();
382  rcl_ret_t ret = rcl_publisher_get_network_flow_endpoints(
383  publisher_handle_.get(), &allocator, &network_flow_endpoint_array);
384  if (RCL_RET_OK != ret) {
385  auto error_msg = std::string("error obtaining network flows of publisher: ") +
386  rcl_get_error_string().str;
387  rcl_reset_error();
388  if (RCL_RET_OK !=
389  rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array))
390  {
391  error_msg += std::string(", also error cleaning up network flow array: ") +
392  rcl_get_error_string().str;
393  rcl_reset_error();
394  }
395  rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
396  }
397 
398  std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
399  for (size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
400  network_flow_endpoint_vector.push_back(
402  network_flow_endpoint_array.network_flow_endpoint[i]));
403  }
404 
405  ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array);
406  if (RCL_RET_OK != ret) {
407  rclcpp::exceptions::throw_from_rcl_error(ret, "error cleaning up network flow array");
408  }
409 
410  return network_flow_endpoint_vector;
411 }
412 
414 {
415  if (!intra_process_is_enabled_) {
416  return 0u;
417  }
418 
419  auto ipm = weak_ipm_.lock();
420 
421  if (!ipm) {
422  // TODO(ivanpauno): should this raise an error?
423  RCLCPP_WARN(
424  rclcpp::get_logger("rclcpp"),
425  "Intra process manager died for a publisher.");
426  return 0u;
427  }
428 
429  return ipm->lowest_available_capacity(intra_process_publisher_id_);
430 }
RCLCPP_PUBLIC Logger get_child(const std::string &suffix)
Return a logger that is a descendant of this logger.
Definition: logger.cpp:101
RCLCPP_PUBLIC const rmw_gid_t & get_gid() const
Get the global identifier for this publisher (used in rmw and by DDS).
RCLCPP_PUBLIC std::shared_ptr< rcl_publisher_t > get_publisher_handle()
Get the rcl publisher handle.
RCLCPP_PUBLIC size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
RCLCPP_PUBLIC void bind_event_callbacks(const PublisherEventCallbacks &event_callbacks, bool use_default_callbacks)
Add event handlers for passed in event_callbacks.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this publisher publishes on.
RCLCPP_PUBLIC bool operator==(const rmw_gid_t &gid) const
Compare this publisher to a gid.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_publisher_id, IntraProcessManagerSharedPtr ipm)
Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC size_t get_queue_size() const
Get the queue size for this publisher.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if publisher instance can loan messages.
RCLCPP_PUBLIC bool is_durability_transient_local() const
Get if durability is transient local.
RCLCPP_PUBLIC size_t get_subscription_count() const
Get subscription count.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC RCUTILS_WARN_UNUSED bool assert_liveliness() const
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
RCLCPP_PUBLIC const std::unordered_map< rcl_publisher_event_type_t, std::shared_ptr< rclcpp::EventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this publisher.
RCLCPP_PUBLIC size_t lowest_available_ipm_capacity() const
Return the lowest available capacity for all subscription buffers.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:116
Pure virtual interface class for the NodeBase part of the Node API.
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_context_is_valid(const rcl_context_t *context)
Return true if the given context is currently valid, otherwise false.
Definition: context.c:94
RCLCPP_PUBLIC std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
RCLCPP_PUBLIC Logger get_node_logger(const rcl_node_t *node)
Return a named logger using an rcl_node_t.
Definition: logger.cpp:45
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:34
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_name(const rcl_node_t *node)
Return the name of the node.
Definition: node.c:408
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_namespace(const rcl_node_t *node)
Return the namespace of the node.
Definition: node.c:417
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_logger_name(const rcl_node_t *node)
Return the logger name of the node.
Definition: node.c:485
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_init(rcl_publisher_t *publisher, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_publisher_options_t *options)
Initialize a rcl publisher.
Definition: publisher.c:48
RCL_PUBLIC RCL_WARN_UNUSED rcl_context_t * rcl_publisher_get_context(const rcl_publisher_t *publisher)
Return the context associated with this publisher.
Definition: publisher.c:409
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_get_subscription_count(const rcl_publisher_t *publisher, size_t *subscription_count)
Get the number of subscriptions matched to a publisher.
Definition: publisher.c:448
RCL_PUBLIC RCL_WARN_UNUSED rmw_publisher_t * rcl_publisher_get_rmw_handle(const rcl_publisher_t *publisher)
Return the rmw publisher handle.
Definition: publisher.c:400
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_publisher_get_topic_name(const rcl_publisher_t *publisher)
Get the topic name for the publisher.
Definition: publisher.c:382
RCL_PUBLIC RCL_WARN_UNUSED const rmw_qos_profile_t * rcl_publisher_get_actual_qos(const rcl_publisher_t *publisher)
Get the actual qos settings of the publisher.
Definition: publisher.c:468
RCL_PUBLIC bool rcl_publisher_is_valid_except_context(const rcl_publisher_t *publisher)
Return true if the publisher is valid except the context, otherwise false.
Definition: publisher.c:437
RCL_PUBLIC RCL_WARN_UNUSED const rcl_publisher_options_t * rcl_publisher_get_options(const rcl_publisher_t *publisher)
Return the rcl publisher options.
Definition: publisher.c:391
RCL_PUBLIC RCL_WARN_UNUSED rcl_publisher_t rcl_get_zero_initialized_publisher(void)
Return a rcl_publisher_t struct with members set to NULL.
Definition: publisher.c:40
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_fini(rcl_publisher_t *publisher, rcl_node_t *node)
Finalize a rcl_publisher_t.
Definition: publisher.c:179
RCL_PUBLIC bool rcl_publisher_can_loan_messages(const rcl_publisher_t *publisher)
Check if publisher instance can loan messages.
Definition: publisher.c:477
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_assert_liveliness(const rcl_publisher_t *publisher)
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC)
Definition: publisher.c:334
Encapsulates the non-global state of an init/shutdown cycle.
Definition: context.h:114
Options available for a rcl publisher.
Definition: publisher.h:44
rmw_qos_profile_t qos
Middleware quality of service settings for the publisher.
Definition: publisher.h:46
Structure which encapsulates a ROS Publisher.
Definition: publisher.h:37
Contains callbacks for various types of events a Publisher can receive from the middleware.
static QoSInitialization from_rmw(const rmw_qos_profile_t &rmw_qos)
Create a QoSInitialization from an existing rmw_qos_profile_t, using its history and depth.
Definition: qos.cpp:63
#define RCL_RET_OK
Success return code.
Definition: types.h:27
#define RCL_RET_TOPIC_NAME_INVALID
Topic name does not pass validation.
Definition: types.h:47
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:24
#define RCL_RET_PUBLISHER_INVALID
Invalid rcl_publisher_t given return code.
Definition: types.h:69