Replaces ring buffer in RateStatistics with deque.
Since RateStatistics is in practice always used with increasing
timestamps, and is often sparesely populated, replace the pre-allocated
ring buffer with a simple deque where each element tracks which time it
represents.
Bug: webrtc:11600
Change-Id: I866d7cfa607228c35452f0f19575825d2e694f75
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175906
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31344}
diff --git a/rtc_base/rate_statistics.cc b/rtc_base/rate_statistics.cc
index c4c2e78..85621fa 100644
--- a/rtc_base/rate_statistics.cc
+++ b/rtc_base/rate_statistics.cc
@@ -20,29 +20,26 @@
namespace webrtc {
+RateStatistics::Bucket::Bucket(int64_t timestamp)
+ : sum(0), num_samples(0), timestamp(timestamp) {}
+
RateStatistics::RateStatistics(int64_t window_size_ms, float scale)
- : buckets_(new Bucket[window_size_ms]()),
- accumulated_count_(0),
+ : accumulated_count_(0),
+ first_timestamp_(-1),
num_samples_(0),
- oldest_time_(-window_size_ms),
- oldest_index_(0),
scale_(scale),
max_window_size_ms_(window_size_ms),
current_window_size_ms_(max_window_size_ms_) {}
RateStatistics::RateStatistics(const RateStatistics& other)
- : accumulated_count_(other.accumulated_count_),
+ : buckets_(other.buckets_),
+ accumulated_count_(other.accumulated_count_),
+ first_timestamp_(other.first_timestamp_),
overflow_(other.overflow_),
num_samples_(other.num_samples_),
- oldest_time_(other.oldest_time_),
- oldest_index_(other.oldest_index_),
scale_(other.scale_),
max_window_size_ms_(other.max_window_size_ms_),
- current_window_size_ms_(other.current_window_size_ms_) {
- buckets_ = std::make_unique<Bucket[]>(other.max_window_size_ms_);
- std::copy(other.buckets_.get(),
- other.buckets_.get() + other.max_window_size_ms_, buckets_.get());
-}
+ current_window_size_ms_(other.current_window_size_ms_) {}
RateStatistics::RateStatistics(RateStatistics&& other) = default;
@@ -52,33 +49,33 @@
accumulated_count_ = 0;
overflow_ = false;
num_samples_ = 0;
- oldest_time_ = -max_window_size_ms_;
- oldest_index_ = 0;
+ first_timestamp_ = -1;
current_window_size_ms_ = max_window_size_ms_;
- for (int64_t i = 0; i < max_window_size_ms_; i++)
- buckets_[i] = Bucket();
+ buckets_.clear();
}
void RateStatistics::Update(int64_t count, int64_t now_ms) {
- RTC_DCHECK_LE(0, count);
- if (now_ms < oldest_time_) {
- // Too old data is ignored.
- return;
- }
+ RTC_DCHECK_GE(count, 0);
EraseOld(now_ms);
+ if (first_timestamp_ == -1) {
+ first_timestamp_ = now_ms;
+ }
- // First ever sample, reset window to start now.
- if (!IsInitialized())
- oldest_time_ = now_ms;
+ if (buckets_.empty() || now_ms != buckets_.back().timestamp) {
+ if (!buckets_.empty() && now_ms < buckets_.back().timestamp) {
+ RTC_LOG(LS_WARNING) << "Timestamp " << now_ms
+ << " is before the last added "
+ "timestamp in the rate window: "
+ << buckets_.back().timestamp << ", aligning to that.";
+ now_ms = buckets_.back().timestamp;
+ }
+ buckets_.emplace_back(now_ms);
+ }
+ Bucket& last_bucket = buckets_.back();
+ last_bucket.sum += count;
+ ++last_bucket.num_samples;
- uint32_t now_offset = rtc::dchecked_cast<uint32_t>(now_ms - oldest_time_);
- RTC_DCHECK_LT(now_offset, max_window_size_ms_);
- uint32_t index = oldest_index_ + now_offset;
- if (index >= max_window_size_ms_)
- index -= max_window_size_ms_;
- buckets_[index].sum += count;
- ++buckets_[index].samples;
if (std::numeric_limits<int64_t>::max() - accumulated_count_ > count) {
accumulated_count_ += count;
} else {
@@ -92,10 +89,22 @@
// of the members as mutable...
const_cast<RateStatistics*>(this)->EraseOld(now_ms);
+ int active_window_size = 0;
+ if (first_timestamp_ != -1) {
+ if (first_timestamp_ <= now_ms - current_window_size_ms_) {
+ // Count window as full even if no data points currently in view, if the
+ // data stream started before the window.
+ active_window_size = current_window_size_ms_;
+ } else {
+ // Size of a single bucket is 1ms, so even if now_ms == first_timestmap_
+ // the window size should be 1.
+ active_window_size = now_ms - first_timestamp_ + 1;
+ }
+ }
+
// If window is a single bucket or there is only one sample in a data set that
// has not grown to the full window size, or if the accumulator has
// overflowed, treat this as rate unavailable.
- int active_window_size = now_ms - oldest_time_ + 1;
if (num_samples_ == 0 || active_window_size <= 1 ||
(num_samples_ <= 1 &&
rtc::SafeLt(active_window_size, current_window_size_ms_)) ||
@@ -114,43 +123,35 @@
}
void RateStatistics::EraseOld(int64_t now_ms) {
- if (!IsInitialized())
- return;
-
// New oldest time that is included in data set.
- int64_t new_oldest_time = now_ms - current_window_size_ms_ + 1;
-
- // New oldest time is older than the current one, no need to cull data.
- if (new_oldest_time <= oldest_time_)
- return;
+ const int64_t new_oldest_time = now_ms - current_window_size_ms_ + 1;
// Loop over buckets and remove too old data points.
- while (num_samples_ > 0 && oldest_time_ < new_oldest_time) {
- const Bucket& oldest_bucket = buckets_[oldest_index_];
+ while (!buckets_.empty() && buckets_.front().timestamp < new_oldest_time) {
+ const Bucket& oldest_bucket = buckets_.front();
RTC_DCHECK_GE(accumulated_count_, oldest_bucket.sum);
- RTC_DCHECK_GE(num_samples_, oldest_bucket.samples);
+ RTC_DCHECK_GE(num_samples_, oldest_bucket.num_samples);
accumulated_count_ -= oldest_bucket.sum;
- num_samples_ -= oldest_bucket.samples;
- buckets_[oldest_index_] = Bucket();
- if (++oldest_index_ >= max_window_size_ms_)
- oldest_index_ = 0;
- ++oldest_time_;
+ num_samples_ -= oldest_bucket.num_samples;
+ buckets_.pop_front();
// This does not clear overflow_ even when counter is empty.
// TODO(https://bugs.webrtc.org/11247): Consider if overflow_ can be reset.
}
- oldest_time_ = new_oldest_time;
}
bool RateStatistics::SetWindowSize(int64_t window_size_ms, int64_t now_ms) {
if (window_size_ms <= 0 || window_size_ms > max_window_size_ms_)
return false;
+ if (first_timestamp_ != -1) {
+ // If the window changes (e.g. decreases - removing data point, then
+ // increases again) we need to update the first timestamp mark as
+ // otherwise it indicates the window coveres a region of zeros, suddenly
+ // under-estimating the rate.
+ first_timestamp_ = std::max(first_timestamp_, now_ms - window_size_ms + 1);
+ }
current_window_size_ms_ = window_size_ms;
EraseOld(now_ms);
return true;
}
-bool RateStatistics::IsInitialized() const {
- return oldest_time_ != -max_window_size_ms_;
-}
-
} // namespace webrtc
diff --git a/rtc_base/rate_statistics.h b/rtc_base/rate_statistics.h
index 11c8cee..dc8d7f5 100644
--- a/rtc_base/rate_statistics.h
+++ b/rtc_base/rate_statistics.h
@@ -14,6 +14,7 @@
#include <stddef.h>
#include <stdint.h>
+#include <deque>
#include <memory>
#include "absl/types/optional.h"
@@ -28,6 +29,10 @@
// high; for instance, a 20 Mbit/sec video stream can wrap a 32-bit byte
// counter in 14 minutes.
+// Note that timestamps used in Update(), Rate() and SetWindowSize() must never
+// decrease for two consecutive calls.
+// TODO(bugs.webrtc.org/11600): Migrate from int64_t to Timestamp.
+
class RTC_EXPORT RateStatistics {
public:
static constexpr float kBpsScale = 8000.0f;
@@ -65,19 +70,22 @@
private:
void EraseOld(int64_t now_ms);
- bool IsInitialized() const;
- // Counters are kept in buckets (circular buffer), with one bucket
- // per millisecond.
struct Bucket {
+ explicit Bucket(int64_t timestamp);
int64_t sum; // Sum of all samples in this bucket.
- int samples; // Number of samples in this bucket.
+ int num_samples; // Number of samples in this bucket.
+ const int64_t timestamp; // Timestamp this bucket corresponds to.
};
- std::unique_ptr<Bucket[]> buckets_;
+ // All buckets within the time window, ordered by time.
+ std::deque<Bucket> buckets_;
- // Total count recorded in buckets.
+ // Total count recorded in all buckets.
int64_t accumulated_count_;
+ // Timestamp of the first data point seen, or -1 of none seen.
+ int64_t first_timestamp_;
+
// True if accumulated_count_ has ever grown too large to be
// contained in its integer type.
bool overflow_ = false;
@@ -85,12 +93,6 @@
// The total number of samples in the buckets.
int num_samples_;
- // Oldest time recorded in buckets.
- int64_t oldest_time_;
-
- // Bucket index of oldest counter recorded in buckets.
- int64_t oldest_index_;
-
// To convert counts/ms to desired units
const float scale_;