15 #ifndef RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
16 #define RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
23 #include "libstatistics_collector/collector/generate_statistics_message.hpp"
24 #include "libstatistics_collector/moving_average_statistics/types.hpp"
25 #include "libstatistics_collector/topic_statistics_collector/constants.hpp"
26 #include "libstatistics_collector/topic_statistics_collector/received_message_age.hpp"
27 #include "libstatistics_collector/topic_statistics_collector/received_message_period.hpp"
30 #include "rclcpp/time.hpp"
31 #include "rclcpp/publisher.hpp"
32 #include "rclcpp/timer.hpp"
34 #include "statistics_msgs/msg/metrics_message.hpp"
38 namespace topic_statistics
41 constexpr
const char kDefaultPublishTopicName[]{
"/statistics"};
42 constexpr
const std::chrono::milliseconds kDefaultPublishingPeriod{std::chrono::seconds(1)};
44 using libstatistics_collector::collector::GenerateStatisticMessage;
45 using statistics_msgs::msg::MetricsMessage;
46 using libstatistics_collector::moving_average_statistics::StatisticData;
54 template<
typename CallbackMessageT>
57 using TopicStatsCollector =
58 libstatistics_collector::topic_statistics_collector::TopicStatisticsCollector<
60 using ReceivedMessageAge =
61 libstatistics_collector::topic_statistics_collector::ReceivedMessageAgeCollector<
63 using ReceivedMessagePeriod =
64 libstatistics_collector::topic_statistics_collector::ReceivedMessagePeriodCollector<
81 const std::string & node_name,
83 : node_name_(node_name),
84 publisher_(std::move(publisher))
88 if (
nullptr == publisher_) {
89 throw std::invalid_argument(
"publisher pointer is nullptr");
108 const CallbackMessageT & received_message,
111 std::lock_guard<std::mutex> lock(mutex_);
112 for (
const auto & collector : subscriber_statistics_collectors_) {
113 collector->OnMessageReceived(received_message, now_nanoseconds.
nanoseconds());
123 publisher_timer_ = publisher_timer;
132 std::vector<MetricsMessage> msgs;
133 rclcpp::Time window_end{get_current_nanoseconds_since_epoch()};
136 std::lock_guard<std::mutex> lock(mutex_);
137 for (
auto & collector : subscriber_statistics_collectors_) {
138 const auto collected_stats = collector->GetStatisticsResults();
139 collector->ClearCurrentMeasurements();
141 auto message = libstatistics_collector::collector::GenerateStatisticMessage(
143 collector->GetMetricName(),
144 collector->GetMetricUnit(),
148 msgs.push_back(message);
152 for (
auto & msg : msgs) {
155 window_start_ = window_end;
167 std::vector<StatisticData> data;
168 std::lock_guard<std::mutex> lock(mutex_);
169 for (
const auto & collector : subscriber_statistics_collectors_) {
170 data.push_back(collector->GetStatisticsResults());
182 auto received_message_age = std::make_unique<ReceivedMessageAge>();
183 received_message_age->Start();
184 subscriber_statistics_collectors_.emplace_back(std::move(received_message_age));
186 auto received_message_period = std::make_unique<ReceivedMessagePeriod>();
187 received_message_period->Start();
189 std::lock_guard<std::mutex> lock(mutex_);
190 subscriber_statistics_collectors_.emplace_back(std::move(received_message_period));
193 window_start_ =
rclcpp::Time(get_current_nanoseconds_since_epoch());
203 std::lock_guard<std::mutex> lock(mutex_);
204 for (
auto & collector : subscriber_statistics_collectors_) {
208 subscriber_statistics_collectors_.clear();
211 if (publisher_timer_) {
212 publisher_timer_->cancel();
213 publisher_timer_.reset();
223 int64_t get_current_nanoseconds_since_epoch()
const
225 const auto now = std::chrono::system_clock::now();
226 return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count();
230 mutable std::mutex mutex_;
232 std::vector<std::unique_ptr<TopicStatsCollector>> subscriber_statistics_collectors_{};
234 const std::string node_name_;
238 rclcpp::TimerBase::SharedPtr publisher_timer_;
A publisher publishes messages of any type to a topic.
std::enable_if_t< rosidl_generator_traits::is_message< T >::value &&std::is_same< T, ROSMessageType >::value > publish(std::unique_ptr< T, ROSMessageTypeDeleter > msg)
Publish a message on the topic.
RCLCPP_PUBLIC rcl_time_point_value_t nanoseconds() const
Get the nanoseconds since epoch.
SubscriptionTopicStatistics(const std::string &node_name, rclcpp::Publisher< statistics_msgs::msg::MetricsMessage >::SharedPtr publisher)
Construct a SubscriptionTopicStatistics object.
virtual void publish_message_and_reset_measurements()
Publish a populated MetricsStatisticsMessage.
virtual void handle_message(const CallbackMessageT &received_message, const rclcpp::Time now_nanoseconds) const
Handle a message received by the subscription to collect statistics.
void set_publisher_timer(rclcpp::TimerBase::SharedPtr publisher_timer)
Set the timer used to publish statistics messages.
std::vector< StatisticData > get_current_collector_data() const
Return a vector of all the currently collected data.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.