ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
publisher_base.cpp
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rclcpp/publisher_base.hpp"
16 
17 #include <rmw/error_handling.h>
18 #include <rmw/rmw.h>
19 
20 #include <iostream>
21 #include <memory>
22 #include <mutex>
23 #include <sstream>
24 #include <string>
25 #include <unordered_map>
26 #include <vector>
27 
28 #include "rcutils/logging_macros.h"
29 #include "rmw/impl/cpp/demangle.hpp"
30 
31 #include "rclcpp/allocator/allocator_common.hpp"
32 #include "rclcpp/allocator/allocator_deleter.hpp"
33 #include "rclcpp/exceptions.hpp"
34 #include "rclcpp/expand_topic_or_service_name.hpp"
35 #include "rclcpp/experimental/intra_process_manager.hpp"
36 #include "rclcpp/logging.hpp"
37 #include "rclcpp/macros.hpp"
38 #include "rclcpp/network_flow_endpoint.hpp"
39 #include "rclcpp/node.hpp"
40 #include "rclcpp/event_handler.hpp"
41 
43 
44 PublisherBase::PublisherBase(
46  const std::string & topic,
47  const rosidl_message_type_support_t & type_support,
48  const rcl_publisher_options_t & publisher_options,
49  const PublisherEventCallbacks & event_callbacks,
50  bool use_default_callbacks)
51 : rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
52  intra_process_is_enabled_(false),
53  intra_process_publisher_id_(0),
54  type_support_(type_support),
55  event_callbacks_(event_callbacks)
56 {
57  auto custom_deleter = [node_handle = this->rcl_node_handle_](rcl_publisher_t * rcl_pub)
58  {
59  if (rcl_publisher_fini(rcl_pub, node_handle.get()) != RCL_RET_OK) {
60  RCLCPP_ERROR(
61  rclcpp::get_node_logger(node_handle.get()).get_child("rclcpp"),
62  "Error in destruction of rcl publisher handle: %s",
63  rcl_get_error_string().str);
64  rcl_reset_error();
65  }
66  delete rcl_pub;
67  };
68 
69  publisher_handle_ = std::shared_ptr<rcl_publisher_t>(
70  new rcl_publisher_t, custom_deleter);
71  *publisher_handle_.get() = rcl_get_zero_initialized_publisher();
72 
74  publisher_handle_.get(),
75  rcl_node_handle_.get(),
76  &type_support,
77  topic.c_str(),
78  &publisher_options);
79  if (ret != RCL_RET_OK) {
80  if (ret == RCL_RET_TOPIC_NAME_INVALID) {
81  auto rcl_node_handle = rcl_node_handle_.get();
82  // this will throw on any validation problem
83  rcl_reset_error();
85  topic,
86  rcl_node_get_name(rcl_node_handle),
87  rcl_node_get_namespace(rcl_node_handle));
88  }
89 
90  rclcpp::exceptions::throw_from_rcl_error(ret, "could not create publisher");
91  }
92  // Life time of this object is tied to the publisher handle.
93  rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(publisher_handle_.get());
94  if (!publisher_rmw_handle) {
95  auto msg = std::string("failed to get rmw handle: ") + rcl_get_error_string().str;
96  rcl_reset_error();
97  throw std::runtime_error(msg);
98  }
99  if (rmw_get_gid_for_publisher(publisher_rmw_handle, &rmw_gid_) != RMW_RET_OK) {
100  auto msg = std::string("failed to get publisher gid: ") + rmw_get_error_string().str;
101  rmw_reset_error();
102  throw std::runtime_error(msg);
103  }
104 
105  bind_event_callbacks(event_callbacks_, use_default_callbacks);
106 }
107 
108 PublisherBase::~PublisherBase()
109 {
110  // must fini the events before fini-ing the publisher
111  event_handlers_.clear();
112 
113  auto ipm = weak_ipm_.lock();
114 
115  if (!intra_process_is_enabled_) {
116  return;
117  }
118  if (!ipm) {
119  // TODO(ivanpauno): should this raise an error?
120  RCLCPP_WARN(
121  rclcpp::get_logger("rclcpp"),
122  "Intra process manager died before a publisher.");
123  return;
124  }
125  ipm->remove_publisher(intra_process_publisher_id_);
126 }
127 
128 const char *
130 {
131  return rcl_publisher_get_topic_name(publisher_handle_.get());
132 }
133 
134 void
136  const PublisherEventCallbacks & event_callbacks, bool use_default_callbacks)
137 {
138  if (event_callbacks.deadline_callback) {
139  this->add_event_handler(
140  event_callbacks.deadline_callback,
141  RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
142  }
143  if (event_callbacks.liveliness_callback) {
144  this->add_event_handler(
145  event_callbacks.liveliness_callback,
146  RCL_PUBLISHER_LIVELINESS_LOST);
147  }
148 
149  QOSOfferedIncompatibleQoSCallbackType incompatible_qos_cb;
150  if (event_callbacks.incompatible_qos_callback) {
151  incompatible_qos_cb = event_callbacks.incompatible_qos_callback;
152  } else if (use_default_callbacks) {
153  // Register default callback when not specified
154  incompatible_qos_cb = [this](QOSOfferedIncompatibleQoSInfo & info) {
155  this->default_incompatible_qos_callback(info);
156  };
157  }
158  try {
159  if (incompatible_qos_cb) {
160  this->add_event_handler(incompatible_qos_cb, RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
161  }
162  } catch (const UnsupportedEventTypeException & /*exc*/) {
163  RCLCPP_DEBUG(
164  rclcpp::get_logger("rclcpp"),
165  "Failed to add event handler for incompatible qos; wrong callback type");
166  }
167 
168  IncompatibleTypeCallbackType incompatible_type_cb;
169  if (event_callbacks.incompatible_type_callback) {
170  incompatible_type_cb = event_callbacks.incompatible_type_callback;
171  } else if (use_default_callbacks) {
172  // Register default callback when not specified
173  incompatible_type_cb = [this](IncompatibleTypeInfo & info) {
174  this->default_incompatible_type_callback(info);
175  };
176  }
177  try {
178  if (incompatible_type_cb) {
179  this->add_event_handler(incompatible_type_cb, RCL_PUBLISHER_INCOMPATIBLE_TYPE);
180  }
181  } catch (UnsupportedEventTypeException & /*exc*/) {
182  RCLCPP_DEBUG(
183  rclcpp::get_logger("rclcpp"),
184  "Failed to add event handler for incompatible type; wrong callback type");
185  }
186  if (event_callbacks.matched_callback) {
187  this->add_event_handler(
188  event_callbacks.matched_callback,
189  RCL_PUBLISHER_MATCHED);
190  }
191 }
192 
193 size_t
195 {
196  const rcl_publisher_options_t * publisher_options = rcl_publisher_get_options(
197  publisher_handle_.get());
198  if (!publisher_options) {
199  auto msg = std::string("failed to get publisher options: ") + rcl_get_error_string().str;
200  rcl_reset_error();
201  throw std::runtime_error(msg);
202  }
203  return publisher_options->qos.depth;
204 }
205 
206 const rmw_gid_t &
208 {
209  return rmw_gid_;
210 }
211 
212 std::shared_ptr<rcl_publisher_t>
214 {
215  return publisher_handle_;
216 }
217 
218 std::shared_ptr<const rcl_publisher_t>
220 {
221  return publisher_handle_;
222 }
223 
224 const
225 std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::EventHandlerBase>> &
227 {
228  return event_handlers_;
229 }
230 
231 size_t
233 {
234  size_t inter_process_subscription_count = 0;
235 
237  publisher_handle_.get(),
238  &inter_process_subscription_count);
239 
240  if (RCL_RET_PUBLISHER_INVALID == status) {
241  rcl_reset_error(); /* next call will reset error message if not context */
242  if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
243  rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
244  if (nullptr != context && !rcl_context_is_valid(context)) {
245  /* publisher is invalid due to context being shutdown */
246  return 0;
247  }
248  }
249  }
250  if (RCL_RET_OK != status) {
251  rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get subscription count");
252  }
253  return inter_process_subscription_count;
254 }
255 
256 size_t
258 {
259  auto ipm = weak_ipm_.lock();
260  if (!intra_process_is_enabled_) {
261  return 0;
262  }
263  if (!ipm) {
264  // TODO(ivanpauno): should this just return silently? Or maybe return with a warning?
265  // Same as wjwwood comment in publisher_factory create_shared_publish_callback.
266  throw std::runtime_error(
267  "intra process subscriber count called after "
268  "destruction of intra process manager");
269  }
270  return ipm->get_subscription_count(intra_process_publisher_id_);
271 }
272 
273 bool
275 {
276  return rcl_publisher_get_actual_qos(publisher_handle_.get())->durability ==
277  RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
278 }
279 
282 {
283  const rmw_qos_profile_t * qos = rcl_publisher_get_actual_qos(publisher_handle_.get());
284  if (!qos) {
285  auto msg = std::string("failed to get qos settings: ") + rcl_get_error_string().str;
286  rcl_reset_error();
287  throw std::runtime_error(msg);
288  }
289 
291 }
292 
293 bool
295 {
296  return RCL_RET_OK == rcl_publisher_assert_liveliness(publisher_handle_.get());
297 }
298 
299 bool
301 {
302  return !intra_process_is_enabled_ && rcl_publisher_can_loan_messages(publisher_handle_.get());
303 }
304 
305 bool
306 PublisherBase::operator==(const rmw_gid_t & gid) const
307 {
308  return *this == &gid;
309 }
310 
311 bool
312 PublisherBase::operator==(const rmw_gid_t * gid) const
313 {
314  bool result = false;
315  auto ret = rmw_compare_gids_equal(gid, &this->get_gid(), &result);
316  if (ret != RMW_RET_OK) {
317  auto msg = std::string("failed to compare gids: ") + rmw_get_error_string().str;
318  rmw_reset_error();
319  throw std::runtime_error(msg);
320  }
321  return result;
322 }
323 
324 void
326  uint64_t intra_process_publisher_id,
327  IntraProcessManagerSharedPtr ipm)
328 {
329  intra_process_publisher_id_ = intra_process_publisher_id;
330  weak_ipm_ = ipm;
331  intra_process_is_enabled_ = true;
332 }
333 
334 void
335 PublisherBase::default_incompatible_qos_callback(
336  rclcpp::QOSOfferedIncompatibleQoSInfo & event) const
337 {
338  std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
339  RCLCPP_WARN(
340  rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
341  "New subscription discovered on topic '%s', requesting incompatible QoS. "
342  "No messages will be sent to it. "
343  "Last incompatible policy: %s",
344  get_topic_name(),
345  policy_name.c_str());
346 }
347 
348 void
349 PublisherBase::default_incompatible_type_callback(
350  rclcpp::IncompatibleTypeInfo & event) const
351 {
352  (void)event;
353 
354  RCLCPP_WARN(
355  rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
356  "Incompatible type on topic '%s', no messages will be sent to it.", get_topic_name());
357 }
358 
359 std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoints() const
360 {
361  rcutils_allocator_t allocator = rcutils_get_default_allocator();
362  rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
363  rcl_get_zero_initialized_network_flow_endpoint_array();
364  rcl_ret_t ret = rcl_publisher_get_network_flow_endpoints(
365  publisher_handle_.get(), &allocator, &network_flow_endpoint_array);
366  if (RCL_RET_OK != ret) {
367  auto error_msg = std::string("error obtaining network flows of publisher: ") +
368  rcl_get_error_string().str;
369  rcl_reset_error();
370  if (RCL_RET_OK !=
371  rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array))
372  {
373  error_msg += std::string(", also error cleaning up network flow array: ") +
374  rcl_get_error_string().str;
375  rcl_reset_error();
376  }
377  rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
378  }
379 
380  std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
381  for (size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
382  network_flow_endpoint_vector.push_back(
384  network_flow_endpoint_array.network_flow_endpoint[i]));
385  }
386 
387  ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array);
388  if (RCL_RET_OK != ret) {
389  rclcpp::exceptions::throw_from_rcl_error(ret, "error cleaning up network flow array");
390  }
391 
392  return network_flow_endpoint_vector;
393 }
394 
396 {
397  if (!intra_process_is_enabled_) {
398  return 0u;
399  }
400 
401  auto ipm = weak_ipm_.lock();
402 
403  if (!ipm) {
404  // TODO(ivanpauno): should this raise an error?
405  RCLCPP_WARN(
406  rclcpp::get_logger("rclcpp"),
407  "Intra process manager died for a publisher.");
408  return 0u;
409  }
410 
411  return ipm->lowest_available_capacity(intra_process_publisher_id_);
412 }
RCLCPP_PUBLIC Logger get_child(const std::string &suffix)
Return a logger that is a descendant of this logger.
Definition: logger.cpp:72
RCLCPP_PUBLIC const rmw_gid_t & get_gid() const
Get the global identifier for this publisher (used in rmw and by DDS).
RCLCPP_PUBLIC std::shared_ptr< rcl_publisher_t > get_publisher_handle()
Get the rcl publisher handle.
RCLCPP_PUBLIC size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
RCLCPP_PUBLIC void bind_event_callbacks(const PublisherEventCallbacks &event_callbacks, bool use_default_callbacks)
Add event handlers for passed in event_callbacks.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this publisher publishes on.
RCLCPP_PUBLIC bool operator==(const rmw_gid_t &gid) const
Compare this publisher to a gid.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_publisher_id, IntraProcessManagerSharedPtr ipm)
Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC size_t get_queue_size() const
Get the queue size for this publisher.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if publisher instance can loan messages.
RCLCPP_PUBLIC bool is_durability_transient_local() const
Get if durability is transient local.
RCLCPP_PUBLIC size_t get_subscription_count() const
Get subscription count.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC RCUTILS_WARN_UNUSED bool assert_liveliness() const
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
RCLCPP_PUBLIC const std::unordered_map< rcl_publisher_event_type_t, std::shared_ptr< rclcpp::EventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this publisher.
RCLCPP_PUBLIC size_t lowest_available_ipm_capacity() const
Return the lowest available capacity for all subscription buffers.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:116
Pure virtual interface class for the NodeBase part of the Node API.
RCL_PUBLIC RCL_WARN_UNUSED bool rcl_context_is_valid(const rcl_context_t *context)
Return true if the given context is currently valid, otherwise false.
Definition: context.c:94
RCLCPP_PUBLIC std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
RCLCPP_PUBLIC Logger get_node_logger(const rcl_node_t *node)
Return a named logger using an rcl_node_t.
Definition: logger.cpp:44
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:33
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_name(const rcl_node_t *node)
Return the name of the node.
Definition: node.c:411
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_namespace(const rcl_node_t *node)
Return the namespace of the node.
Definition: node.c:420
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_logger_name(const rcl_node_t *node)
Return the logger name of the node.
Definition: node.c:488
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_init(rcl_publisher_t *publisher, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_publisher_options_t *options)
Initialize a rcl publisher.
Definition: publisher.c:47
RCL_PUBLIC RCL_WARN_UNUSED rcl_context_t * rcl_publisher_get_context(const rcl_publisher_t *publisher)
Return the context associated with this publisher.
Definition: publisher.c:408
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_get_subscription_count(const rcl_publisher_t *publisher, size_t *subscription_count)
Get the number of subscriptions matched to a publisher.
Definition: publisher.c:443
RCL_PUBLIC RCL_WARN_UNUSED rmw_publisher_t * rcl_publisher_get_rmw_handle(const rcl_publisher_t *publisher)
Return the rmw publisher handle.
Definition: publisher.c:399
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_publisher_get_topic_name(const rcl_publisher_t *publisher)
Get the topic name for the publisher.
Definition: publisher.c:381
RCL_PUBLIC RCL_WARN_UNUSED const rmw_qos_profile_t * rcl_publisher_get_actual_qos(const rcl_publisher_t *publisher)
Get the actual qos settings of the publisher.
Definition: publisher.c:463
RCL_PUBLIC bool rcl_publisher_is_valid_except_context(const rcl_publisher_t *publisher)
Return true if the publisher is valid except the context, otherwise false.
Definition: publisher.c:432
RCL_PUBLIC RCL_WARN_UNUSED const rcl_publisher_options_t * rcl_publisher_get_options(const rcl_publisher_t *publisher)
Return the rcl publisher options.
Definition: publisher.c:390
RCL_PUBLIC RCL_WARN_UNUSED rcl_publisher_t rcl_get_zero_initialized_publisher(void)
Return a rcl_publisher_t struct with members set to NULL.
Definition: publisher.c:40
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_fini(rcl_publisher_t *publisher, rcl_node_t *node)
Finalize a rcl_publisher_t.
Definition: publisher.c:178
RCL_PUBLIC bool rcl_publisher_can_loan_messages(const rcl_publisher_t *publisher)
Check if publisher instance can loan messages.
Definition: publisher.c:472
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_assert_liveliness(const rcl_publisher_t *publisher)
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC)
Definition: publisher.c:333
Encapsulates the non-global state of an init/shutdown cycle.
Definition: context.h:114
Options available for a rcl publisher.
Definition: publisher.h:44
rmw_qos_profile_t qos
Middleware quality of service settings for the publisher.
Definition: publisher.h:46
Structure which encapsulates a ROS Publisher.
Definition: publisher.h:37
Contains callbacks for various types of events a Publisher can receive from the middleware.
static QoSInitialization from_rmw(const rmw_qos_profile_t &rmw_qos)
Create a QoSInitialization from an existing rmw_qos_profile_t, using its history and depth.
Definition: qos.cpp:63
#define RCL_RET_OK
Success return code.
Definition: types.h:27
#define RCL_RET_TOPIC_NAME_INVALID
Topic name does not pass validation.
Definition: types.h:47
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:24
#define RCL_RET_PUBLISHER_INVALID
Invalid rcl_publisher_t given return code.
Definition: types.h:69