ROS 2 rclcpp + rcl - jazzy  jazzy
ROS 2 C++ Client Library with ROS Client Library
subscription_topic_statistics.hpp
1 // Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
16 #define RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
17 
18 #include <memory>
19 #include <string>
20 #include <utility>
21 #include <vector>
22 
23 #include "libstatistics_collector/collector/generate_statistics_message.hpp"
24 #include "libstatistics_collector/moving_average_statistics/types.hpp"
25 #include "libstatistics_collector/topic_statistics_collector/constants.hpp"
26 #include "libstatistics_collector/topic_statistics_collector/received_message_age.hpp"
27 #include "libstatistics_collector/topic_statistics_collector/received_message_period.hpp"
28 
29 #include "rcl/time.h"
30 #include "rclcpp/time.hpp"
31 #include "rclcpp/publisher.hpp"
32 #include "rclcpp/timer.hpp"
33 
34 #include "statistics_msgs/msg/metrics_message.hpp"
35 
36 namespace rclcpp
37 {
38 namespace topic_statistics
39 {
40 
41 constexpr const char kDefaultPublishTopicName[]{"/statistics"};
42 constexpr const std::chrono::milliseconds kDefaultPublishingPeriod{std::chrono::seconds(1)};
43 
44 using libstatistics_collector::collector::GenerateStatisticMessage;
45 using statistics_msgs::msg::MetricsMessage;
46 using libstatistics_collector::moving_average_statistics::StatisticData;
47 
53 {
54  using TopicStatsCollector = libstatistics_collector::TopicStatisticsCollector;
55  using ReceivedMessageAge = libstatistics_collector::ReceivedMessageAgeCollector;
56  using ReceivedMessagePeriod = libstatistics_collector::ReceivedMessagePeriodCollector;
57 
58 public:
60 
72  const std::string & node_name,
74  : node_name_(node_name),
75  publisher_(std::move(publisher))
76  {
77  // TODO(dbbonnie): ros-tooling/aws-roadmap/issues/226, received message age
78 
79  if (nullptr == publisher_) {
80  throw std::invalid_argument("publisher pointer is nullptr");
81  }
82 
83  bring_up();
84  }
85 
87  {
88  tear_down();
89  }
90 
92 
98  virtual void handle_message(
99  const rmw_message_info_t & message_info,
100  const rclcpp::Time now_nanoseconds) const
101  {
102  std::lock_guard<std::mutex> lock(mutex_);
103  for (const auto & collector : subscriber_statistics_collectors_) {
104  collector->OnMessageReceived(message_info, now_nanoseconds.nanoseconds());
105  }
106  }
107 
109 
112  void set_publisher_timer(rclcpp::TimerBase::SharedPtr publisher_timer)
113  {
114  publisher_timer_ = publisher_timer;
115  }
116 
118 
122  {
123  std::vector<MetricsMessage> msgs;
124  rclcpp::Time window_end{get_current_nanoseconds_since_epoch()};
125 
126  {
127  std::lock_guard<std::mutex> lock(mutex_);
128  for (auto & collector : subscriber_statistics_collectors_) {
129  const auto collected_stats = collector->GetStatisticsResults();
130  collector->ClearCurrentMeasurements();
131 
132  auto message = libstatistics_collector::collector::GenerateStatisticMessage(
133  node_name_,
134  collector->GetMetricName(),
135  collector->GetMetricUnit(),
136  window_start_,
137  window_end,
138  collected_stats);
139  msgs.push_back(message);
140  }
141  }
142 
143  for (auto & msg : msgs) {
144  publisher_->publish(msg);
145  }
146  window_start_ = window_end;
147  }
148 
149 protected:
151 
156  std::vector<StatisticData> get_current_collector_data() const
157  {
158  std::vector<StatisticData> data;
159  std::lock_guard<std::mutex> lock(mutex_);
160  for (const auto & collector : subscriber_statistics_collectors_) {
161  data.push_back(collector->GetStatisticsResults());
162  }
163  return data;
164  }
165 
166 private:
168 
171  void bring_up()
172  {
173  auto received_message_age = std::make_unique<ReceivedMessageAge>();
174  received_message_age->Start();
175  subscriber_statistics_collectors_.emplace_back(std::move(received_message_age));
176 
177  auto received_message_period = std::make_unique<ReceivedMessagePeriod>();
178  received_message_period->Start();
179  {
180  std::lock_guard<std::mutex> lock(mutex_);
181  subscriber_statistics_collectors_.emplace_back(std::move(received_message_period));
182  }
183 
184  window_start_ = rclcpp::Time(get_current_nanoseconds_since_epoch());
185  }
186 
188 
191  void tear_down()
192  {
193  {
194  std::lock_guard<std::mutex> lock(mutex_);
195  for (auto & collector : subscriber_statistics_collectors_) {
196  collector->Stop();
197  }
198 
199  subscriber_statistics_collectors_.clear();
200  }
201 
202  if (publisher_timer_) {
203  publisher_timer_->cancel();
204  publisher_timer_.reset();
205  }
206 
207  publisher_.reset();
208  }
209 
211 
214  int64_t get_current_nanoseconds_since_epoch() const
215  {
216  const auto now = std::chrono::system_clock::now();
217  return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count();
218  }
219 
221  mutable std::mutex mutex_;
223  std::vector<std::unique_ptr<TopicStatsCollector>> subscriber_statistics_collectors_{};
225  const std::string node_name_;
229  rclcpp::TimerBase::SharedPtr publisher_timer_;
231  rclcpp::Time window_start_;
232 };
233 } // namespace topic_statistics
234 } // namespace rclcpp
235 
236 #endif // RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:81
std::enable_if_t< rosidl_generator_traits::is_message< T >::value &&std::is_same< T, ROSMessageType >::value > publish(std::unique_ptr< T, ROSMessageTypeDeleter > msg)
Publish a message on the topic.
Definition: publisher.hpp:242
RCLCPP_PUBLIC rcl_time_point_value_t nanoseconds() const
Get the nanoseconds since epoch.
Definition: time.cpp:212
virtual void handle_message(const rmw_message_info_t &message_info, const rclcpp::Time now_nanoseconds) const
Handle a message received by the subscription to collect statistics.
virtual void publish_message_and_reset_measurements()
Publish a populated MetricsStatisticsMessage.
SubscriptionTopicStatistics(const std::string &node_name, rclcpp::Publisher< statistics_msgs::msg::MetricsMessage >::SharedPtr publisher)
Construct a SubscriptionTopicStatistics object.
void set_publisher_timer(rclcpp::TimerBase::SharedPtr publisher_timer)
Set the timer used to publish statistics messages.
std::vector< StatisticData > get_current_collector_data() const
Return a vector of all the currently collected data.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.