RTCStatsCollector collecting stats on multiple threads.
Changes GetStatsReport to a callback-based function. Stats collection
is dispatched to three different stats-collecting methods, which are
invoked asynchronously on the signaling, worker and network threads. The
three resulting reports are merged into one before it is delivered.
Currently, stats are only collected on the signaling thread, but a
FakeRTCStatsCollector is able to test the multi-threaded and
stats-merging behaviors. Future CLs simply have to put their
stats-collecting code in the appropriate
ProducePartialResultsOnFooThread method.
BUG=chromium:627816
Review-Url: https://codereview.webrtc.org/2270033004
Cr-Commit-Position: refs/heads/master@{#14064}
diff --git a/webrtc/stats/rtcstatscollector.cc b/webrtc/stats/rtcstatscollector.cc
index 4bedad6..1cdafd6 100644
--- a/webrtc/stats/rtcstatscollector.cc
+++ b/webrtc/stats/rtcstatscollector.cc
@@ -20,50 +20,144 @@
namespace webrtc {
-RTCStatsCollector::RTCStatsCollector(
- PeerConnection* pc,
- int64_t cache_lifetime_us)
+rtc::scoped_refptr<RTCStatsCollector> RTCStatsCollector::Create(
+ PeerConnection* pc, int64_t cache_lifetime_us) {
+ return rtc::scoped_refptr<RTCStatsCollector>(
+ new rtc::RefCountedObject<RTCStatsCollector>(pc, cache_lifetime_us));
+}
+
+RTCStatsCollector::RTCStatsCollector(PeerConnection* pc,
+ int64_t cache_lifetime_us)
: pc_(pc),
+ signaling_thread_(pc->session()->signaling_thread()),
+ worker_thread_(pc->session()->worker_thread()),
+ network_thread_(pc->session()->network_thread()),
+ num_pending_partial_reports_(0),
+ partial_report_timestamp_us_(0),
cache_timestamp_us_(0),
cache_lifetime_us_(cache_lifetime_us) {
RTC_DCHECK(pc_);
- RTC_DCHECK(IsOnSignalingThread());
+ RTC_DCHECK(signaling_thread_);
+ RTC_DCHECK(worker_thread_);
+ RTC_DCHECK(network_thread_);
RTC_DCHECK_GE(cache_lifetime_us_, 0);
}
-rtc::scoped_refptr<const RTCStatsReport> RTCStatsCollector::GetStatsReport() {
- RTC_DCHECK(IsOnSignalingThread());
+void RTCStatsCollector::GetStatsReport(
+ rtc::scoped_refptr<RTCStatsCollectorCallback> callback) {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ RTC_DCHECK(callback);
+ callbacks_.push_back(callback);
+
// "Now" using a monotonically increasing timer.
int64_t cache_now_us = rtc::TimeMicros();
if (cached_report_ &&
cache_now_us - cache_timestamp_us_ <= cache_lifetime_us_) {
- return cached_report_;
+ // We have a fresh cached report to deliver.
+ DeliverCachedReport();
+ } else if (!num_pending_partial_reports_) {
+ // Only start gathering stats if we're not already gathering stats. In the
+ // case of already gathering stats, |callbacks_| will be invoked when there
+ // are no more pending partial reports.
+
+ // "Now" using a system clock, relative to the UNIX epoch (Jan 1, 1970,
+ // UTC), in microseconds. The system clock could be modified and is not
+ // necessarily monotonically increasing.
+ int64_t timestamp_us = static_cast<int64_t>(
+ rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec);
+
+ num_pending_partial_reports_ = 3;
+ partial_report_timestamp_us_ = cache_now_us;
+ invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_,
+ rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnSignalingThread,
+ rtc::scoped_refptr<RTCStatsCollector>(this), timestamp_us));
+ invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_,
+ rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnWorkerThread,
+ rtc::scoped_refptr<RTCStatsCollector>(this), timestamp_us));
+ invoker_.AsyncInvoke<void>(RTC_FROM_HERE, network_thread_,
+ rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnNetworkThread,
+ rtc::scoped_refptr<RTCStatsCollector>(this), timestamp_us));
}
- cache_timestamp_us_ = cache_now_us;
- // "Now" using a system clock, relative to the UNIX epoch (Jan 1, 1970, UTC),
- // in microseconds. The system clock could be modified and is not necessarily
- // monotonically increasing.
- int64_t timestamp_us = static_cast<int64_t>(
- rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec);
-
- rtc::scoped_refptr<RTCStatsReport> report = RTCStatsReport::Create();
- report->AddStats(ProducePeerConnectionStats(timestamp_us));
-
- cached_report_ = report;
- return cached_report_;
}
void RTCStatsCollector::ClearCachedStatsReport() {
- RTC_DCHECK(IsOnSignalingThread());
+ RTC_DCHECK(signaling_thread_->IsCurrent());
cached_report_ = nullptr;
}
-bool RTCStatsCollector::IsOnSignalingThread() const {
- return pc_->session()->signaling_thread()->IsCurrent();
+void RTCStatsCollector::ProducePartialResultsOnSignalingThread(
+ int64_t timestamp_us) {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ rtc::scoped_refptr<RTCStatsReport> report = RTCStatsReport::Create();
+
+ report->AddStats(ProducePeerConnectionStats_s(timestamp_us));
+
+ AddPartialResults(report);
+}
+
+void RTCStatsCollector::ProducePartialResultsOnWorkerThread(
+ int64_t timestamp_us) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ rtc::scoped_refptr<RTCStatsReport> report = RTCStatsReport::Create();
+
+ // TODO(hbos): Gather stats on worker thread.
+
+ AddPartialResults(report);
+}
+
+void RTCStatsCollector::ProducePartialResultsOnNetworkThread(
+ int64_t timestamp_us) {
+ RTC_DCHECK(network_thread_->IsCurrent());
+ rtc::scoped_refptr<RTCStatsReport> report = RTCStatsReport::Create();
+
+ // TODO(hbos): Gather stats on network thread.
+
+ AddPartialResults(report);
+}
+
+void RTCStatsCollector::AddPartialResults(
+ const rtc::scoped_refptr<RTCStatsReport>& partial_report) {
+ if (!signaling_thread_->IsCurrent()) {
+ invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_,
+ rtc::Bind(&RTCStatsCollector::AddPartialResults_s,
+ rtc::scoped_refptr<RTCStatsCollector>(this),
+ partial_report));
+ return;
+ }
+ AddPartialResults_s(partial_report);
+}
+
+void RTCStatsCollector::AddPartialResults_s(
+ rtc::scoped_refptr<RTCStatsReport> partial_report) {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ RTC_DCHECK_GT(num_pending_partial_reports_, 0);
+ if (!partial_report_)
+ partial_report_ = partial_report;
+ else
+ partial_report_->TakeMembersFrom(partial_report);
+ --num_pending_partial_reports_;
+ if (!num_pending_partial_reports_) {
+ cache_timestamp_us_ = partial_report_timestamp_us_;
+ cached_report_ = partial_report_;
+ partial_report_ = nullptr;
+ DeliverCachedReport();
+ }
+}
+
+void RTCStatsCollector::DeliverCachedReport() {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ RTC_DCHECK(!callbacks_.empty());
+ RTC_DCHECK(cached_report_);
+ for (const rtc::scoped_refptr<RTCStatsCollectorCallback>& callback :
+ callbacks_) {
+ callback->OnStatsDelivered(cached_report_);
+ }
+ callbacks_.clear();
}
std::unique_ptr<RTCPeerConnectionStats>
-RTCStatsCollector::ProducePeerConnectionStats(int64_t timestamp_us) const {
+RTCStatsCollector::ProducePeerConnectionStats_s(int64_t timestamp_us) const {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
// TODO(hbos): If data channels are removed from the peer connection this will
// yield incorrect counts. Address before closing crbug.com/636818. See
// https://w3c.github.io/webrtc-stats/webrtc-stats.html#pcstats-dict*.