RTCStatsCollector collecting stats on multiple threads.

Changes GetStatsReport to a callback-based function. Stats collection
is dispatched to three different stats collecting methods, being
invoked asynchronously on the signaling, worker and network threads.
The three resulting stats reports are merged into one before returned.

The only current stats being collected is on the signaling thread, but
a FakeRTCStatsCollector is able to test the multi-threaded and
stats-merging behaviors. Future CLs simply have to put their stats
collecting code in the appropriate ProducePartialResultsOnFooThread
method.

BUG=chromium:627816

Review-Url: https://codereview.webrtc.org/2270033004
Cr-Commit-Position: refs/heads/master@{#14064}
diff --git a/webrtc/stats/rtcstatscollector.cc b/webrtc/stats/rtcstatscollector.cc
index 4bedad6..1cdafd6 100644
--- a/webrtc/stats/rtcstatscollector.cc
+++ b/webrtc/stats/rtcstatscollector.cc
@@ -20,50 +20,144 @@
 
 namespace webrtc {
 
-RTCStatsCollector::RTCStatsCollector(
-    PeerConnection* pc,
-    int64_t cache_lifetime_us)
+rtc::scoped_refptr<RTCStatsCollector> RTCStatsCollector::Create(
+    PeerConnection* pc, int64_t cache_lifetime_us) {
+  return rtc::scoped_refptr<RTCStatsCollector>(
+      new rtc::RefCountedObject<RTCStatsCollector>(pc, cache_lifetime_us));
+}
+
+RTCStatsCollector::RTCStatsCollector(PeerConnection* pc,
+                                     int64_t cache_lifetime_us)
     : pc_(pc),
+      signaling_thread_(pc->session()->signaling_thread()),
+      worker_thread_(pc->session()->worker_thread()),
+      network_thread_(pc->session()->network_thread()),
+      num_pending_partial_reports_(0),
+      partial_report_timestamp_us_(0),
       cache_timestamp_us_(0),
       cache_lifetime_us_(cache_lifetime_us) {
   RTC_DCHECK(pc_);
-  RTC_DCHECK(IsOnSignalingThread());
+  RTC_DCHECK(signaling_thread_);
+  RTC_DCHECK(worker_thread_);
+  RTC_DCHECK(network_thread_);
   RTC_DCHECK_GE(cache_lifetime_us_, 0);
 }
 
-rtc::scoped_refptr<const RTCStatsReport> RTCStatsCollector::GetStatsReport() {
-  RTC_DCHECK(IsOnSignalingThread());
+void RTCStatsCollector::GetStatsReport(
+    rtc::scoped_refptr<RTCStatsCollectorCallback> callback) {
+  RTC_DCHECK(signaling_thread_->IsCurrent());
+  RTC_DCHECK(callback);
+  callbacks_.push_back(callback);
+
   // "Now" using a monotonically increasing timer.
   int64_t cache_now_us = rtc::TimeMicros();
   if (cached_report_ &&
       cache_now_us - cache_timestamp_us_ <= cache_lifetime_us_) {
-    return cached_report_;
+    // We have a fresh cached report to deliver.
+    DeliverCachedReport();
+  } else if (!num_pending_partial_reports_) {
+    // Only start gathering stats if we're not already gathering stats. In the
+    // case of already gathering stats, |callback_| will be invoked when there
+    // are no more pending partial reports.
+
+    // "Now" using a system clock, relative to the UNIX epoch (Jan 1, 1970,
+    // UTC), in microseconds. The system clock could be modified and is not
+    // necessarily monotonically increasing.
+    int64_t timestamp_us = static_cast<int64_t>(
+        rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec);
+
+    num_pending_partial_reports_ = 3;
+    partial_report_timestamp_us_ = cache_now_us;
+    invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_,
+        rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnSignalingThread,
+            rtc::scoped_refptr<RTCStatsCollector>(this), timestamp_us));
+    invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_,
+        rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnWorkerThread,
+            rtc::scoped_refptr<RTCStatsCollector>(this), timestamp_us));
+    invoker_.AsyncInvoke<void>(RTC_FROM_HERE, network_thread_,
+        rtc::Bind(&RTCStatsCollector::ProducePartialResultsOnNetworkThread,
+            rtc::scoped_refptr<RTCStatsCollector>(this), timestamp_us));
   }
-  cache_timestamp_us_ = cache_now_us;
-  // "Now" using a system clock, relative to the UNIX epoch (Jan 1, 1970, UTC),
-  // in microseconds. The system clock could be modified and is not necessarily
-  // monotonically increasing.
-  int64_t timestamp_us = static_cast<int64_t>(
-      rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec);
-
-  rtc::scoped_refptr<RTCStatsReport> report = RTCStatsReport::Create();
-  report->AddStats(ProducePeerConnectionStats(timestamp_us));
-
-  cached_report_ = report;
-  return cached_report_;
 }
 
 void RTCStatsCollector::ClearCachedStatsReport() {
-  RTC_DCHECK(IsOnSignalingThread());
+  RTC_DCHECK(signaling_thread_->IsCurrent());
   cached_report_ = nullptr;
 }
 
-bool RTCStatsCollector::IsOnSignalingThread() const {
-  return pc_->session()->signaling_thread()->IsCurrent();
+void RTCStatsCollector::ProducePartialResultsOnSignalingThread(
+    int64_t timestamp_us) {
+  RTC_DCHECK(signaling_thread_->IsCurrent());
+  rtc::scoped_refptr<RTCStatsReport> report = RTCStatsReport::Create();
+
+  report->AddStats(ProducePeerConnectionStats_s(timestamp_us));
+
+  AddPartialResults(report);
+}
+
+void RTCStatsCollector::ProducePartialResultsOnWorkerThread(
+    int64_t timestamp_us) {
+  RTC_DCHECK(worker_thread_->IsCurrent());
+  rtc::scoped_refptr<RTCStatsReport> report = RTCStatsReport::Create();
+
+  // TODO(hbos): Gather stats on worker thread.
+
+  AddPartialResults(report);
+}
+
+void RTCStatsCollector::ProducePartialResultsOnNetworkThread(
+    int64_t timestamp_us) {
+  RTC_DCHECK(network_thread_->IsCurrent());
+  rtc::scoped_refptr<RTCStatsReport> report = RTCStatsReport::Create();
+
+  // TODO(hbos): Gather stats on network thread.
+
+  AddPartialResults(report);
+}
+
+void RTCStatsCollector::AddPartialResults(
+    const rtc::scoped_refptr<RTCStatsReport>& partial_report) {
+  if (!signaling_thread_->IsCurrent()) {
+    invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_,
+        rtc::Bind(&RTCStatsCollector::AddPartialResults_s,
+                  rtc::scoped_refptr<RTCStatsCollector>(this),
+                  partial_report));
+    return;
+  }
+  AddPartialResults_s(partial_report);
+}
+
+void RTCStatsCollector::AddPartialResults_s(
+    rtc::scoped_refptr<RTCStatsReport> partial_report) {
+  RTC_DCHECK(signaling_thread_->IsCurrent());
+  RTC_DCHECK_GT(num_pending_partial_reports_, 0);
+  if (!partial_report_)
+    partial_report_ = partial_report;
+  else
+    partial_report_->TakeMembersFrom(partial_report);
+  --num_pending_partial_reports_;
+  if (!num_pending_partial_reports_) {
+    cache_timestamp_us_ = partial_report_timestamp_us_;
+    cached_report_ = partial_report_;
+    partial_report_ = nullptr;
+    DeliverCachedReport();
+  }
+}
+
+void RTCStatsCollector::DeliverCachedReport() {
+  RTC_DCHECK(signaling_thread_->IsCurrent());
+  RTC_DCHECK(!callbacks_.empty());
+  RTC_DCHECK(cached_report_);
+  for (const rtc::scoped_refptr<RTCStatsCollectorCallback>& callback :
+       callbacks_) {
+    callback->OnStatsDelivered(cached_report_);
+  }
+  callbacks_.clear();
 }
 
 std::unique_ptr<RTCPeerConnectionStats>
-RTCStatsCollector::ProducePeerConnectionStats(int64_t timestamp_us) const {
+RTCStatsCollector::ProducePeerConnectionStats_s(int64_t timestamp_us) const {
+  RTC_DCHECK(signaling_thread_->IsCurrent());
   // TODO(hbos): If data channels are removed from the peer connection this will
   // yield incorrect counts. Address before closing crbug.com/636818. See
   // https://w3c.github.io/webrtc-stats/webrtc-stats.html#pcstats-dict*.
diff --git a/webrtc/stats/rtcstatscollector.h b/webrtc/stats/rtcstatscollector.h
index 735421d..ee59a10 100644
--- a/webrtc/stats/rtcstatscollector.h
+++ b/webrtc/stats/rtcstatscollector.h
@@ -12,9 +12,12 @@
 #define WEBRTC_STATS_RTCSTATSCOLLECTOR_H_
 
 #include <memory>
+#include <vector>
 
 #include "webrtc/api/rtcstats_objects.h"
 #include "webrtc/api/rtcstatsreport.h"
+#include "webrtc/base/asyncinvoker.h"
+#include "webrtc/base/refcount.h"
 #include "webrtc/base/scoped_ref_ptr.h"
 #include "webrtc/base/timeutils.h"
 
@@ -22,11 +25,21 @@
 
 class PeerConnection;
 
-// All calls to the collector and gathering of stats is performed on the
-// signaling thread. A stats report is cached for |cache_lifetime_| ms.
-class RTCStatsCollector {
+class RTCStatsCollectorCallback : public virtual rtc::RefCountInterface {
  public:
-  explicit RTCStatsCollector(
+  virtual ~RTCStatsCollectorCallback() {}
+
+  virtual void OnStatsDelivered(
+      const rtc::scoped_refptr<const RTCStatsReport>& report) = 0;
+};
+
+// All public methods of the collector are to be called on the signaling thread.
+// Stats are gathered on the signaling, worker and network threads
+// asynchronously. The callback is invoked on the signaling thread. Resulting
+// reports are cached for |cache_lifetime_| ms.
+class RTCStatsCollector : public virtual rtc::RefCountInterface {
+ public:
+  static rtc::scoped_refptr<RTCStatsCollector> Create(
       PeerConnection* pc,
       int64_t cache_lifetime_us = 50 * rtc::kNumMicrosecsPerMillisec);
 
@@ -34,18 +47,42 @@
   // it is returned, otherwise new stats are gathered and returned. A report is
   // considered fresh for |cache_lifetime_| ms. const RTCStatsReports are safe
   // to use across multiple threads and may be destructed on any thread.
-  rtc::scoped_refptr<const RTCStatsReport> GetStatsReport();
+  void GetStatsReport(rtc::scoped_refptr<RTCStatsCollectorCallback> callback);
   // Clears the cache's reference to the most recent stats report. Subsequently
   // calling |GetStatsReport| guarantees fresh stats.
   void ClearCachedStatsReport();
 
- private:
-  bool IsOnSignalingThread() const;
+ protected:
+  RTCStatsCollector(PeerConnection* pc, int64_t cache_lifetime_us);
 
-  std::unique_ptr<RTCPeerConnectionStats> ProducePeerConnectionStats(
+  // Stats gathering on a particular thread. Calls |AddPartialResults| before
+  // returning. Virtual for the sake of testing.
+  virtual void ProducePartialResultsOnSignalingThread(int64_t timestamp_us);
+  virtual void ProducePartialResultsOnWorkerThread(int64_t timestamp_us);
+  virtual void ProducePartialResultsOnNetworkThread(int64_t timestamp_us);
+
+  // Can be called on any thread.
+  void AddPartialResults(
+      const rtc::scoped_refptr<RTCStatsReport>& partial_report);
+
+ private:
+  void AddPartialResults_s(rtc::scoped_refptr<RTCStatsReport> partial_report);
+  void DeliverCachedReport();
+
+  std::unique_ptr<RTCPeerConnectionStats> ProducePeerConnectionStats_s(
       int64_t timestamp_us) const;
 
   PeerConnection* const pc_;
+  rtc::Thread* const signaling_thread_;
+  rtc::Thread* const worker_thread_;
+  rtc::Thread* const network_thread_;
+  rtc::AsyncInvoker invoker_;
+
+  int num_pending_partial_reports_;
+  int64_t partial_report_timestamp_us_;
+  rtc::scoped_refptr<RTCStatsReport> partial_report_;
+  std::vector<rtc::scoped_refptr<RTCStatsCollectorCallback>> callbacks_;
+
   // A timestamp, in microseconds, that is based on a timer that is
   // monotonically increasing. That is, even if the system clock is modified the
   // difference between the timer and this timestamp is how fresh the cached
diff --git a/webrtc/stats/rtcstatscollector_unittest.cc b/webrtc/stats/rtcstatscollector_unittest.cc
index f917a75..1ead77b 100644
--- a/webrtc/stats/rtcstatscollector_unittest.cc
+++ b/webrtc/stats/rtcstatscollector_unittest.cc
@@ -24,6 +24,7 @@
 #include "webrtc/base/fakeclock.h"
 #include "webrtc/base/gunit.h"
 #include "webrtc/base/logging.h"
+#include "webrtc/base/thread_checker.h"
 #include "webrtc/base/timedelta.h"
 #include "webrtc/base/timeutils.h"
 #include "webrtc/base/timing.h"
@@ -34,9 +35,13 @@
 
 namespace webrtc {
 
-class RTCStatsCollectorTester : public SetSessionDescriptionObserver {
+namespace {
+
+const int64_t kGetStatsReportTimeoutMs = 1000;
+
+class RTCStatsCollectorTestHelper : public SetSessionDescriptionObserver {
  public:
-  RTCStatsCollectorTester()
+  RTCStatsCollectorTestHelper()
       : worker_thread_(rtc::Thread::Current()),
         network_thread_(rtc::Thread::Current()),
         channel_manager_(new cricket::ChannelManager(
@@ -77,41 +82,241 @@
   std::vector<rtc::scoped_refptr<DataChannel>> data_channels_;
 };
 
-class RTCStatsCollectorTest : public testing::Test {
+class RTCTestStats : public RTCStats {
  public:
-  RTCStatsCollectorTest()
-    : test_(new rtc::RefCountedObject<RTCStatsCollectorTester>()),
-      collector_(&test_->pc(), 50 * rtc::kNumMicrosecsPerMillisec) {
+  RTCTestStats(const std::string& id, int64_t timestamp_us)
+      : RTCStats(id, timestamp_us),
+        dummy_stat("dummyStat") {}
+
+  WEBRTC_RTCSTATS_IMPL(RTCStats, RTCTestStats,
+      &dummy_stat);
+
+  RTCStatsMember<int32_t> dummy_stat;
+};
+
+const char RTCTestStats::kType[] = "test-stats";
+
+// Overrides the stats collection to verify thread usage and that the resulting
+// partial reports are merged.
+class FakeRTCStatsCollector : public RTCStatsCollector,
+                              public RTCStatsCollectorCallback {
+ public:
+  static rtc::scoped_refptr<FakeRTCStatsCollector> Create(
+      PeerConnection* pc,
+      int64_t cache_lifetime_us) {
+    return rtc::scoped_refptr<FakeRTCStatsCollector>(
+        new rtc::RefCountedObject<FakeRTCStatsCollector>(
+            pc, cache_lifetime_us));
+  }
+
+  // RTCStatsCollectorCallback implementation.
+  void OnStatsDelivered(
+      const rtc::scoped_refptr<const RTCStatsReport>& report) override {
+    EXPECT_TRUE(signaling_thread_->IsCurrent());
+    rtc::CritScope cs(&lock_);
+    delivered_report_ = report;
+  }
+
+  void VerifyThreadUsageAndResultsMerging() {
+    GetStatsReport(rtc::scoped_refptr<RTCStatsCollectorCallback>(this));
+    EXPECT_TRUE_WAIT(HasVerifiedResults(), kGetStatsReportTimeoutMs);
+  }
+
+  bool HasVerifiedResults() {
+    EXPECT_TRUE(signaling_thread_->IsCurrent());
+    rtc::CritScope cs(&lock_);
+    if (!delivered_report_)
+      return false;
+    EXPECT_EQ(produced_on_signaling_thread_, 1);
+    EXPECT_EQ(produced_on_worker_thread_, 1);
+    EXPECT_EQ(produced_on_network_thread_, 1);
+
+    EXPECT_TRUE(delivered_report_->Get("SignalingThreadStats"));
+    EXPECT_TRUE(delivered_report_->Get("WorkerThreadStats"));
+    EXPECT_TRUE(delivered_report_->Get("NetworkThreadStats"));
+
+    produced_on_signaling_thread_ = 0;
+    produced_on_worker_thread_ = 0;
+    produced_on_network_thread_ = 0;
+    delivered_report_ = nullptr;
+    return true;
   }
 
  protected:
-  rtc::scoped_refptr<RTCStatsCollectorTester> test_;
-  RTCStatsCollector collector_;
+  FakeRTCStatsCollector(
+      PeerConnection* pc,
+      int64_t cache_lifetime)
+      : RTCStatsCollector(pc, cache_lifetime),
+        signaling_thread_(pc->session()->signaling_thread()),
+        worker_thread_(pc->session()->worker_thread()),
+        network_thread_(pc->session()->network_thread()) {
+  }
+
+  void ProducePartialResultsOnSignalingThread(int64_t timestamp_us) override {
+    EXPECT_TRUE(signaling_thread_->IsCurrent());
+    {
+      rtc::CritScope cs(&lock_);
+      EXPECT_FALSE(delivered_report_);
+      ++produced_on_signaling_thread_;
+    }
+
+    rtc::scoped_refptr<RTCStatsReport> signaling_report =
+        RTCStatsReport::Create();
+    signaling_report->AddStats(std::unique_ptr<const RTCStats>(
+        new RTCTestStats("SignalingThreadStats", timestamp_us)));
+    AddPartialResults(signaling_report);
+  }
+  void ProducePartialResultsOnWorkerThread(int64_t timestamp_us) override {
+    EXPECT_TRUE(worker_thread_->IsCurrent());
+    {
+      rtc::CritScope cs(&lock_);
+      EXPECT_FALSE(delivered_report_);
+      ++produced_on_worker_thread_;
+    }
+
+    rtc::scoped_refptr<RTCStatsReport> worker_report = RTCStatsReport::Create();
+    worker_report->AddStats(std::unique_ptr<const RTCStats>(
+        new RTCTestStats("WorkerThreadStats", timestamp_us)));
+    AddPartialResults(worker_report);
+  }
+  void ProducePartialResultsOnNetworkThread(int64_t timestamp_us) override {
+    EXPECT_TRUE(network_thread_->IsCurrent());
+    {
+      rtc::CritScope cs(&lock_);
+      EXPECT_FALSE(delivered_report_);
+      ++produced_on_network_thread_;
+    }
+
+    rtc::scoped_refptr<RTCStatsReport> network_report =
+        RTCStatsReport::Create();
+    network_report->AddStats(std::unique_ptr<const RTCStats>(
+        new RTCTestStats("NetworkThreadStats", timestamp_us)));
+    AddPartialResults(network_report);
+  }
+
+ private:
+  rtc::Thread* const signaling_thread_;
+  rtc::Thread* const worker_thread_;
+  rtc::Thread* const network_thread_;
+
+  rtc::CriticalSection lock_;
+  rtc::scoped_refptr<const RTCStatsReport> delivered_report_;
+  int produced_on_signaling_thread_ = 0;
+  int produced_on_worker_thread_ = 0;
+  int produced_on_network_thread_ = 0;
 };
 
-TEST_F(RTCStatsCollectorTest, CachedStatsReport) {
+class StatsCallback : public RTCStatsCollectorCallback {
+ public:
+  static rtc::scoped_refptr<StatsCallback> Create(
+      rtc::scoped_refptr<const RTCStatsReport>* report_ptr = nullptr) {
+    return rtc::scoped_refptr<StatsCallback>(
+        new rtc::RefCountedObject<StatsCallback>(report_ptr));
+  }
+
+  void OnStatsDelivered(
+      const rtc::scoped_refptr<const RTCStatsReport>& report) override {
+    EXPECT_TRUE(thread_checker_.CalledOnValidThread());
+    report_ = report;
+    if (report_ptr_)
+      *report_ptr_ = report_;
+  }
+
+  rtc::scoped_refptr<const RTCStatsReport> report() const {
+    EXPECT_TRUE(thread_checker_.CalledOnValidThread());
+    return report_;
+  }
+
+ protected:
+  explicit StatsCallback(rtc::scoped_refptr<const RTCStatsReport>* report_ptr)
+      : report_ptr_(report_ptr) {}
+
+ private:
+  rtc::ThreadChecker thread_checker_;
+  rtc::scoped_refptr<const RTCStatsReport> report_;
+  rtc::scoped_refptr<const RTCStatsReport>* report_ptr_;
+};
+
+class RTCStatsCollectorTest : public testing::Test {
+ public:
+  RTCStatsCollectorTest()
+    : test_(new rtc::RefCountedObject<RTCStatsCollectorTestHelper>()),
+      collector_(RTCStatsCollector::Create(
+          &test_->pc(), 50 * rtc::kNumMicrosecsPerMillisec)) {
+  }
+
+  rtc::scoped_refptr<const RTCStatsReport> GetStatsReport() {
+    rtc::scoped_refptr<StatsCallback> callback = StatsCallback::Create();
+    collector_->GetStatsReport(callback);
+    EXPECT_TRUE_WAIT(callback->report(), kGetStatsReportTimeoutMs);
+    return callback->report();
+  }
+
+ protected:
+  rtc::scoped_refptr<RTCStatsCollectorTestHelper> test_;
+  rtc::scoped_refptr<RTCStatsCollector> collector_;
+};
+
+TEST_F(RTCStatsCollectorTest, SingleCallback) {
+  rtc::scoped_refptr<const RTCStatsReport> result;
+  collector_->GetStatsReport(StatsCallback::Create(&result));
+  EXPECT_TRUE_WAIT(result, kGetStatsReportTimeoutMs);
+}
+
+TEST_F(RTCStatsCollectorTest, MultipleCallbacks) {
+  rtc::scoped_refptr<const RTCStatsReport> a;
+  rtc::scoped_refptr<const RTCStatsReport> b;
+  rtc::scoped_refptr<const RTCStatsReport> c;
+  collector_->GetStatsReport(StatsCallback::Create(&a));
+  collector_->GetStatsReport(StatsCallback::Create(&b));
+  collector_->GetStatsReport(StatsCallback::Create(&c));
+  EXPECT_TRUE_WAIT(a, kGetStatsReportTimeoutMs);
+  EXPECT_TRUE_WAIT(b, kGetStatsReportTimeoutMs);
+  EXPECT_TRUE_WAIT(c, kGetStatsReportTimeoutMs);
+  EXPECT_EQ(a.get(), b.get());
+  EXPECT_EQ(b.get(), c.get());
+}
+
+TEST_F(RTCStatsCollectorTest, CachedStatsReports) {
   rtc::ScopedFakeClock fake_clock;
   // Caching should ensure |a| and |b| are the same report.
-  rtc::scoped_refptr<const RTCStatsReport> a = collector_.GetStatsReport();
-  rtc::scoped_refptr<const RTCStatsReport> b = collector_.GetStatsReport();
-  EXPECT_TRUE(a);
+  rtc::scoped_refptr<const RTCStatsReport> a = GetStatsReport();
+  rtc::scoped_refptr<const RTCStatsReport> b = GetStatsReport();
   EXPECT_EQ(a.get(), b.get());
   // Invalidate cache by clearing it.
-  collector_.ClearCachedStatsReport();
-  rtc::scoped_refptr<const RTCStatsReport> c = collector_.GetStatsReport();
-  EXPECT_TRUE(c);
+  collector_->ClearCachedStatsReport();
+  rtc::scoped_refptr<const RTCStatsReport> c = GetStatsReport();
   EXPECT_NE(b.get(), c.get());
   // Invalidate cache by advancing time.
   fake_clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(51));
-  rtc::scoped_refptr<const RTCStatsReport> d = collector_.GetStatsReport();
+  rtc::scoped_refptr<const RTCStatsReport> d = GetStatsReport();
   EXPECT_TRUE(d);
   EXPECT_NE(c.get(), d.get());
 }
 
+TEST_F(RTCStatsCollectorTest, MultipleCallbacksWithInvalidatedCacheInBetween) {
+  rtc::ScopedFakeClock fake_clock;
+  rtc::scoped_refptr<const RTCStatsReport> a;
+  rtc::scoped_refptr<const RTCStatsReport> b;
+  rtc::scoped_refptr<const RTCStatsReport> c;
+  collector_->GetStatsReport(StatsCallback::Create(&a));
+  collector_->GetStatsReport(StatsCallback::Create(&b));
+  // Cache is invalidated after 50 ms.
+  fake_clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(51));
+  collector_->GetStatsReport(StatsCallback::Create(&c));
+  EXPECT_TRUE_WAIT(a, kGetStatsReportTimeoutMs);
+  EXPECT_TRUE_WAIT(b, kGetStatsReportTimeoutMs);
+  EXPECT_TRUE_WAIT(c, kGetStatsReportTimeoutMs);
+  EXPECT_EQ(a.get(), b.get());
+  // The act of doing |AdvanceTime| processes all messages. If this was not the
+  // case we might not require |c| to be fresher than |b|.
+  EXPECT_NE(c.get(), b.get());
+}
+
 TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) {
   int64_t before = static_cast<int64_t>(
       rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec);
-  rtc::scoped_refptr<const RTCStatsReport> report = collector_.GetStatsReport();
+  rtc::scoped_refptr<const RTCStatsReport> report = GetStatsReport();
   int64_t after = static_cast<int64_t>(
       rtc::Timing::WallTimeNow() * rtc::kNumMicrosecsPerSec);
   EXPECT_EQ(report->GetStatsOfType<RTCPeerConnectionStats>().size(),
@@ -137,8 +342,8 @@
   test_->data_channels().push_back(
       new MockDataChannel(DataChannelInterface::kClosed));
 
-  collector_.ClearCachedStatsReport();
-  report = collector_.GetStatsReport();
+  collector_->ClearCachedStatsReport();
+  report = GetStatsReport();
   EXPECT_EQ(report->GetStatsOfType<RTCPeerConnectionStats>().size(),
             static_cast<size_t>(1)) << "Expecting 1 RTCPeerConnectionStats.";
   stats = report->Get("RTCPeerConnection");
@@ -156,4 +361,23 @@
   }
 }
 
+class RTCStatsCollectorTestWithFakeCollector : public testing::Test {
+ public:
+  RTCStatsCollectorTestWithFakeCollector()
+    : test_(new rtc::RefCountedObject<RTCStatsCollectorTestHelper>()),
+      collector_(FakeRTCStatsCollector::Create(
+          &test_->pc(), 50 * rtc::kNumMicrosecsPerMillisec)) {
+  }
+
+ protected:
+  rtc::scoped_refptr<RTCStatsCollectorTestHelper> test_;
+  rtc::scoped_refptr<FakeRTCStatsCollector> collector_;
+};
+
+TEST_F(RTCStatsCollectorTestWithFakeCollector, ThreadUsageAndResultsMerging) {
+  collector_->VerifyThreadUsageAndResultsMerging();
+}
+
+}  // namespace
+
 }  // namespace webrtc