Call Call::GetStats() from the correct thread in our bandwidth tests.

Bug: webrtc:10847
Change-Id: Ief8cdd72f9d5b600d5306c00c1d249c29fb20396
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/148063
Reviewed-by: Magnus Flodman <mflodman@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28765}
diff --git a/video/end_to_end_tests/bandwidth_tests.cc b/video/end_to_end_tests/bandwidth_tests.cc
index 163e84d..4312c0e 100644
--- a/video/end_to_end_tests/bandwidth_tests.cc
+++ b/video/end_to_end_tests/bandwidth_tests.cc
@@ -79,12 +79,14 @@
 
 class BandwidthStatsTest : public test::EndToEndTest {
  public:
-  explicit BandwidthStatsTest(bool send_side_bwe)
+  BandwidthStatsTest(bool send_side_bwe,
+                     test::SingleThreadedTaskQueueForTesting* task_queue)
       : EndToEndTest(test::CallTest::kDefaultTimeoutMs),
         sender_call_(nullptr),
         receiver_call_(nullptr),
         has_seen_pacer_delay_(false),
-        send_side_bwe_(send_side_bwe) {}
+        send_side_bwe_(send_side_bwe),
+        task_queue_(task_queue) {}
 
   void ModifyVideoConfigs(
       VideoSendStream::Config* send_config,
@@ -99,15 +101,22 @@
     }
   }
 
+  // Called on the pacer thread.
   Action OnSendRtp(const uint8_t* packet, size_t length) override {
-    Call::Stats sender_stats = sender_call_->GetStats();
-    Call::Stats receiver_stats = receiver_call_->GetStats();
-    if (!has_seen_pacer_delay_)
-      has_seen_pacer_delay_ = sender_stats.pacer_delay_ms > 0;
-    if (sender_stats.send_bandwidth_bps > 0 && has_seen_pacer_delay_) {
-      if (send_side_bwe_ || receiver_stats.recv_bandwidth_bps > 0)
-        observation_complete_.Set();
-    }
+    // Stats need to be fetched on the thread where the caller objects were
+    // constructed.
+    task_queue_->PostTask([this]() {
+      Call::Stats sender_stats = sender_call_->GetStats();
+      if (!has_seen_pacer_delay_)
+        has_seen_pacer_delay_ = sender_stats.pacer_delay_ms > 0;
+
+      if (sender_stats.send_bandwidth_bps > 0 && has_seen_pacer_delay_) {
+        Call::Stats receiver_stats = receiver_call_->GetStats();
+        if (send_side_bwe_ || receiver_stats.recv_bandwidth_bps > 0)
+          observation_complete_.Set();
+      }
+    });
+
     return SEND_PACKET;
   }
 
@@ -126,15 +135,16 @@
   Call* receiver_call_;
   bool has_seen_pacer_delay_;
   const bool send_side_bwe_;
+  test::SingleThreadedTaskQueueForTesting* const task_queue_;
 };
 
 TEST_F(BandwidthEndToEndTest, VerifySendSideBweStats) {
-  BandwidthStatsTest test(true);
+  BandwidthStatsTest test(true, &task_queue_);
   RunBaseTest(&test);
 }
 
 TEST_F(BandwidthEndToEndTest, VerifyRecvSideBweStats) {
-  BandwidthStatsTest test(false);
+  BandwidthStatsTest test(false, &task_queue_);
   RunBaseTest(&test);
 }
 
@@ -146,18 +156,16 @@
 TEST_F(BandwidthEndToEndTest, RembWithSendSideBwe) {
   class BweObserver : public test::EndToEndTest {
    public:
-    BweObserver()
+    explicit BweObserver(test::SingleThreadedTaskQueueForTesting* task_queue)
         : EndToEndTest(kDefaultTimeoutMs),
           sender_call_(nullptr),
           clock_(Clock::GetRealTimeClock()),
           sender_ssrc_(0),
           remb_bitrate_bps_(1000000),
           receive_transport_(nullptr),
-          poller_thread_(&BitrateStatsPollingThread,
-                         this,
-                         "BitrateStatsPollingThread"),
           state_(kWaitForFirstRampUp),
-          retransmission_rate_limiter_(clock_, 1000) {}
+          retransmission_rate_limiter_(clock_, 1000),
+          task_queue_(task_queue) {}
 
     ~BweObserver() {}
 
@@ -200,56 +208,52 @@
     }
 
     void OnCallsCreated(Call* sender_call, Call* receiver_call) override {
+      RTC_DCHECK(sender_call);
       sender_call_ = sender_call;
-    }
-
-    static void BitrateStatsPollingThread(void* obj) {
-      static_cast<BweObserver*>(obj)->PollStats();
+      pending_task_ = task_queue_->PostTask([this]() { PollStats(); });
     }
 
     void PollStats() {
-      do {
-        if (sender_call_) {
-          Call::Stats stats = sender_call_->GetStats();
-          switch (state_) {
-            case kWaitForFirstRampUp:
-              if (stats.send_bandwidth_bps >= remb_bitrate_bps_) {
-                state_ = kWaitForRemb;
-                remb_bitrate_bps_ /= 2;
-                rtp_rtcp_->SetRemb(
-                    remb_bitrate_bps_,
-                    std::vector<uint32_t>(&sender_ssrc_, &sender_ssrc_ + 1));
-                rtp_rtcp_->SendRTCP(kRtcpRr);
-              }
-              break;
-
-            case kWaitForRemb:
-              if (stats.send_bandwidth_bps == remb_bitrate_bps_) {
-                state_ = kWaitForSecondRampUp;
-                remb_bitrate_bps_ *= 2;
-                rtp_rtcp_->SetRemb(
-                    remb_bitrate_bps_,
-                    std::vector<uint32_t>(&sender_ssrc_, &sender_ssrc_ + 1));
-                rtp_rtcp_->SendRTCP(kRtcpRr);
-              }
-              break;
-
-            case kWaitForSecondRampUp:
-              if (stats.send_bandwidth_bps == remb_bitrate_bps_) {
-                observation_complete_.Set();
-              }
-              break;
+      pending_task_ = ~0;  // for debugging purposes indicate no pending task.
+      Call::Stats stats = sender_call_->GetStats();
+      switch (state_) {
+        case kWaitForFirstRampUp:
+          if (stats.send_bandwidth_bps >= remb_bitrate_bps_) {
+            state_ = kWaitForRemb;
+            remb_bitrate_bps_ /= 2;
+            rtp_rtcp_->SetRemb(
+                remb_bitrate_bps_,
+                std::vector<uint32_t>(&sender_ssrc_, &sender_ssrc_ + 1));
+            rtp_rtcp_->SendRTCP(kRtcpRr);
           }
-        }
-      } while (!stop_event_.Wait(1000));
+          break;
+
+        case kWaitForRemb:
+          if (stats.send_bandwidth_bps == remb_bitrate_bps_) {
+            state_ = kWaitForSecondRampUp;
+            remb_bitrate_bps_ *= 2;
+            rtp_rtcp_->SetRemb(
+                remb_bitrate_bps_,
+                std::vector<uint32_t>(&sender_ssrc_, &sender_ssrc_ + 1));
+            rtp_rtcp_->SendRTCP(kRtcpRr);
+          }
+          break;
+
+        case kWaitForSecondRampUp:
+          if (stats.send_bandwidth_bps == remb_bitrate_bps_) {
+            observation_complete_.Set();
+            return;
+          }
+          break;
+      }
+
+      pending_task_ =
+          task_queue_->PostDelayedTask([this]() { PollStats(); }, 1000);
     }
 
     void PerformTest() override {
-      poller_thread_.Start();
       EXPECT_TRUE(Wait())
           << "Timed out while waiting for bitrate to change according to REMB.";
-      stop_event_.Set();
-      poller_thread_.Stop();
     }
 
    private:
@@ -261,11 +265,11 @@
     int remb_bitrate_bps_;
     std::unique_ptr<RtpRtcp> rtp_rtcp_;
     test::PacketTransport* receive_transport_;
-    rtc::Event stop_event_;
-    rtc::PlatformThread poller_thread_;
     TestState state_;
     RateLimiter retransmission_rate_limiter_;
-  } test;
+    test::SingleThreadedTaskQueueForTesting* const task_queue_;
+    test::SingleThreadedTaskQueueForTesting::TaskId pending_task_ = ~0;
+  } test(&task_queue_);
 
   RunBaseTest(&test);
 }