Add ability to handle data from multiple streams in RateAccCounter.
BUG=webrtc:5283
Review-Url: https://codereview.webrtc.org/2235223002
Cr-Commit-Position: refs/heads/master@{#14864}
diff --git a/webrtc/video/stats_counter.cc b/webrtc/video/stats_counter.cc
index f1b82e8..9923ed2 100644
--- a/webrtc/video/stats_counter.cc
+++ b/webrtc/video/stats_counter.cc
@@ -11,6 +11,8 @@
#include "webrtc/video/stats_counter.h"
#include <algorithm>
+#include <limits>
+#include <map>
#include "webrtc/base/checks.h"
#include "webrtc/system_wrappers/include/clock.h"
@@ -20,6 +22,7 @@
namespace {
// Default periodic time interval for processing samples.
const int64_t kDefaultProcessIntervalMs = 2000;
+const uint32_t kStreamId0 = 0;
} // namespace
// Class holding periodically computed metrics.
@@ -62,19 +65,98 @@
AggregatedStats stats_;
};
+// Class holding gathered samples within a process interval.
+class Samples {
+ public:
+ Samples() : total_count_(0) {}
+ ~Samples() {}
+
+ void Add(int sample, uint32_t stream_id) {
+ samples_[stream_id].Add(sample);
+ ++total_count_;
+ }
+ void Set(int sample, uint32_t stream_id) {
+ samples_[stream_id].Set(sample);
+ ++total_count_;
+ }
+
+ int64_t Count() const { return total_count_; }
+ bool Empty() const { return total_count_ == 0; }
+
+ int64_t Sum() const {
+ int64_t sum = 0;
+ for (const auto& it : samples_)
+ sum += it.second.sum_;
+ return sum;
+ }
+
+ int Max() const {
+ int max = std::numeric_limits<int>::min();
+ for (const auto& it : samples_)
+ max = std::max(it.second.max_, max);
+ return max;
+ }
+
+ void Reset() {
+ for (auto& it : samples_)
+ it.second.Reset();
+ total_count_ = 0;
+ }
+
+ int64_t Diff() const {
+ int64_t sum_diff = 0;
+ int count = 0;
+ for (const auto& it : samples_) {
+ if (it.second.count_ > 0) {
+ int64_t diff = it.second.sum_ - it.second.last_sum_;
+ if (diff >= 0) {
+ sum_diff += diff;
+ ++count;
+ }
+ }
+ }
+ return (count > 0) ? sum_diff : -1;
+ }
+
+ private:
+ struct Stats {
+ void Add(int sample) {
+ sum_ += sample;
+ ++count_;
+ max_ = std::max(sample, max_);
+ }
+ void Set(int sample) {
+ sum_ = sample;
+ ++count_;
+ }
+ void Reset() {
+ if (count_ > 0)
+ last_sum_ = sum_;
+ sum_ = 0;
+ count_ = 0;
+ max_ = std::numeric_limits<int>::min();
+ }
+
+ int max_ = std::numeric_limits<int>::min();
+ int64_t count_ = 0;
+ int64_t sum_ = 0;
+ int64_t last_sum_ = 0;
+ };
+
+ int64_t total_count_;
+ std::map<uint32_t, Stats> samples_; // Gathered samples mapped by stream id.
+};
+
// StatsCounter class.
StatsCounter::StatsCounter(Clock* clock,
int64_t process_intervals_ms,
bool include_empty_intervals,
StatsCounterObserver* observer)
- : max_(0),
- sum_(0),
- num_samples_(0),
- last_sum_(0),
- aggregated_counter_(new AggregatedCounter()),
+ : include_empty_intervals_(include_empty_intervals),
process_intervals_ms_(process_intervals_ms),
+ aggregated_counter_(new AggregatedCounter()),
+ samples_(new Samples()),
clock_(clock),
- include_empty_intervals_(include_empty_intervals),
observer_(observer),
last_process_time_ms_(-1),
paused_(false) {
@@ -120,21 +202,15 @@
return true;
}
-void StatsCounter::Set(int sample) {
+void StatsCounter::Set(int sample, uint32_t stream_id) {
TryProcess();
- ++num_samples_;
- sum_ = sample;
+ samples_->Set(sample, stream_id);
paused_ = false;
}
void StatsCounter::Add(int sample) {
TryProcess();
- ++num_samples_;
- sum_ += sample;
-
- if (num_samples_ == 1)
- max_ = sample;
- max_ = std::max(sample, max_);
+ samples_->Add(sample, kStreamId0);
paused_ = false;
}
@@ -164,17 +240,13 @@
// If there are no samples, all elapsed intervals are empty (otherwise one
// interval contains sample(s), discard this interval).
int empty_intervals =
- (num_samples_ == 0) ? elapsed_intervals : (elapsed_intervals - 1);
+ samples_->Empty() ? elapsed_intervals : (elapsed_intervals - 1);
ReportMetricToAggregatedCounter(GetValueForEmptyInterval(),
empty_intervals);
}
// Reset samples for elapsed interval.
- if (num_samples_ > 0)
- last_sum_ = sum_;
- sum_ = 0;
- max_ = 0;
- num_samples_ = 0;
+ samples_->Reset();
}
bool StatsCounter::IncludeEmptyIntervals() const {
@@ -195,9 +267,11 @@
}
bool AvgCounter::GetMetric(int* metric) const {
- if (num_samples_ == 0)
+ int64_t count = samples_->Count();
+ if (count == 0)
return false;
- *metric = (sum_ + num_samples_ / 2) / num_samples_;
+
+ *metric = (samples_->Sum() + count / 2) / count;
return true;
}
@@ -218,9 +292,10 @@
}
bool MaxCounter::GetMetric(int* metric) const {
- if (num_samples_ == 0)
+ if (samples_->Empty())
return false;
- *metric = max_;
+
+ *metric = samples_->Max();
return true;
}
@@ -240,9 +315,11 @@
}
bool PercentCounter::GetMetric(int* metric) const {
- if (num_samples_ == 0)
+ int64_t count = samples_->Count();
+ if (count == 0)
return false;
- *metric = (sum_ * 100 + num_samples_ / 2) / num_samples_;
+
+ *metric = (samples_->Sum() * 100 + count / 2) / count;
return true;
}
@@ -262,9 +339,11 @@
}
bool PermilleCounter::GetMetric(int* metric) const {
- if (num_samples_ == 0)
+ int64_t count = samples_->Count();
+ if (count == 0)
return false;
- *metric = (sum_ * 1000 + num_samples_ / 2) / num_samples_;
+
+ *metric = (samples_->Sum() * 1000 + count / 2) / count;
return true;
}
@@ -286,9 +365,11 @@
}
bool RateCounter::GetMetric(int* metric) const {
- if (num_samples_ == 0)
+ if (samples_->Empty())
return false;
- *metric = (sum_ * 1000 + process_intervals_ms_ / 2) / process_intervals_ms_;
+
+ *metric = (samples_->Sum() * 1000 + process_intervals_ms_ / 2) /
+ process_intervals_ms_;
return true;
}
@@ -304,15 +385,16 @@
include_empty_intervals,
observer) {}
-void RateAccCounter::Set(int sample) {
- StatsCounter::Set(sample);
+void RateAccCounter::Set(int sample, uint32_t stream_id) {
+ StatsCounter::Set(sample, stream_id);
}
bool RateAccCounter::GetMetric(int* metric) const {
- if (num_samples_ == 0 || last_sum_ > sum_)
+ int64_t diff = samples_->Diff();
+ if (diff < 0 || (!include_empty_intervals_ && diff == 0))
return false;
- *metric = ((sum_ - last_sum_) * 1000 + process_intervals_ms_ / 2) /
- process_intervals_ms_;
+
+ *metric = (diff * 1000 + process_intervals_ms_ / 2) / process_intervals_ms_;
return true;
}
diff --git a/webrtc/video/stats_counter.h b/webrtc/video/stats_counter.h
index c6e6151..08663f8 100644
--- a/webrtc/video/stats_counter.h
+++ b/webrtc/video/stats_counter.h
@@ -20,6 +20,7 @@
class AggregatedCounter;
class Clock;
+class Samples;
// |StatsCounterObserver| is called periodically when a metric is updated.
class StatsCounterObserver {
@@ -104,15 +105,12 @@
StatsCounterObserver* observer);
void Add(int sample);
- void Set(int sample);
+ void Set(int sample, uint32_t stream_id);
- int max_;
- int64_t sum_;
- int64_t num_samples_;
- int64_t last_sum_;
-
- const std::unique_ptr<AggregatedCounter> aggregated_counter_;
+ const bool include_empty_intervals_;
const int64_t process_intervals_ms_;
+ const std::unique_ptr<AggregatedCounter> aggregated_counter_;
+ const std::unique_ptr<Samples> samples_;
private:
bool TimeToProcess(int* num_elapsed_intervals);
@@ -121,7 +119,6 @@
bool IncludeEmptyIntervals() const;
Clock* const clock_;
- const bool include_empty_intervals_;
const std::unique_ptr<StatsCounterObserver> observer_;
int64_t last_process_time_ms_;
bool paused_;
@@ -262,7 +259,7 @@
bool include_empty_intervals);
~RateAccCounter() override {}
- void Set(int sample);
+ void Set(int sample, uint32_t stream_id);
private:
bool GetMetric(int* metric) const override;
diff --git a/webrtc/video/stats_counter_unittest.cc b/webrtc/video/stats_counter_unittest.cc
index 3c9633a..7484c18 100644
--- a/webrtc/video/stats_counter_unittest.cc
+++ b/webrtc/video/stats_counter_unittest.cc
@@ -16,6 +16,7 @@
namespace webrtc {
namespace {
const int kDefaultProcessIntervalMs = 2000;
+const uint32_t kStreamId = 123456;
class StatsCounterObserverImpl : public StatsCounterObserver {
public:
@@ -42,7 +43,7 @@
void SetSampleAndAdvance(int sample,
int interval_ms,
RateAccCounter* counter) {
- counter->Set(sample);
+ counter->Set(sample, kStreamId);
clock_.AdvanceTimeMilliseconds(interval_ms);
}
@@ -197,11 +198,11 @@
TEST_F(StatsCounterTest, TestMetric_RateAccCounter) {
StatsCounterObserverImpl* observer = new StatsCounterObserverImpl();
RateAccCounter counter(&clock_, observer, true);
- counter.Set(175);
- counter.Set(188);
+ counter.Set(175, kStreamId);
+ counter.Set(188, kStreamId);
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
// Trigger process (sample included in next interval).
- counter.Set(192);
+ counter.Set(192, kStreamId);
// Rate per interval: (188 - 0) / 2 sec = 94 samples/sec
EXPECT_EQ(1, observer->num_calls_);
EXPECT_EQ(94, observer->last_sample_);
@@ -212,6 +213,37 @@
EXPECT_EQ(94, stats.max);
}
+TEST_F(StatsCounterTest, TestMetric_RateAccCounterWithMultipleStreamIds) {
+ StatsCounterObserverImpl* observer = new StatsCounterObserverImpl();
+ RateAccCounter counter(&clock_, observer, true);
+ counter.Set(175, kStreamId);
+ counter.Set(188, kStreamId);
+ counter.Set(100, kStreamId + 1);
+ clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
+ // Trigger process (sample included in next interval).
+ counter.Set(150, kStreamId + 1);
+ // Rate per interval: ((188 - 0) + (100 - 0)) / 2 sec = 144 samples/sec
+ EXPECT_EQ(1, observer->num_calls_);
+ EXPECT_EQ(144, observer->last_sample_);
+ clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
+ // Trigger process (sample included in next interval).
+ counter.Set(198, kStreamId);
+ // Rate per interval: (0 + (150 - 100)) / 2 sec = 25 samples/sec
+ EXPECT_EQ(2, observer->num_calls_);
+ EXPECT_EQ(25, observer->last_sample_);
+ clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
+ // Trigger process (sample included in next interval).
+ counter.Set(200, kStreamId);
+ // Rate per interval: ((198 - 188) + (0)) / 2 sec = 5 samples/sec
+ EXPECT_EQ(3, observer->num_calls_);
+ EXPECT_EQ(5, observer->last_sample_);
+ // Aggregated stats.
+ AggregatedStats stats = counter.GetStats();
+ EXPECT_EQ(3, stats.num_samples);
+ EXPECT_EQ(5, stats.min);
+ EXPECT_EQ(144, stats.max);
+}
+
TEST_F(StatsCounterTest, TestGetStats_MultipleIntervals) {
AvgCounter counter(&clock_, nullptr, false);
const int kSample1 = 1;
@@ -266,7 +298,7 @@
EXPECT_EQ(1, observer->num_calls_);
EXPECT_EQ(100, observer->last_sample_);
// Trigger process (sample included in next interval).
- counter.Set(2000);
+ counter.Set(2000, kStreamId);
EXPECT_EQ(2, observer->num_calls_);
EXPECT_EQ(300, observer->last_sample_);
// Aggregated stats.
@@ -386,7 +418,7 @@
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs);
VerifyStatsIsNotSet(counter.ProcessAndGetStats());
// Add sample and advance 3 intervals (2 w/o samples -> zero reported).
- counter.Set(12);
+ counter.Set(12, kStreamId);
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs * 4 - 1);
// Trigger process and verify stats: [0:2][6:1]
counter.ProcessAndGetStats();
@@ -399,7 +431,7 @@
EXPECT_EQ(0, observer->last_sample_);
// Insert sample and advance non-complete interval, no change, [0:3][6:1]
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs - 1);
- counter.Set(60);
+ counter.Set(60, kStreamId);
EXPECT_EQ(4, observer->num_calls_);
// Make next interval pass, [0:3][6:1][24:1]
clock_.AdvanceTimeMilliseconds(1);
@@ -415,7 +447,7 @@
StatsCounterObserverImpl* observer = new StatsCounterObserverImpl();
RateAccCounter counter(&clock_, observer, false);
// Add sample and advance 3 intervals (2 w/o samples -> ignored).
- counter.Set(12);
+ counter.Set(12, kStreamId);
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs * 4 - 1);
// Trigger process and verify stats: [6:1]
counter.ProcessAndGetStats();
@@ -427,7 +459,7 @@
EXPECT_EQ(1, observer->num_calls_);
// Insert sample and advance non-complete interval, no change, [6:1]
clock_.AdvanceTimeMilliseconds(kDefaultProcessIntervalMs - 1);
- counter.Set(60);
+ counter.Set(60, kStreamId);
counter.ProcessAndGetStats();
EXPECT_EQ(1, observer->num_calls_);
// Make next interval pass, [6:1][24:1]