Replaces ring buffer in RateStatistics with deque.

Since RateStatistics is in practice always used with increasing
timestamps, and is often sparesely populated, replace the pre-allocated
ring buffer with a simple deque where each element tracks which time it
represents.

Bug: webrtc:11600
Change-Id: I866d7cfa607228c35452f0f19575825d2e694f75
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175906
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31344}
diff --git a/rtc_base/rate_statistics.cc b/rtc_base/rate_statistics.cc
index c4c2e78..85621fa 100644
--- a/rtc_base/rate_statistics.cc
+++ b/rtc_base/rate_statistics.cc
@@ -20,29 +20,26 @@
 
 namespace webrtc {
 
+RateStatistics::Bucket::Bucket(int64_t timestamp)
+    : sum(0), num_samples(0), timestamp(timestamp) {}
+
 RateStatistics::RateStatistics(int64_t window_size_ms, float scale)
-    : buckets_(new Bucket[window_size_ms]()),
-      accumulated_count_(0),
+    : accumulated_count_(0),
+      first_timestamp_(-1),
       num_samples_(0),
-      oldest_time_(-window_size_ms),
-      oldest_index_(0),
       scale_(scale),
       max_window_size_ms_(window_size_ms),
       current_window_size_ms_(max_window_size_ms_) {}
 
 RateStatistics::RateStatistics(const RateStatistics& other)
-    : accumulated_count_(other.accumulated_count_),
+    : buckets_(other.buckets_),
+      accumulated_count_(other.accumulated_count_),
+      first_timestamp_(other.first_timestamp_),
       overflow_(other.overflow_),
       num_samples_(other.num_samples_),
-      oldest_time_(other.oldest_time_),
-      oldest_index_(other.oldest_index_),
       scale_(other.scale_),
       max_window_size_ms_(other.max_window_size_ms_),
-      current_window_size_ms_(other.current_window_size_ms_) {
-  buckets_ = std::make_unique<Bucket[]>(other.max_window_size_ms_);
-  std::copy(other.buckets_.get(),
-            other.buckets_.get() + other.max_window_size_ms_, buckets_.get());
-}
+      current_window_size_ms_(other.current_window_size_ms_) {}
 
 RateStatistics::RateStatistics(RateStatistics&& other) = default;
 
@@ -52,33 +49,33 @@
   accumulated_count_ = 0;
   overflow_ = false;
   num_samples_ = 0;
-  oldest_time_ = -max_window_size_ms_;
-  oldest_index_ = 0;
+  first_timestamp_ = -1;
   current_window_size_ms_ = max_window_size_ms_;
-  for (int64_t i = 0; i < max_window_size_ms_; i++)
-    buckets_[i] = Bucket();
+  buckets_.clear();
 }
 
 void RateStatistics::Update(int64_t count, int64_t now_ms) {
-  RTC_DCHECK_LE(0, count);
-  if (now_ms < oldest_time_) {
-    // Too old data is ignored.
-    return;
-  }
+  RTC_DCHECK_GE(count, 0);
 
   EraseOld(now_ms);
+  if (first_timestamp_ == -1) {
+    first_timestamp_ = now_ms;
+  }
 
-  // First ever sample, reset window to start now.
-  if (!IsInitialized())
-    oldest_time_ = now_ms;
+  if (buckets_.empty() || now_ms != buckets_.back().timestamp) {
+    if (!buckets_.empty() && now_ms < buckets_.back().timestamp) {
+      RTC_LOG(LS_WARNING) << "Timestamp " << now_ms
+                          << " is before the last added "
+                             "timestamp in the rate window: "
+                          << buckets_.back().timestamp << ", aligning to that.";
+      now_ms = buckets_.back().timestamp;
+    }
+    buckets_.emplace_back(now_ms);
+  }
+  Bucket& last_bucket = buckets_.back();
+  last_bucket.sum += count;
+  ++last_bucket.num_samples;
 
-  uint32_t now_offset = rtc::dchecked_cast<uint32_t>(now_ms - oldest_time_);
-  RTC_DCHECK_LT(now_offset, max_window_size_ms_);
-  uint32_t index = oldest_index_ + now_offset;
-  if (index >= max_window_size_ms_)
-    index -= max_window_size_ms_;
-  buckets_[index].sum += count;
-  ++buckets_[index].samples;
   if (std::numeric_limits<int64_t>::max() - accumulated_count_ > count) {
     accumulated_count_ += count;
   } else {
@@ -92,10 +89,22 @@
   // of the members as mutable...
   const_cast<RateStatistics*>(this)->EraseOld(now_ms);
 
+  int active_window_size = 0;
+  if (first_timestamp_ != -1) {
+    if (first_timestamp_ <= now_ms - current_window_size_ms_) {
+      // Count window as full even if no data points currently in view, if the
+      // data stream started before the window.
+      active_window_size = current_window_size_ms_;
+    } else {
+      // Size of a single bucket is 1ms, so even if now_ms == first_timestmap_
+      // the window size should be 1.
+      active_window_size = now_ms - first_timestamp_ + 1;
+    }
+  }
+
   // If window is a single bucket or there is only one sample in a data set that
   // has not grown to the full window size, or if the accumulator has
   // overflowed, treat this as rate unavailable.
-  int active_window_size = now_ms - oldest_time_ + 1;
   if (num_samples_ == 0 || active_window_size <= 1 ||
       (num_samples_ <= 1 &&
        rtc::SafeLt(active_window_size, current_window_size_ms_)) ||
@@ -114,43 +123,35 @@
 }
 
 void RateStatistics::EraseOld(int64_t now_ms) {
-  if (!IsInitialized())
-    return;
-
   // New oldest time that is included in data set.
-  int64_t new_oldest_time = now_ms - current_window_size_ms_ + 1;
-
-  // New oldest time is older than the current one, no need to cull data.
-  if (new_oldest_time <= oldest_time_)
-    return;
+  const int64_t new_oldest_time = now_ms - current_window_size_ms_ + 1;
 
   // Loop over buckets and remove too old data points.
-  while (num_samples_ > 0 && oldest_time_ < new_oldest_time) {
-    const Bucket& oldest_bucket = buckets_[oldest_index_];
+  while (!buckets_.empty() && buckets_.front().timestamp < new_oldest_time) {
+    const Bucket& oldest_bucket = buckets_.front();
     RTC_DCHECK_GE(accumulated_count_, oldest_bucket.sum);
-    RTC_DCHECK_GE(num_samples_, oldest_bucket.samples);
+    RTC_DCHECK_GE(num_samples_, oldest_bucket.num_samples);
     accumulated_count_ -= oldest_bucket.sum;
-    num_samples_ -= oldest_bucket.samples;
-    buckets_[oldest_index_] = Bucket();
-    if (++oldest_index_ >= max_window_size_ms_)
-      oldest_index_ = 0;
-    ++oldest_time_;
+    num_samples_ -= oldest_bucket.num_samples;
+    buckets_.pop_front();
     // This does not clear overflow_ even when counter is empty.
     // TODO(https://bugs.webrtc.org/11247): Consider if overflow_ can be reset.
   }
-  oldest_time_ = new_oldest_time;
 }
 
 bool RateStatistics::SetWindowSize(int64_t window_size_ms, int64_t now_ms) {
   if (window_size_ms <= 0 || window_size_ms > max_window_size_ms_)
     return false;
+  if (first_timestamp_ != -1) {
+    // If the window changes (e.g. decreases - removing data point, then
+    // increases again) we need to update the first timestamp mark as
+    // otherwise it indicates the window coveres a region of zeros, suddenly
+    // under-estimating the rate.
+    first_timestamp_ = std::max(first_timestamp_, now_ms - window_size_ms + 1);
+  }
   current_window_size_ms_ = window_size_ms;
   EraseOld(now_ms);
   return true;
 }
 
-bool RateStatistics::IsInitialized() const {
-  return oldest_time_ != -max_window_size_ms_;
-}
-
 }  // namespace webrtc
diff --git a/rtc_base/rate_statistics.h b/rtc_base/rate_statistics.h
index 11c8cee..dc8d7f5 100644
--- a/rtc_base/rate_statistics.h
+++ b/rtc_base/rate_statistics.h
@@ -14,6 +14,7 @@
 #include <stddef.h>
 #include <stdint.h>
 
+#include <deque>
 #include <memory>
 
 #include "absl/types/optional.h"
@@ -28,6 +29,10 @@
 // high; for instance, a 20 Mbit/sec video stream can wrap a 32-bit byte
 // counter in 14 minutes.
 
+// Note that timestamps used in Update(), Rate() and SetWindowSize() must never
+// decrease for two consecutive calls.
+// TODO(bugs.webrtc.org/11600): Migrate from int64_t to Timestamp.
+
 class RTC_EXPORT RateStatistics {
  public:
   static constexpr float kBpsScale = 8000.0f;
@@ -65,19 +70,22 @@
 
  private:
   void EraseOld(int64_t now_ms);
-  bool IsInitialized() const;
 
-  // Counters are kept in buckets (circular buffer), with one bucket
-  // per millisecond.
   struct Bucket {
+    explicit Bucket(int64_t timestamp);
     int64_t sum;  // Sum of all samples in this bucket.
-    int samples;  // Number of samples in this bucket.
+    int num_samples;          // Number of samples in this bucket.
+    const int64_t timestamp;  // Timestamp this bucket corresponds to.
   };
-  std::unique_ptr<Bucket[]> buckets_;
+  // All buckets within the time window, ordered by time.
+  std::deque<Bucket> buckets_;
 
-  // Total count recorded in buckets.
+  // Total count recorded in all buckets.
   int64_t accumulated_count_;
 
+  // Timestamp of the first data point seen, or -1 of none seen.
+  int64_t first_timestamp_;
+
   // True if accumulated_count_ has ever grown too large to be
   // contained in its integer type.
   bool overflow_ = false;
@@ -85,12 +93,6 @@
   // The total number of samples in the buckets.
   int num_samples_;
 
-  // Oldest time recorded in buckets.
-  int64_t oldest_time_;
-
-  // Bucket index of oldest counter recorded in buckets.
-  int64_t oldest_index_;
-
   // To convert counts/ms to desired units
   const float scale_;