ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
publisher_base.hpp
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__PUBLISHER_BASE_HPP_
16 #define RCLCPP__PUBLISHER_BASE_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <chrono>
22 #include <functional>
23 #include <iostream>
24 #include <memory>
25 #include <sstream>
26 #include <string>
27 #include <unordered_map>
28 #include <utility>
29 #include <vector>
30 
31 #include "rcl/publisher.h"
32 
33 #include "rclcpp/macros.hpp"
34 #include "rclcpp/network_flow_endpoint.hpp"
35 #include "rclcpp/qos.hpp"
36 #include "rclcpp/qos_event.hpp"
37 #include "rclcpp/type_support_decl.hpp"
38 #include "rclcpp/visibility_control.hpp"
39 #include "rcpputils/time.hpp"
40 
41 namespace rclcpp
42 {
43 
44 // Forward declaration is used for friend statement.
45 namespace node_interfaces
46 {
47 class NodeBaseInterface;
48 class NodeTopicsInterface;
49 } // namespace node_interfaces
50 
51 namespace experimental
52 {
57 class IntraProcessManager;
58 } // namespace experimental
59 
60 class PublisherBase : public std::enable_shared_from_this<PublisherBase>
61 {
62  friend ::rclcpp::node_interfaces::NodeTopicsInterface;
63 
64 public:
65  RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase)
66 
67 
76  RCLCPP_PUBLIC
79  const std::string & topic,
80  const rosidl_message_type_support_t & type_support,
81  const rcl_publisher_options_t & publisher_options);
82 
83  RCLCPP_PUBLIC
84  virtual ~PublisherBase();
85 
87 
88  RCLCPP_PUBLIC
89  const char *
90  get_topic_name() const;
91 
93 
94  RCLCPP_PUBLIC
95  size_t
96  get_queue_size() const;
97 
99 
100  RCLCPP_PUBLIC
101  const rmw_gid_t &
102  get_gid() const;
103 
105 
106  RCLCPP_PUBLIC
107  std::shared_ptr<rcl_publisher_t>
109 
111 
112  RCLCPP_PUBLIC
113  std::shared_ptr<const rcl_publisher_t>
114  get_publisher_handle() const;
115 
117 
118  RCLCPP_PUBLIC
119  const
120  std::unordered_map<rcl_publisher_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
121  get_event_handlers() const;
122 
124 
125  RCLCPP_PUBLIC
126  size_t
127  get_subscription_count() const;
128 
130 
131  RCLCPP_PUBLIC
132  size_t
134 
136 
143  RCLCPP_PUBLIC
144  RCUTILS_WARN_UNUSED
145  bool
146  assert_liveliness() const;
147 
149 
159  RCLCPP_PUBLIC
161  get_actual_qos() const;
162 
164 
168  RCLCPP_PUBLIC
169  bool
170  can_loan_messages() const;
171 
173 
178  RCLCPP_PUBLIC
179  bool
180  operator==(const rmw_gid_t & gid) const;
181 
183 
188  RCLCPP_PUBLIC
189  bool
190  operator==(const rmw_gid_t * gid) const;
191 
192  using IntraProcessManagerSharedPtr =
193  std::shared_ptr<rclcpp::experimental::IntraProcessManager>;
194 
196  RCLCPP_PUBLIC
197  void
199  uint64_t intra_process_publisher_id,
200  IntraProcessManagerSharedPtr ipm);
201 
203 
207  RCLCPP_PUBLIC
208  std::vector<rclcpp::NetworkFlowEndpoint>
210 
212 
233  template<typename DurationRepT = int64_t, typename DurationT = std::milli>
234  bool
236  std::chrono::duration<DurationRepT, DurationT> timeout =
237  std::chrono::duration<DurationRepT, DurationT>(-1)) const
238  {
239  rcl_duration_value_t rcl_timeout = rcpputils::convert_to_nanoseconds(timeout).count();
240 
241  rcl_ret_t ret = rcl_publisher_wait_for_all_acked(publisher_handle_.get(), rcl_timeout);
242  if (ret == RCL_RET_OK) {
243  return true;
244  } else if (ret == RCL_RET_TIMEOUT) {
245  return false;
246  } else {
247  rclcpp::exceptions::throw_from_rcl_error(ret);
248  }
249  }
250 
252 
277  void
279  std::function<void(size_t)> callback,
280  rcl_publisher_event_type_t event_type)
281  {
282  if (event_handlers_.count(event_type) == 0) {
283  RCLCPP_WARN(
284  rclcpp::get_logger("rclcpp"),
285  "Calling set_on_new_qos_event_callback for non registered publisher event_type");
286  return;
287  }
288 
289  if (!callback) {
290  throw std::invalid_argument(
291  "The callback passed to set_on_new_qos_event_callback "
292  "is not callable.");
293  }
294 
295  // The on_ready_callback signature has an extra `int` argument used to disambiguate between
296  // possible different entities within a generic waitable.
297  // We hide that detail to users of this method.
298  std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
299  event_handlers_[event_type]->set_on_ready_callback(new_callback);
300  }
301 
303  void
305  {
306  if (event_handlers_.count(event_type) == 0) {
307  RCLCPP_WARN(
308  rclcpp::get_logger("rclcpp"),
309  "Calling clear_on_new_qos_event_callback for non registered event_type");
310  return;
311  }
312 
313  event_handlers_[event_type]->clear_on_ready_callback();
314  }
315 
316 protected:
317  template<typename EventCallbackT>
318  void
319  add_event_handler(
320  const EventCallbackT & callback,
321  const rcl_publisher_event_type_t event_type)
322  {
323  auto handler = std::make_shared<QOSEventHandler<EventCallbackT,
324  std::shared_ptr<rcl_publisher_t>>>(
325  callback,
327  publisher_handle_,
328  event_type);
329  event_handlers_.insert(std::make_pair(event_type, handler));
330  }
331 
332  RCLCPP_PUBLIC
333  void default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & info) const;
334 
335  std::shared_ptr<rcl_node_t> rcl_node_handle_;
336 
337  std::shared_ptr<rcl_publisher_t> publisher_handle_;
338 
339  std::unordered_map<rcl_publisher_event_type_t,
340  std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
341 
342  using IntraProcessManagerWeakPtr =
343  std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
344  bool intra_process_is_enabled_;
345  IntraProcessManagerWeakPtr weak_ipm_;
346  uint64_t intra_process_publisher_id_;
347 
348  rmw_gid_t rmw_gid_;
349 
350  const rosidl_message_type_support_t type_support_;
351 };
352 
353 } // namespace rclcpp
354 
355 #endif // RCLCPP__PUBLISHER_BASE_HPP_
RCLCPP_PUBLIC const rmw_gid_t & get_gid() const
Get the global identifier for this publisher (used in rmw and by DDS).
RCLCPP_PUBLIC PublisherBase(rclcpp::node_interfaces::NodeBaseInterface *node_base, const std::string &topic, const rosidl_message_type_support_t &type_support, const rcl_publisher_options_t &publisher_options)
Default constructor.
RCLCPP_PUBLIC std::shared_ptr< rcl_publisher_t > get_publisher_handle()
Get the rcl publisher handle.
RCLCPP_PUBLIC size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this publisher publishes on.
RCLCPP_PUBLIC bool operator==(const rmw_gid_t &gid) const
Compare this publisher to a gid.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_publisher_id, IntraProcessManagerSharedPtr ipm)
Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC size_t get_queue_size() const
Get the queue size for this publisher.
void clear_on_new_qos_event_callback(rcl_publisher_event_type_t event_type)
Unset the callback registered for new qos events, if any.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if publisher instance can loan messages.
RCLCPP_PUBLIC size_t get_subscription_count() const
Get subscription count.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
bool wait_for_all_acked(std::chrono::duration< DurationRepT, DurationT > timeout=std::chrono::duration< DurationRepT, DurationT >(-1)) const
Wait until all published messages are acknowledged or until the specified timeout elapses.
RCLCPP_PUBLIC RCUTILS_WARN_UNUSED bool assert_liveliness() const
Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
void set_on_new_qos_event_callback(std::function< void(size_t)> callback, rcl_publisher_event_type_t event_type)
Set a callback to be called when each new qos event instance occurs.
RCLCPP_PUBLIC const std::unordered_map< rcl_publisher_event_type_t, std::shared_ptr< rclcpp::QOSEventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this publisher.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:111
Pure virtual interface class for the NodeBase part of the Node API.
enum rcl_publisher_event_type_e rcl_publisher_event_type_t
Enumeration of all of the publisher events that may fire.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_event_init(rcl_event_t *event, const rcl_publisher_t *publisher, const rcl_publisher_event_type_t event_type)
Initialize an rcl_event_t with a publisher.
Definition: event.c:46
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:27
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_publisher_wait_for_all_acked(const rcl_publisher_t *publisher, rcl_duration_value_t timeout)
Wait until all published message data is acknowledged or until the specified timeout elapses.
Definition: publisher.c:310
Options available for a rcl publisher.
Definition: publisher.h:44
rcutils_duration_value_t rcl_duration_value_t
A duration of time, measured in nanoseconds.
Definition: time.h:48
#define RCL_RET_OK
Success return code.
Definition: types.h:26
#define RCL_RET_TIMEOUT
Timeout occurred return code.
Definition: types.h:30
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.
Definition: types.h:23