ROS 2 rclcpp + rcl - humble  humble
ROS 2 C++ Client Library with ROS Client Library
subscription_base.hpp
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_BASE_HPP_
16 #define RCLCPP__SUBSCRIPTION_BASE_HPP_
17 
18 #include <atomic>
19 #include <memory>
20 #include <mutex>
21 #include <string>
22 #include <unordered_map>
23 #include <vector>
24 #include <utility>
25 
26 #include "rcl/event_callback.h"
27 #include "rcl/subscription.h"
28 
29 #include "rmw/impl/cpp/demangle.hpp"
30 #include "rmw/rmw.h"
31 
32 #include "rclcpp/any_subscription_callback.hpp"
33 #include "rclcpp/detail/cpp_callback_trampoline.hpp"
34 #include "rclcpp/experimental/intra_process_manager.hpp"
35 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
36 #include "rclcpp/macros.hpp"
37 #include "rclcpp/message_info.hpp"
38 #include "rclcpp/network_flow_endpoint.hpp"
39 #include "rclcpp/qos.hpp"
40 #include "rclcpp/qos_event.hpp"
41 #include "rclcpp/serialized_message.hpp"
42 #include "rclcpp/subscription_content_filter_options.hpp"
43 #include "rclcpp/type_support_decl.hpp"
44 #include "rclcpp/visibility_control.hpp"
45 
46 namespace rclcpp
47 {
48 
49 namespace node_interfaces
50 {
51 class NodeBaseInterface;
52 } // namespace node_interfaces
53 
54 namespace experimental
55 {
60 class IntraProcessManager;
61 } // namespace experimental
62 
65 class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
66 {
67 public:
68  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase)
69 
70 
81  RCLCPP_PUBLIC
84  const rosidl_message_type_support_t & type_support_handle,
85  const std::string & topic_name,
86  const rcl_subscription_options_t & subscription_options,
87  bool is_serialized = false);
88 
90  RCLCPP_PUBLIC
91  virtual ~SubscriptionBase();
92 
94  RCLCPP_PUBLIC
95  const char *
96  get_topic_name() const;
97 
98  RCLCPP_PUBLIC
99  std::shared_ptr<rcl_subscription_t>
100  get_subscription_handle();
101 
102  RCLCPP_PUBLIC
103  std::shared_ptr<const rcl_subscription_t>
104  get_subscription_handle() const;
105 
107 
108  RCLCPP_PUBLIC
109  const
110  std::unordered_map<rcl_subscription_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
111  get_event_handlers() const;
112 
114 
125  RCLCPP_PUBLIC
127  get_actual_qos() const;
128 
130 
146  RCLCPP_PUBLIC
147  bool
148  take_type_erased(void * message_out, rclcpp::MessageInfo & message_info_out);
149 
151 
164  RCLCPP_PUBLIC
165  bool
166  take_serialized(rclcpp::SerializedMessage & message_out, rclcpp::MessageInfo & message_info_out);
167 
169 
170  RCLCPP_PUBLIC
171  virtual
172  std::shared_ptr<void>
174 
176 
177  RCLCPP_PUBLIC
178  virtual
179  std::shared_ptr<rclcpp::SerializedMessage>
181 
183 
187  RCLCPP_PUBLIC
188  virtual
189  void
190  handle_message(std::shared_ptr<void> & message, const rclcpp::MessageInfo & message_info) = 0;
191 
192  RCLCPP_PUBLIC
193  virtual
194  void
195  handle_serialized_message(
196  const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
197  const rclcpp::MessageInfo & message_info) = 0;
198 
199  RCLCPP_PUBLIC
200  virtual
201  void
202  handle_loaned_message(void * loaned_message, const rclcpp::MessageInfo & message_info) = 0;
203 
205 
206  RCLCPP_PUBLIC
207  virtual
208  void
209  return_message(std::shared_ptr<void> & message) = 0;
210 
212 
213  RCLCPP_PUBLIC
214  virtual
215  void
216  return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) = 0;
217 
218  RCLCPP_PUBLIC
219  const rosidl_message_type_support_t &
220  get_message_type_support_handle() const;
221 
223 
226  RCLCPP_PUBLIC
227  bool
228  is_serialized() const;
229 
231 
232  RCLCPP_PUBLIC
233  size_t
234  get_publisher_count() const;
235 
237 
243  RCLCPP_PUBLIC
244  bool
245  can_loan_messages() const;
246 
247  using IntraProcessManagerWeakPtr =
248  std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
249 
251  RCLCPP_PUBLIC
252  void
254  uint64_t intra_process_subscription_id,
255  IntraProcessManagerWeakPtr weak_ipm);
256 
258 
262  RCLCPP_PUBLIC
263  rclcpp::Waitable::SharedPtr
265 
267 
279  RCLCPP_PUBLIC
280  bool
281  exchange_in_use_by_wait_set_state(void * pointer_to_subscription_part, bool in_use_state);
282 
284 
288  RCLCPP_PUBLIC
289  std::vector<rclcpp::NetworkFlowEndpoint>
291 
293 
316  void
317  set_on_new_message_callback(std::function<void(size_t)> callback)
318  {
319  if (!callback) {
320  throw std::invalid_argument(
321  "The callback passed to set_on_new_message_callback "
322  "is not callable.");
323  }
324 
325  auto new_callback =
326  [callback, this](size_t number_of_messages) {
327  try {
328  callback(number_of_messages);
329  } catch (const std::exception & exception) {
330  RCLCPP_ERROR_STREAM(
331  node_logger_,
332  "rclcpp::SubscriptionBase@" << this <<
333  " caught " << rmw::impl::cpp::demangle(exception) <<
334  " exception in user-provided callback for the 'on new message' callback: " <<
335  exception.what());
336  } catch (...) {
337  RCLCPP_ERROR_STREAM(
338  node_logger_,
339  "rclcpp::SubscriptionBase@" << this <<
340  " caught unhandled exception in user-provided callback " <<
341  "for the 'on new message' callback");
342  }
343  };
344 
345  std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
346 
347  // Set it temporarily to the new callback, while we replace the old one.
348  // This two-step setting, prevents a gap where the old std::function has
349  // been replaced but the middleware hasn't been told about the new one yet.
351  rclcpp::detail::cpp_callback_trampoline<const void *, size_t>,
352  static_cast<const void *>(&new_callback));
353 
354  // Store the std::function to keep it in scope, also overwrites the existing one.
355  on_new_message_callback_ = new_callback;
356 
357  // Set it again, now using the permanent storage.
359  rclcpp::detail::cpp_callback_trampoline<const void *, size_t>,
360  static_cast<const void *>(&on_new_message_callback_));
361  }
362 
364  void
366  {
367  std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
368 
369  if (on_new_message_callback_) {
370  set_on_new_message_callback(nullptr, nullptr);
371  on_new_message_callback_ = nullptr;
372  }
373  }
374 
376 
393  void
394  set_on_new_intra_process_message_callback(std::function<void(size_t)> callback)
395  {
396  if (!use_intra_process_) {
397  RCLCPP_WARN(
398  rclcpp::get_logger("rclcpp"),
399  "Calling set_on_new_intra_process_message_callback for subscription with IPC disabled");
400  return;
401  }
402 
403  if (!callback) {
404  throw std::invalid_argument(
405  "The callback passed to set_on_new_intra_process_message_callback "
406  "is not callable.");
407  }
408 
409  // The on_ready_callback signature has an extra `int` argument used to disambiguate between
410  // possible different entities within a generic waitable.
411  // We hide that detail to users of this method.
412  std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
413  subscription_intra_process_->set_on_ready_callback(new_callback);
414  }
415 
417  void
419  {
420  if (!use_intra_process_) {
421  RCLCPP_WARN(
422  rclcpp::get_logger("rclcpp"),
423  "Calling clear_on_new_intra_process_message_callback for subscription with IPC disabled");
424  return;
425  }
426 
427  subscription_intra_process_->clear_on_ready_callback();
428  }
429 
431 
456  void
458  std::function<void(size_t)> callback,
460  {
461  if (event_handlers_.count(event_type) == 0) {
462  RCLCPP_WARN(
463  rclcpp::get_logger("rclcpp"),
464  "Calling set_on_new_qos_event_callback for non registered subscription event_type");
465  return;
466  }
467 
468  if (!callback) {
469  throw std::invalid_argument(
470  "The callback passed to set_on_new_qos_event_callback "
471  "is not callable.");
472  }
473 
474  // The on_ready_callback signature has an extra `int` argument used to disambiguate between
475  // possible different entities within a generic waitable.
476  // We hide that detail to users of this method.
477  std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
478  event_handlers_[event_type]->set_on_ready_callback(new_callback);
479  }
480 
482  void
484  {
485  if (event_handlers_.count(event_type) == 0) {
486  RCLCPP_WARN(
487  rclcpp::get_logger("rclcpp"),
488  "Calling clear_on_new_qos_event_callback for non registered event_type");
489  return;
490  }
491 
492  event_handlers_[event_type]->clear_on_ready_callback();
493  }
494 
496 
499  RCLCPP_PUBLIC
500  bool
501  is_cft_enabled() const;
502 
504 
513  RCLCPP_PUBLIC
514  void
516  const std::string & filter_expression,
517  const std::vector<std::string> & expression_parameters = {});
518 
520 
525  RCLCPP_PUBLIC
527  get_content_filter() const;
528 
529 protected:
530  template<typename EventCallbackT>
531  void
532  add_event_handler(
533  const EventCallbackT & callback,
534  const rcl_subscription_event_type_t event_type)
535  {
536  auto handler = std::make_shared<QOSEventHandler<EventCallbackT,
537  std::shared_ptr<rcl_subscription_t>>>(
538  callback,
540  get_subscription_handle(),
541  event_type);
542  qos_events_in_use_by_wait_set_.insert(std::make_pair(handler.get(), false));
543  event_handlers_.insert(std::make_pair(event_type, handler));
544  }
545 
546  RCLCPP_PUBLIC
547  void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & info) const;
548 
549  RCLCPP_PUBLIC
550  bool
551  matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const;
552 
553  RCLCPP_PUBLIC
554  void
555  set_on_new_message_callback(rcl_event_callback_t callback, const void * user_data);
556 
558 
559  std::shared_ptr<rcl_node_t> node_handle_;
560  std::shared_ptr<rcl_subscription_t> subscription_handle_;
561  std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
562  rclcpp::Logger node_logger_;
563 
564  std::unordered_map<rcl_subscription_event_type_t,
565  std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;
566 
567  bool use_intra_process_;
568  IntraProcessManagerWeakPtr weak_ipm_;
569  uint64_t intra_process_subscription_id_;
570  std::shared_ptr<rclcpp::experimental::SubscriptionIntraProcessBase> subscription_intra_process_;
571 
572 private:
573  RCLCPP_DISABLE_COPY(SubscriptionBase)
574 
575  rosidl_message_type_support_t type_support_;
576  bool is_serialized_;
577 
578  std::atomic<bool> subscription_in_use_by_wait_set_{false};
579  std::atomic<bool> intra_process_subscription_waitable_in_use_by_wait_set_{false};
580  std::unordered_map<rclcpp::QOSEventHandlerBase *,
581  std::atomic<bool>> qos_events_in_use_by_wait_set_;
582 
583  std::recursive_mutex callback_mutex_;
584  std::function<void(size_t)> on_new_message_callback_{nullptr};
585 };
586 
587 } // namespace rclcpp
588 
589 #endif // RCLCPP__SUBSCRIPTION_BASE_HPP_
Additional meta data about messages taken from subscriptions.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:111
Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks.
virtual RCLCPP_PUBLIC std::shared_ptr< void > create_message()=0
Borrow a new message.
RCLCPP_PUBLIC size_t get_publisher_count() const
Get matching publisher count.
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
void set_on_new_intra_process_message_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new intra-process message is received.
virtual RCLCPP_PUBLIC void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info)=0
Check if we need to handle the message, and execute the callback if we do.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if subscription instance can loan messages.
void set_on_new_message_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new message is received.
RCLCPP_PUBLIC rclcpp::Waitable::SharedPtr get_intra_process_waitable() const
Return the waitable for intra-process.
RCLCPP_PUBLIC const std::unordered_map< rcl_subscription_event_type_t, std::shared_ptr< rclcpp::QOSEventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this subscription.
void set_on_new_qos_event_callback(std::function< void(size_t)> callback, rcl_subscription_event_type_t event_type)
Set a callback to be called when each new qos event instance occurs.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implemenation detail.
virtual RCLCPP_PUBLIC void return_message(std::shared_ptr< void > &message)=0
Return the message borrowed in create_message.
void clear_on_new_message_callback()
Unset the callback registered for new messages, if any.
virtual RCLCPP_PUBLIC std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message()=0
Borrow a new serialized message.
RCLCPP_PUBLIC bool take_serialized(rclcpp::SerializedMessage &message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message, in its serialized form, from the subscription.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
RCLCPP_PUBLIC bool exchange_in_use_by_wait_set_state(void *pointer_to_subscription_part, bool in_use_state)
Exchange state of whether or not a part of the subscription is used by a wait set.
void clear_on_new_qos_event_callback(rcl_subscription_event_type_t event_type)
Unset the callback registered for new qos events, if any.
RCLCPP_PUBLIC void set_content_filter(const std::string &filter_expression, const std::vector< std::string > &expression_parameters={})
Set the filter expression and expression parameters for the subscription.
virtual RCLCPP_PUBLIC ~SubscriptionBase()
Destructor.
virtual RCLCPP_PUBLIC void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message)=0
Return the message borrowed in create_serialized_message.
RCLCPP_PUBLIC bool is_cft_enabled() const
Check if content filtered topic feature of the subscription instance is enabled.
void clear_on_new_intra_process_message_callback()
Unset the callback registered for new intra-process messages, if any.
RCLCPP_PUBLIC rclcpp::ContentFilterOptions get_content_filter() const
Get the filter expression and expression parameters for the subscription.
RCLCPP_PUBLIC bool is_serialized() const
Return if the subscription is serialized.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this subscription is subscribed on.
RCLCPP_PUBLIC SubscriptionBase(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rcl_subscription_options_t &subscription_options, bool is_serialized=false)
Constructor.
Pure virtual interface class for the NodeBase part of the Node API.
enum rcl_subscription_event_type_e rcl_subscription_event_type_t
Enumeration of all of the subscription events that may fire.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_event_init(rcl_event_t *event, const rcl_subscription_t *subscription, const rcl_subscription_event_type_t event_type)
Initialize an rcl_event_t with a subscription.
Definition: event.c:97
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:27
Options available for a rcl subscription.
Definition: subscription.h:46
Options to configure content filtered topic in the subscription.