15 #ifndef RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
16 #define RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
23 #include "libstatistics_collector/collector/generate_statistics_message.hpp"
24 #include "libstatistics_collector/moving_average_statistics/types.hpp"
25 #include "libstatistics_collector/topic_statistics_collector/constants.hpp"
26 #include "libstatistics_collector/topic_statistics_collector/received_message_age.hpp"
27 #include "libstatistics_collector/topic_statistics_collector/received_message_period.hpp"
30 #include "rclcpp/time.hpp"
31 #include "rclcpp/publisher.hpp"
32 #include "rclcpp/timer.hpp"
34 #include "statistics_msgs/msg/metrics_message.hpp"
38 namespace topic_statistics
41 constexpr
const char kDefaultPublishTopicName[]{
"/statistics"};
42 constexpr
const std::chrono::milliseconds kDefaultPublishingPeriod{std::chrono::seconds(1)};
44 using libstatistics_collector::collector::GenerateStatisticMessage;
45 using statistics_msgs::msg::MetricsMessage;
46 using libstatistics_collector::moving_average_statistics::StatisticData;
54 using TopicStatsCollector = libstatistics_collector::TopicStatisticsCollector;
55 using ReceivedMessageAge = libstatistics_collector::ReceivedMessageAgeCollector;
56 using ReceivedMessagePeriod = libstatistics_collector::ReceivedMessagePeriodCollector;
72 const std::string & node_name,
74 : node_name_(node_name),
75 publisher_(std::move(publisher))
79 if (
nullptr == publisher_) {
80 throw std::invalid_argument(
"publisher pointer is nullptr");
99 const rmw_message_info_t & message_info,
102 std::lock_guard<std::mutex> lock(mutex_);
103 for (
const auto & collector : subscriber_statistics_collectors_) {
104 collector->OnMessageReceived(message_info, now_nanoseconds.
nanoseconds());
114 publisher_timer_ = publisher_timer;
123 std::vector<MetricsMessage> msgs;
124 rclcpp::Time window_end{get_current_nanoseconds_since_epoch()};
127 std::lock_guard<std::mutex> lock(mutex_);
128 for (
auto & collector : subscriber_statistics_collectors_) {
129 const auto collected_stats = collector->GetStatisticsResults();
130 collector->ClearCurrentMeasurements();
132 auto message = libstatistics_collector::collector::GenerateStatisticMessage(
134 collector->GetMetricName(),
135 collector->GetMetricUnit(),
139 msgs.push_back(message);
143 for (
auto & msg : msgs) {
146 window_start_ = window_end;
158 std::vector<StatisticData> data;
159 std::lock_guard<std::mutex> lock(mutex_);
160 for (
const auto & collector : subscriber_statistics_collectors_) {
161 data.push_back(collector->GetStatisticsResults());
173 auto received_message_age = std::make_unique<ReceivedMessageAge>();
174 received_message_age->Start();
175 auto received_message_period = std::make_unique<ReceivedMessagePeriod>();
176 received_message_period->Start();
178 std::lock_guard<std::mutex> lock(mutex_);
179 subscriber_statistics_collectors_.emplace_back(std::move(received_message_age));
180 subscriber_statistics_collectors_.emplace_back(std::move(received_message_period));
183 window_start_ =
rclcpp::Time(get_current_nanoseconds_since_epoch());
193 std::lock_guard<std::mutex> lock(mutex_);
194 for (
auto & collector : subscriber_statistics_collectors_) {
198 subscriber_statistics_collectors_.clear();
201 if (publisher_timer_) {
202 publisher_timer_->cancel();
203 publisher_timer_.reset();
213 int64_t get_current_nanoseconds_since_epoch()
const
215 const auto now = std::chrono::system_clock::now();
216 return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count();
220 mutable std::mutex mutex_;
222 std::vector<std::unique_ptr<TopicStatsCollector>> subscriber_statistics_collectors_{};
224 const std::string node_name_;
228 rclcpp::TimerBase::SharedPtr publisher_timer_;
A publisher publishes messages of any type to a topic.
std::enable_if_t< rosidl_generator_traits::is_message< T >::value &&std::is_same< T, ROSMessageType >::value > publish(std::unique_ptr< T, ROSMessageTypeDeleter > msg)
Publish a message on the topic.
RCLCPP_PUBLIC rcl_time_point_value_t nanoseconds() const
Get the nanoseconds since epoch.
virtual void handle_message(const rmw_message_info_t &message_info, const rclcpp::Time now_nanoseconds) const
Handle a message received by the subscription to collect statistics.
virtual void publish_message_and_reset_measurements()
Publish a populated MetricsStatisticsMessage.
SubscriptionTopicStatistics(const std::string &node_name, rclcpp::Publisher< statistics_msgs::msg::MetricsMessage >::SharedPtr publisher)
Construct a SubscriptionTopicStatistics object.
void set_publisher_timer(rclcpp::TimerBase::SharedPtr publisher_timer)
Set the timer used to publish statistics messages.
std::vector< StatisticData > get_current_collector_data() const
Return a vector of all the currently collected data.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.