Add ability to handle data from multiple streams in RateAccCounter.

BUG=webrtc:5283

Review-Url: https://codereview.webrtc.org/2235223002
Cr-Commit-Position: refs/heads/master@{#14864}
diff --git a/webrtc/video/stats_counter.cc b/webrtc/video/stats_counter.cc
index f1b82e8..9923ed2 100644
--- a/webrtc/video/stats_counter.cc
+++ b/webrtc/video/stats_counter.cc
@@ -11,6 +11,8 @@
 #include "webrtc/video/stats_counter.h"
 
 #include <algorithm>
+#include <limits>
+#include <map>
 
 #include "webrtc/base/checks.h"
 #include "webrtc/system_wrappers/include/clock.h"
@@ -20,6 +22,7 @@
 namespace {
 // Default periodic time interval for processing samples.
 const int64_t kDefaultProcessIntervalMs = 2000;
+const uint32_t kStreamId0 = 0;
 }  // namespace
 
 // Class holding periodically computed metrics.
@@ -62,19 +65,98 @@
   AggregatedStats stats_;
 };
 
+// Class holding gathered samples within a process interval.
+class Samples {
+ public:
+  Samples() : total_count_(0) {}
+  ~Samples() {}
+
+  void Add(int sample, uint32_t stream_id) {
+    samples_[stream_id].Add(sample);
+    ++total_count_;
+  }
+  void Set(int sample, uint32_t stream_id) {
+    samples_[stream_id].Set(sample);
+    ++total_count_;
+  }
+
+  int64_t Count() const { return total_count_; }
+  bool Empty() const { return total_count_ == 0; }
+
+  int64_t Sum() const {
+    int64_t sum = 0;
+    for (const auto& it : samples_)
+      sum += it.second.sum_;
+    return sum;
+  }
+
+  int Max() const {
+    int max = std::numeric_limits<int>::min();
+    for (const auto& it : samples_)
+      max = std::max(it.second.max_, max);
+    return max;
+  }
+
+  void Reset() {
+    for (auto& it : samples_)
+      it.second.Reset();
+    total_count_ = 0;
+  }
+
+  int64_t Diff() const {
+    int64_t sum_diff = 0;
+    int count = 0;
+    for (const auto& it : samples_) {
+      if (it.second.count_ > 0) {
+        int64_t diff = it.second.sum_ - it.second.last_sum_;
+        if (diff >= 0) {
+          sum_diff += diff;
+          ++count;
+        }
+      }
+    }
+    return (count > 0) ? sum_diff : -1;
+  }
+
+ private:
+  struct Stats {
+    void Add(int sample) {
+      sum_ += sample;
+      ++count_;
+      max_ = std::max(sample, max_);
+    }
+    void Set(int sample) {
+      sum_ = sample;
+      ++count_;
+    }
+    void Reset() {
+      if (count_ > 0)
+        last_sum_ = sum_;
+      sum_ = 0;
+      count_ = 0;
+      max_ = std::numeric_limits<int>::min();
+    }
+
+    int max_ = std::numeric_limits<int>::min();
+    int64_t count_ = 0;
+    int64_t sum_ = 0;
+    int64_t last_sum_ = 0;
+  };
+
+  int64_t total_count_;
+  std::map<uint32_t, Stats> samples_;  // Gathered samples mapped by stream id.
+};
+
 // StatsCounter class.
 StatsCounter::StatsCounter(Clock* clock,
                            int64_t process_intervals_ms,
                            bool include_empty_intervals,
                            StatsCounterObserver* observer)
-    : max_(0),
-      sum_(0),
-      num_samples_(0),
-      last_sum_(0),
-      aggregated_counter_(new AggregatedCounter()),
+    : include_empty_intervals_(include_empty_intervals),
       process_intervals_ms_(process_intervals_ms),
+      aggregated_counter_(new AggregatedCounter()),
+      samples_(new Samples()),
       clock_(clock),
-      include_empty_intervals_(include_empty_intervals),
       observer_(observer),
       last_process_time_ms_(-1),
       paused_(false) {
@@ -120,21 +202,15 @@
   return true;
 }
 
-void StatsCounter::Set(int sample) {
+void StatsCounter::Set(int sample, uint32_t stream_id) {
   TryProcess();
-  ++num_samples_;
-  sum_ = sample;
+  samples_->Set(sample, stream_id);
   paused_ = false;
 }
 
 void StatsCounter::Add(int sample) {
   TryProcess();
-  ++num_samples_;
-  sum_ += sample;
-
-  if (num_samples_ == 1)
-    max_ = sample;
-  max_ = std::max(sample, max_);
+  samples_->Add(sample, kStreamId0);
   paused_ = false;
 }
 
@@ -164,17 +240,13 @@
     // If there are no samples, all elapsed intervals are empty (otherwise one
     // interval contains sample(s), discard this interval).
     int empty_intervals =
-        (num_samples_ == 0) ? elapsed_intervals : (elapsed_intervals - 1);
+        samples_->Empty() ? elapsed_intervals : (elapsed_intervals - 1);
     ReportMetricToAggregatedCounter(GetValueForEmptyInterval(),
                                     empty_intervals);
   }
 
   // Reset samples for elapsed interval.
-  if (num_samples_ > 0)
-    last_sum_ = sum_;
-  sum_ = 0;
-  max_ = 0;
-  num_samples_ = 0;
+  samples_->Reset();
 }
 
 bool StatsCounter::IncludeEmptyIntervals() const {
@@ -195,9 +267,11 @@
 }
 
 bool AvgCounter::GetMetric(int* metric) const {
-  if (num_samples_ == 0)
+  int64_t count = samples_->Count();
+  if (count == 0)
     return false;
-  *metric = (sum_ + num_samples_ / 2) / num_samples_;
+
+  *metric = (samples_->Sum() + count / 2) / count;
   return true;
 }
 
@@ -218,9 +292,10 @@
 }
 
 bool MaxCounter::GetMetric(int* metric) const {
-  if (num_samples_ == 0)
+  if (samples_->Empty())
     return false;
-  *metric = max_;
+
+  *metric = samples_->Max();
   return true;
 }
 
@@ -240,9 +315,11 @@
 }
 
 bool PercentCounter::GetMetric(int* metric) const {
-  if (num_samples_ == 0)
+  int64_t count = samples_->Count();
+  if (count == 0)
     return false;
-  *metric = (sum_ * 100 + num_samples_ / 2) / num_samples_;
+
+  *metric = (samples_->Sum() * 100 + count / 2) / count;
   return true;
 }
 
@@ -262,9 +339,11 @@
 }
 
 bool PermilleCounter::GetMetric(int* metric) const {
-  if (num_samples_ == 0)
+  int64_t count = samples_->Count();
+  if (count == 0)
     return false;
-  *metric = (sum_ * 1000 + num_samples_ / 2) / num_samples_;
+
+  *metric = (samples_->Sum() * 1000 + count / 2) / count;
   return true;
 }
 
@@ -286,9 +365,11 @@
 }
 
 bool RateCounter::GetMetric(int* metric) const {
-  if (num_samples_ == 0)
+  if (samples_->Empty())
     return false;
-  *metric = (sum_ * 1000 + process_intervals_ms_ / 2) / process_intervals_ms_;
+
+  *metric = (samples_->Sum() * 1000 + process_intervals_ms_ / 2) /
+            process_intervals_ms_;
   return true;
 }
 
@@ -304,15 +385,16 @@
                    include_empty_intervals,
                    observer) {}
 
-void RateAccCounter::Set(int sample) {
-  StatsCounter::Set(sample);
+void RateAccCounter::Set(int sample, uint32_t stream_id) {
+  StatsCounter::Set(sample, stream_id);
 }
 
 bool RateAccCounter::GetMetric(int* metric) const {
-  if (num_samples_ == 0 || last_sum_ > sum_)
+  int64_t diff = samples_->Diff();
+  if (diff < 0 || (!include_empty_intervals_ && diff == 0))
     return false;
-  *metric = ((sum_ - last_sum_) * 1000 + process_intervals_ms_ / 2) /
-            process_intervals_ms_;
+
+  *metric = (diff * 1000 + process_intervals_ms_ / 2) / process_intervals_ms_;
   return true;
 }
 
diff --git a/webrtc/video/stats_counter.h b/webrtc/video/stats_counter.h
index c6e6151..08663f8 100644
--- a/webrtc/video/stats_counter.h
+++ b/webrtc/video/stats_counter.h
@@ -20,6 +20,7 @@
 
 class AggregatedCounter;
 class Clock;
+class Samples;
 
 // |StatsCounterObserver| is called periodically when a metric is updated.
 class StatsCounterObserver {
@@ -104,15 +105,12 @@
                StatsCounterObserver* observer);
 
   void Add(int sample);
-  void Set(int sample);
+  void Set(int sample, uint32_t stream_id);
 
-  int max_;
-  int64_t sum_;
-  int64_t num_samples_;
-  int64_t last_sum_;
-
-  const std::unique_ptr<AggregatedCounter> aggregated_counter_;
+  const bool include_empty_intervals_;
   const int64_t process_intervals_ms_;
+  const std::unique_ptr<AggregatedCounter> aggregated_counter_;
+  const std::unique_ptr<Samples> samples_;
 
  private:
   bool TimeToProcess(int* num_elapsed_intervals);
@@ -121,7 +119,6 @@
   bool IncludeEmptyIntervals() const;
 
   Clock* const clock_;
-  const bool include_empty_intervals_;
   const std::unique_ptr<StatsCounterObserver> observer_;
   int64_t last_process_time_ms_;
   bool paused_;
@@ -262,7 +259,7 @@
                  bool include_empty_intervals);
   ~RateAccCounter() override {}
 
-  void Set(int sample);
+  void Set(int sample, uint32_t stream_id);
 
  private:
   bool GetMetric(int* metric) const override;
diff --git a/webrtc/video/stats_counter_unittest.cc b/webrtc/video/stats_counter_unittest.cc
index 3c9633a..7484c18 100644
--- a/webrtc/video/stats_counter_unittest.cc
+++ b/webrtc/video/stats_counter_unittest.cc
@@ -16,6 +16,7 @@
 namespace webrtc {
 namespace {
 const int kDefaultProcessIntervalMs = 2000;
+const uint32_t kStreamId = 123456;
 
 class StatsCounterObserverImpl : public StatsCounterObserver {
  public:
@@ -42,7 +43,7 @@
   void SetSampleAndAdvance(int sample,
                            int interval_ms,
                            RateAccCounter* counter) {
-    counter->Set(sample);
+    counter->Set(sample, kStreamId);
     clock_.AdvanceTimeMilliseconds(interval_ms);
   }
 
@@ -197,11 +198,11 @@
 TEST_F(StatsCounterTest, TestMetric_RateAccCounter) {
   StatsCounterObserverImpl* observer = new StatsCounterObserverImpl();
   RateAccCounter counter(&clock_, observer, true);
-  counter.Set(175);
-  counter.Set(188);
+  counter.Set(175, kStreamId);
+  counter.Set(188, kStreamId);
   clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
   // Trigger process (sample included in next interval).
-  counter.Set(192);
+  counter.Set(192, kStreamId);
   // Rate per interval: (188 - 0) / 2 sec = 94 samples/sec
   EXPECT_EQ(1, observer->num_calls_);
   EXPECT_EQ(94, observer->last_sample_);
@@ -212,6 +213,37 @@
   EXPECT_EQ(94, stats.max);
 }
 
+TEST_F(StatsCounterTest, TestMetric_RateAccCounterWithMultipleStreamIds) {
+  StatsCounterObserverImpl* observer = new StatsCounterObserverImpl();
+  RateAccCounter counter(&clock_, observer, true);
+  counter.Set(175, kStreamId);
+  counter.Set(188, kStreamId);
+  counter.Set(100, kStreamId + 1);
+  clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
+  // Trigger process (sample included in next interval).
+  counter.Set(150, kStreamId + 1);
+  // Rate per interval: ((188 - 0) + (100 - 0)) / 2 sec = 144 samples/sec
+  EXPECT_EQ(1, observer->num_calls_);
+  EXPECT_EQ(144, observer->last_sample_);
+  clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
+  // Trigger process (sample included in next interval).
+  counter.Set(198, kStreamId);
+  // Rate per interval: (0 + (150 - 100)) / 2 sec = 25 samples/sec
+  EXPECT_EQ(2, observer->num_calls_);
+  EXPECT_EQ(25, observer->last_sample_);
+  clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
+  // Trigger process (sample included in next interval).
+  counter.Set(200, kStreamId);
+  // Rate per interval: ((198 - 188) + (0)) / 2 sec = 5 samples/sec
+  EXPECT_EQ(3, observer->num_calls_);
+  EXPECT_EQ(5, observer->last_sample_);
+  // Aggregated stats.
+  AggregatedStats stats = counter.GetStats();
+  EXPECT_EQ(3, stats.num_samples);
+  EXPECT_EQ(5, stats.min);
+  EXPECT_EQ(144, stats.max);
+}
+
 TEST_F(StatsCounterTest, TestGetStats_MultipleIntervals) {
   AvgCounter counter(&clock_, nullptr, false);
   const int kSample1 = 1;
@@ -266,7 +298,7 @@
   EXPECT_EQ(1, observer->num_calls_);
   EXPECT_EQ(100, observer->last_sample_);
   // Trigger process (sample included in next interval).
-  counter.Set(2000);
+  counter.Set(2000, kStreamId);
   EXPECT_EQ(2, observer->num_calls_);
   EXPECT_EQ(300, observer->last_sample_);
   // Aggregated stats.
@@ -386,7 +418,7 @@
   clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
   VerifyStatsIsNotSet(counter.ProcessAndGetStats());
   // Add sample and advance 3 intervals (2 w/o samples -> zero reported).
-  counter.Set(12);
+  counter.Set(12, kStreamId);
   clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs * 4 - 1);
   // Trigger process and verify stats: [0:2][6:1]
   counter.ProcessAndGetStats();
@@ -399,7 +431,7 @@
   EXPECT_EQ(0, observer->last_sample_);
   // Insert sample and advance non-complete interval, no change, [0:3][6:1]
   clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs - 1);
-  counter.Set(60);
+  counter.Set(60, kStreamId);
   EXPECT_EQ(4, observer->num_calls_);
   // Make next interval pass, [0:3][6:1][24:1]
   clock_.AdvanceTimeMilliseconds(1);
@@ -415,7 +447,7 @@
   StatsCounterObserverImpl* observer = new StatsCounterObserverImpl();
   RateAccCounter counter(&clock_, observer, false);
   // Add sample and advance 3 intervals (2 w/o samples -> ignored).
-  counter.Set(12);
+  counter.Set(12, kStreamId);
   clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs * 4 - 1);
   // Trigger process and verify stats: [6:1]
   counter.ProcessAndGetStats();
@@ -427,7 +459,7 @@
   EXPECT_EQ(1, observer->num_calls_);
   // Insert sample and advance non-complete interval, no change, [6:1]
   clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs - 1);
-  counter.Set(60);
+  counter.Set(60, kStreamId);
   counter.ProcessAndGetStats();
   EXPECT_EQ(1, observer->num_calls_);
   // Make next interval pass, [6:1][24:1]