ROS 2 rclcpp + rcl - rolling  rolling-a919a6e5
ROS 2 C++ Client Library with ROS Client Library
subscription_base.hpp
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_BASE_HPP_
16 #define RCLCPP__SUBSCRIPTION_BASE_HPP_
17 
18 #include <atomic>
19 #include <memory>
20 #include <mutex>
21 #include <string>
22 #include <unordered_map>
23 #include <utility>
24 #include <vector>
25 
26 #include "rcl/event_callback.h"
27 #include "rcl/subscription.h"
28 
29 #include "rmw/impl/cpp/demangle.hpp"
30 #include "rmw/rmw.h"
31 
32 #include "rclcpp/any_subscription_callback.hpp"
33 #include "rclcpp/detail/cpp_callback_trampoline.hpp"
34 #include "rclcpp/dynamic_typesupport/dynamic_message.hpp"
35 #include "rclcpp/dynamic_typesupport/dynamic_message_type.hpp"
36 #include "rclcpp/dynamic_typesupport/dynamic_serialization_support.hpp"
37 #include "rclcpp/experimental/intra_process_manager.hpp"
38 #include "rclcpp/experimental/subscription_intra_process_base.hpp"
39 #include "rclcpp/macros.hpp"
40 #include "rclcpp/message_info.hpp"
41 #include "rclcpp/network_flow_endpoint.hpp"
42 #include "rclcpp/qos.hpp"
43 #include "rclcpp/event_handler.hpp"
44 #include "rclcpp/serialized_message.hpp"
45 #include "rclcpp/subscription_content_filter_options.hpp"
46 #include "rclcpp/type_support_decl.hpp"
47 #include "rclcpp/visibility_control.hpp"
48 
49 namespace rclcpp
50 {
51 
52 namespace node_interfaces
53 {
54 class NodeBaseInterface;
55 } // namespace node_interfaces
56 
57 namespace experimental
58 {
63 class IntraProcessManager;
64 } // namespace experimental
65 
67 
79 enum class DeliveredMessageKind : uint8_t
80 {
81  INVALID = 0,
82  ROS_MESSAGE = 1, // The subscription delivers a ROS message to its callback
83  SERIALIZED_MESSAGE = 2, // The subscription delivers a serialized message to its callback
84  DYNAMIC_MESSAGE = 3, // The subscription delivers a dynamic message to its callback
85 };
86 
89 class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
90 {
91 public:
92  RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase)
93 
94 
106  RCLCPP_PUBLIC
109  const rosidl_message_type_support_t & type_support_handle,
110  const std::string & topic_name,
111  const rcl_subscription_options_t & subscription_options,
112  const SubscriptionEventCallbacks & event_callbacks,
113  bool use_default_callbacks,
114  DeliveredMessageKind delivered_message_kind = DeliveredMessageKind::ROS_MESSAGE);
115 
117  RCLCPP_PUBLIC
118  virtual ~SubscriptionBase();
119 
121  RCLCPP_PUBLIC
122  void
124  const SubscriptionEventCallbacks & event_callbacks, bool use_default_callbacks);
125 
127  RCLCPP_PUBLIC
128  const char *
129  get_topic_name() const;
130 
131  RCLCPP_PUBLIC
132  std::shared_ptr<rcl_subscription_t>
133  get_subscription_handle();
134 
135  RCLCPP_PUBLIC
136  std::shared_ptr<const rcl_subscription_t>
137  get_subscription_handle() const;
138 
140 
141  RCLCPP_PUBLIC
142  const
143  std::unordered_map<rcl_subscription_event_type_t, std::shared_ptr<rclcpp::EventHandlerBase>> &
144  get_event_handlers() const;
145 
147 
158  RCLCPP_PUBLIC
160  get_actual_qos() const;
161 
163 
179  RCLCPP_PUBLIC
180  bool
181  take_type_erased(void * message_out, rclcpp::MessageInfo & message_info_out);
182 
184 
197  RCLCPP_PUBLIC
198  bool
199  take_serialized(rclcpp::SerializedMessage & message_out, rclcpp::MessageInfo & message_info_out);
200 
202 
203  RCLCPP_PUBLIC
204  virtual
205  std::shared_ptr<void>
207 
209 
210  RCLCPP_PUBLIC
211  virtual
212  std::shared_ptr<rclcpp::SerializedMessage>
214 
216 
220  RCLCPP_PUBLIC
221  virtual
222  void
223  handle_message(std::shared_ptr<void> & message, const rclcpp::MessageInfo & message_info) = 0;
224 
225  RCLCPP_PUBLIC
226  virtual
227  void
228  handle_serialized_message(
229  const std::shared_ptr<rclcpp::SerializedMessage> & serialized_message,
230  const rclcpp::MessageInfo & message_info) = 0;
231 
232  RCLCPP_PUBLIC
233  virtual
234  void
235  handle_loaned_message(void * loaned_message, const rclcpp::MessageInfo & message_info) = 0;
236 
238 
239  RCLCPP_PUBLIC
240  virtual
241  void
242  return_message(std::shared_ptr<void> & message) = 0;
243 
245 
246  RCLCPP_PUBLIC
247  virtual
248  void
249  return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) = 0;
250 
251  RCLCPP_PUBLIC
252  const rosidl_message_type_support_t &
253  get_message_type_support_handle() const;
254 
256 
259  RCLCPP_PUBLIC
260  bool
261  is_serialized() const;
262 
264 
267  RCLCPP_PUBLIC
270 
272 
273  RCLCPP_PUBLIC
274  size_t
275  get_publisher_count() const;
276 
278 
284  RCLCPP_PUBLIC
285  bool
286  can_loan_messages() const;
287 
288  using IntraProcessManagerWeakPtr =
289  std::weak_ptr<rclcpp::experimental::IntraProcessManager>;
290 
292  RCLCPP_PUBLIC
293  void
295  uint64_t intra_process_subscription_id,
296  IntraProcessManagerWeakPtr weak_ipm);
297 
299 
303  RCLCPP_PUBLIC
304  rclcpp::Waitable::SharedPtr
306 
308 
320  RCLCPP_PUBLIC
321  bool
322  exchange_in_use_by_wait_set_state(void * pointer_to_subscription_part, bool in_use_state);
323 
325 
329  RCLCPP_PUBLIC
330  std::vector<rclcpp::NetworkFlowEndpoint>
332 
334 
357  void
358  set_on_new_message_callback(std::function<void(size_t)> callback)
359  {
360  if (!callback) {
361  throw std::invalid_argument(
362  "The callback passed to set_on_new_message_callback "
363  "is not callable.");
364  }
365 
366  auto new_callback =
367  [callback, this](size_t number_of_messages) {
368  try {
369  callback(number_of_messages);
370  } catch (const std::exception & exception) {
371  RCLCPP_ERROR_STREAM(
372  node_logger_,
373  "rclcpp::SubscriptionBase@" << this <<
374  " caught " << rmw::impl::cpp::demangle(exception) <<
375  " exception in user-provided callback for the 'on new message' callback: " <<
376  exception.what());
377  } catch (...) {
378  RCLCPP_ERROR_STREAM(
379  node_logger_,
380  "rclcpp::SubscriptionBase@" << this <<
381  " caught unhandled exception in user-provided callback " <<
382  "for the 'on new message' callback");
383  }
384  };
385 
386  std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
387 
388  // Set it temporarily to the new callback, while we replace the old one.
389  // This two-step setting, prevents a gap where the old std::function has
390  // been replaced but the middleware hasn't been told about the new one yet.
392  rclcpp::detail::cpp_callback_trampoline<decltype(new_callback), const void *, size_t>,
393  static_cast<const void *>(&new_callback));
394 
395  // Store the std::function to keep it in scope, also overwrites the existing one.
396  on_new_message_callback_ = new_callback;
397 
398  // Set it again, now using the permanent storage.
400  rclcpp::detail::cpp_callback_trampoline<
401  decltype(on_new_message_callback_), const void *, size_t>,
402  static_cast<const void *>(&on_new_message_callback_));
403  }
404 
406  void
408  {
409  std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
410 
411  if (on_new_message_callback_) {
412  set_on_new_message_callback(nullptr, nullptr);
413  on_new_message_callback_ = nullptr;
414  }
415  }
416 
418 
435  void
436  set_on_new_intra_process_message_callback(std::function<void(size_t)> callback)
437  {
438  if (!use_intra_process_) {
439  RCLCPP_WARN(
440  rclcpp::get_logger("rclcpp"),
441  "Calling set_on_new_intra_process_message_callback for subscription with IPC disabled");
442  return;
443  }
444 
445  if (!callback) {
446  throw std::invalid_argument(
447  "The callback passed to set_on_new_intra_process_message_callback "
448  "is not callable.");
449  }
450 
451  // The on_ready_callback signature has an extra `int` argument used to disambiguate between
452  // possible different entities within a generic waitable.
453  // We hide that detail to users of this method.
454  std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
455  subscription_intra_process_->set_on_ready_callback(new_callback);
456  }
457 
459  void
461  {
462  if (!use_intra_process_) {
463  RCLCPP_WARN(
464  rclcpp::get_logger("rclcpp"),
465  "Calling clear_on_new_intra_process_message_callback for subscription with IPC disabled");
466  return;
467  }
468 
469  subscription_intra_process_->clear_on_ready_callback();
470  }
471 
473 
498  void
500  std::function<void(size_t)> callback,
502  {
503  if (event_handlers_.count(event_type) == 0) {
504  RCLCPP_WARN(
505  rclcpp::get_logger("rclcpp"),
506  "Calling set_on_new_qos_event_callback for non registered subscription event_type");
507  return;
508  }
509 
510  if (!callback) {
511  throw std::invalid_argument(
512  "The callback passed to set_on_new_qos_event_callback "
513  "is not callable.");
514  }
515 
516  // The on_ready_callback signature has an extra `int` argument used to disambiguate between
517  // possible different entities within a generic waitable.
518  // We hide that detail to users of this method.
519  std::function<void(size_t, int)> new_callback = std::bind(callback, std::placeholders::_1);
520  event_handlers_[event_type]->set_on_ready_callback(new_callback);
521  }
522 
524  void
526  {
527  if (event_handlers_.count(event_type) == 0) {
528  RCLCPP_WARN(
529  rclcpp::get_logger("rclcpp"),
530  "Calling clear_on_new_qos_event_callback for non registered event_type");
531  return;
532  }
533 
534  event_handlers_[event_type]->clear_on_ready_callback();
535  }
536 
538 
541  RCLCPP_PUBLIC
542  bool
543  is_cft_enabled() const;
544 
546 
555  RCLCPP_PUBLIC
556  void
558  const std::string & filter_expression,
559  const std::vector<std::string> & expression_parameters = {});
560 
562 
567  RCLCPP_PUBLIC
569  get_content_filter() const;
570 
571  // DYNAMIC TYPE ==================================================================================
572  // TODO(methylDragon): Reorder later
573  RCLCPP_PUBLIC
574  virtual
575  rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr
576  get_shared_dynamic_message_type() = 0;
577 
578  RCLCPP_PUBLIC
579  virtual
580  rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
581  get_shared_dynamic_message() = 0;
582 
583  RCLCPP_PUBLIC
584  virtual
585  rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr
586  get_shared_dynamic_serialization_support() = 0;
587 
589 
590  RCLCPP_PUBLIC
591  virtual
592  rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr
594 
595  RCLCPP_PUBLIC
596  virtual
597  void
598  return_dynamic_message(rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message) = 0;
599 
600  RCLCPP_PUBLIC
601  virtual
602  void
603  handle_dynamic_message(
604  const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message,
605  const rclcpp::MessageInfo & message_info) = 0;
606 
607  RCLCPP_PUBLIC
608  bool
609  take_dynamic_message(
611  rclcpp::MessageInfo & message_info_out);
612  // ===============================================================================================
613 
614 protected:
615  template<typename EventCallbackT>
616  void
617  add_event_handler(
618  const EventCallbackT & callback,
619  const rcl_subscription_event_type_t event_type)
620  {
621  auto handler = std::make_shared<EventHandler<EventCallbackT,
622  std::shared_ptr<rcl_subscription_t>>>(
623  callback,
625  get_subscription_handle(),
626  event_type);
627  qos_events_in_use_by_wait_set_.insert(std::make_pair(handler.get(), false));
628  event_handlers_.insert(std::make_pair(event_type, handler));
629  }
630 
631  RCLCPP_PUBLIC
632  void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & info) const;
633 
634  RCLCPP_PUBLIC
635  void default_incompatible_type_callback(IncompatibleTypeInfo & info) const;
636 
637  RCLCPP_PUBLIC
638  bool
639  matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const;
640 
641  RCLCPP_PUBLIC
642  void
643  set_on_new_message_callback(rcl_event_callback_t callback, const void * user_data);
644 
646 
647  std::shared_ptr<rcl_node_t> node_handle_;
648 
649  std::recursive_mutex callback_mutex_;
650  // It is important to declare on_new_message_callback_ before
651  // subscription_handle_, so on destruction the subscription is
652  // destroyed first. Otherwise, the rmw subscription callback
653  // would point briefly to a destroyed function.
654  std::function<void(size_t)> on_new_message_callback_{nullptr};
655  // Declare subscription_handle_ after callback
656  std::shared_ptr<rcl_subscription_t> subscription_handle_;
657  std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
658  rclcpp::Logger node_logger_;
659 
660  std::unordered_map<rcl_subscription_event_type_t,
661  std::shared_ptr<rclcpp::EventHandlerBase>> event_handlers_;
662 
663  bool use_intra_process_;
664  IntraProcessManagerWeakPtr weak_ipm_;
665  uint64_t intra_process_subscription_id_;
666  std::shared_ptr<rclcpp::experimental::SubscriptionIntraProcessBase> subscription_intra_process_;
667 
668  const SubscriptionEventCallbacks event_callbacks_;
669 
670 private:
671  RCLCPP_DISABLE_COPY(SubscriptionBase)
672 
673  rosidl_message_type_support_t type_support_;
674  DeliveredMessageKind delivered_message_kind_;
675 
676  std::atomic<bool> subscription_in_use_by_wait_set_{false};
677  std::atomic<bool> intra_process_subscription_waitable_in_use_by_wait_set_{false};
678  std::unordered_map<rclcpp::EventHandlerBase *,
679  std::atomic<bool>> qos_events_in_use_by_wait_set_;
680 };
681 
682 } // namespace rclcpp
683 
684 #endif // RCLCPP__SUBSCRIPTION_BASE_HPP_
Additional meta data about messages taken from subscriptions.
Encapsulation of Quality of Service settings.
Definition: qos.hpp:116
Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks.
virtual RCLCPP_PUBLIC std::shared_ptr< void > create_message()=0
Borrow a new message.
RCLCPP_PUBLIC size_t get_publisher_count() const
Get matching publisher count.
virtual RCLCPP_PUBLIC rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr create_dynamic_message()=0
Borrow a new serialized message (this clones!)
RCLCPP_PUBLIC rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
void set_on_new_intra_process_message_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new intra-process message is received.
virtual RCLCPP_PUBLIC void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info)=0
Check if we need to handle the message, and execute the callback if we do.
RCLCPP_PUBLIC bool can_loan_messages() const
Check if subscription instance can loan messages.
void set_on_new_message_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new message is received.
RCLCPP_PUBLIC rclcpp::Waitable::SharedPtr get_intra_process_waitable() const
Return the waitable for intra-process.
void set_on_new_qos_event_callback(std::function< void(size_t)> callback, rcl_subscription_event_type_t event_type)
Set a callback to be called when each new qos event instance occurs.
RCLCPP_PUBLIC std::vector< rclcpp::NetworkFlowEndpoint > get_network_flow_endpoints() const
Get network flow endpoints.
RCLCPP_PUBLIC void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implemenation detail.
RCLCPP_PUBLIC DeliveredMessageKind get_delivered_message_kind() const
Return the delivered message kind.
virtual RCLCPP_PUBLIC void return_message(std::shared_ptr< void > &message)=0
Return the message borrowed in create_message.
void clear_on_new_message_callback()
Unset the callback registered for new messages, if any.
virtual RCLCPP_PUBLIC std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message()=0
Borrow a new serialized message.
RCLCPP_PUBLIC bool take_serialized(rclcpp::SerializedMessage &message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message, in its serialized form, from the subscription.
RCLCPP_PUBLIC bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
RCLCPP_PUBLIC bool exchange_in_use_by_wait_set_state(void *pointer_to_subscription_part, bool in_use_state)
Exchange state of whether or not a part of the subscription is used by a wait set.
void clear_on_new_qos_event_callback(rcl_subscription_event_type_t event_type)
Unset the callback registered for new qos events, if any.
RCLCPP_PUBLIC void set_content_filter(const std::string &filter_expression, const std::vector< std::string > &expression_parameters={})
Set the filter expression and expression parameters for the subscription.
virtual RCLCPP_PUBLIC ~SubscriptionBase()
Destructor.
RCLCPP_PUBLIC SubscriptionBase(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rcl_subscription_options_t &subscription_options, const SubscriptionEventCallbacks &event_callbacks, bool use_default_callbacks, DeliveredMessageKind delivered_message_kind=DeliveredMessageKind::ROS_MESSAGE)
Constructor.
virtual RCLCPP_PUBLIC void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message)=0
Return the message borrowed in create_serialized_message.
RCLCPP_PUBLIC bool is_cft_enabled() const
Check if content filtered topic feature of the subscription instance is enabled.
void clear_on_new_intra_process_message_callback()
Unset the callback registered for new intra-process messages, if any.
RCLCPP_PUBLIC void bind_event_callbacks(const SubscriptionEventCallbacks &event_callbacks, bool use_default_callbacks)
Add event handlers for passed in event_callbacks.
RCLCPP_PUBLIC const std::unordered_map< rcl_subscription_event_type_t, std::shared_ptr< rclcpp::EventHandlerBase > > & get_event_handlers() const
Get all the QoS event handlers associated with this subscription.
RCLCPP_PUBLIC rclcpp::ContentFilterOptions get_content_filter() const
Get the filter expression and expression parameters for the subscription.
RCLCPP_PUBLIC bool is_serialized() const
Return if the subscription is serialized.
RCLCPP_PUBLIC const char * get_topic_name() const
Get the topic that this subscription is subscribed on.
Pure virtual interface class for the NodeBase part of the Node API.
enum rcl_subscription_event_type_e rcl_subscription_event_type_t
Enumeration of all of the subscription events that may fire.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_subscription_event_init(rcl_event_t *event, const rcl_subscription_t *subscription, const rcl_subscription_event_type_t event_type)
Initialize an rcl_event_t with a subscription.
Definition: event.c:104
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
DeliveredMessageKind
The kind of message that the subscription delivers in its callback, used by the executor.
RCLCPP_PUBLIC Logger get_logger(const std::string &name)
Return a named logger.
Definition: logger.cpp:34
Options available for a rcl subscription.
Definition: subscription.h:47
Options to configure content filtered topic in the subscription.
Contains callbacks for non-message events that a Subscription can receive from the middleware.