Fix RampUp tests to call Call::GetStats() from the right thread - and remove the need for a dedicated polling thread.

Bug: webrtc:10847
Change-Id: I01492d2e385840e50d2d94f498063b5e4eea3665
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/148067
Reviewed-by: Magnus Flodman <mflodman@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28764}
diff --git a/call/rampup_tests.cc b/call/rampup_tests.cc
index 0fc3627..8f689e7 100644
--- a/call/rampup_tests.cc
+++ b/call/rampup_tests.cc
@@ -60,7 +60,8 @@
                            const std::string& extension_type,
                            bool rtx,
                            bool red,
-                           bool report_perf_stats)
+                           bool report_perf_stats,
+                           test::SingleThreadedTaskQueueForTesting* task_queue)
     : EndToEndTest(test::CallTest::kLongTimeoutMs),
       clock_(Clock::GetRealTimeClock()),
       num_video_streams_(num_video_streams),
@@ -82,15 +83,22 @@
       video_ssrcs_(GenerateSsrcs(num_video_streams_, 100)),
       video_rtx_ssrcs_(GenerateSsrcs(num_video_streams_, 200)),
       audio_ssrcs_(GenerateSsrcs(num_audio_streams_, 300)),
-      poller_thread_(&BitrateStatsPollingThread,
-                     this,
-                     "BitrateStatsPollingThread") {
+      task_queue_(task_queue) {
   if (red_)
     EXPECT_EQ(0u, num_flexfec_streams_);
   EXPECT_LE(num_audio_streams_, 1u);
 }
 
-RampUpTester::~RampUpTester() {}
+RampUpTester::~RampUpTester() {
+  // Special case for WebRTC-QuickPerfTest/Enabled/
+  task_queue_->SendTask([this]() {
+    if (pending_task_ !=
+        static_cast<test::SingleThreadedTaskQueueForTesting::TaskId>(-1)) {
+      task_queue_->CancelTask(pending_task_);
+      pending_task_ = -1;
+    }
+  });
+}
 
 void RampUpTester::ModifySenderBitrateConfig(
     BitrateConstraints* bitrate_config) {
@@ -309,27 +317,25 @@
 }
 
 void RampUpTester::OnCallsCreated(Call* sender_call, Call* receiver_call) {
+  RTC_DCHECK(sender_call);
   sender_call_ = sender_call;
-}
-
-void RampUpTester::BitrateStatsPollingThread(void* obj) {
-  static_cast<RampUpTester*>(obj)->PollStats();
+  pending_task_ = task_queue_->PostTask([this]() { PollStats(); });
 }
 
 void RampUpTester::PollStats() {
-  do {
-    if (sender_call_) {
-      Call::Stats stats = sender_call_->GetStats();
+  pending_task_ = -1;
+  Call::Stats stats = sender_call_->GetStats();
+  EXPECT_GE(expected_bitrate_bps_, 0);
 
-      EXPECT_GE(expected_bitrate_bps_, 0);
-      if (stats.send_bandwidth_bps >= expected_bitrate_bps_ &&
-          (min_run_time_ms_ == -1 ||
-           clock_->TimeInMilliseconds() - test_start_ms_ >= min_run_time_ms_)) {
-        ramp_up_finished_ms_ = clock_->TimeInMilliseconds();
-        observation_complete_.Set();
-      }
-    }
-  } while (!stop_event_.Wait(kPollIntervalMs));
+  if (stats.send_bandwidth_bps >= expected_bitrate_bps_ &&
+      (min_run_time_ms_ == -1 ||
+       clock_->TimeInMilliseconds() - test_start_ms_ >= min_run_time_ms_)) {
+    ramp_up_finished_ms_ = clock_->TimeInMilliseconds();
+    observation_complete_.Set();
+  } else {
+    pending_task_ = task_queue_->PostDelayedTask([this]() { PollStats(); },
+                                                 kPollIntervalMs);
+  }
 }
 
 void RampUpTester::ReportResult(const std::string& measurement,
@@ -365,7 +371,18 @@
   if (!send_stream_)
     return;
 
+  // Stop polling stats.
+  // Corner case for field_trials=WebRTC-QuickPerfTest/Enabled/
+  task_queue_->SendTask([this]() {
+    if (pending_task_ !=
+        static_cast<test::SingleThreadedTaskQueueForTesting::TaskId>(-1)) {
+      task_queue_->CancelTask(pending_task_);
+      pending_task_ = -1;
+    }
+  });
+
   VideoSendStream::Stats send_stats = send_stream_->GetStats();
+  send_stream_ = nullptr;  // To avoid dereferencing a bad pointer.
 
   size_t total_packets_sent = 0;
   size_t total_sent = 0;
@@ -401,22 +418,21 @@
 
 void RampUpTester::PerformTest() {
   test_start_ms_ = clock_->TimeInMilliseconds();
-  poller_thread_.Start();
   EXPECT_TRUE(Wait()) << "Timed out while waiting for ramp-up to complete.";
   TriggerTestDone();
-  stop_event_.Set();
-  poller_thread_.Stop();
 }
 
-RampUpDownUpTester::RampUpDownUpTester(size_t num_video_streams,
-                                       size_t num_audio_streams,
-                                       size_t num_flexfec_streams,
-                                       unsigned int start_bitrate_bps,
-                                       const std::string& extension_type,
-                                       bool rtx,
-                                       bool red,
-                                       const std::vector<int>& loss_rates,
-                                       bool report_perf_stats)
+RampUpDownUpTester::RampUpDownUpTester(
+    size_t num_video_streams,
+    size_t num_audio_streams,
+    size_t num_flexfec_streams,
+    unsigned int start_bitrate_bps,
+    const std::string& extension_type,
+    bool rtx,
+    bool red,
+    const std::vector<int>& loss_rates,
+    bool report_perf_stats,
+    test::SingleThreadedTaskQueueForTesting* task_queue)
     : RampUpTester(num_video_streams,
                    num_audio_streams,
                    num_flexfec_streams,
@@ -425,7 +441,8 @@
                    extension_type,
                    rtx,
                    red,
-                   report_perf_stats),
+                   report_perf_stats,
+                   task_queue),
       link_rates_({4 * GetExpectedHighBitrate() / (3 * 1000),
                    kLowBandwidthLimitBps / 1000,
                    4 * GetExpectedHighBitrate() / (3 * 1000), 0}),
@@ -443,23 +460,30 @@
 RampUpDownUpTester::~RampUpDownUpTester() {}
 
 void RampUpDownUpTester::PollStats() {
-  do {
-    int transmit_bitrate_bps = 0;
-    bool suspended = false;
-    if (num_video_streams_ > 0) {
-      webrtc::VideoSendStream::Stats stats = send_stream_->GetStats();
-      for (const auto& it : stats.substreams) {
-        transmit_bitrate_bps += it.second.total_bitrate_bps;
-      }
-      suspended = stats.suspended;
+  pending_task_ = -1;
+  bool last_round = (test_state_ == kTestEnd);
+
+  int transmit_bitrate_bps = 0;
+  bool suspended = false;
+  if (num_video_streams_ > 0 && send_stream_) {
+    webrtc::VideoSendStream::Stats stats = send_stream_->GetStats();
+    for (const auto& it : stats.substreams) {
+      transmit_bitrate_bps += it.second.total_bitrate_bps;
     }
-    if (num_audio_streams_ > 0 && sender_call_ != nullptr) {
-      // An audio send stream doesn't have bitrate stats, so the call send BW is
-      // currently used instead.
-      transmit_bitrate_bps = sender_call_->GetStats().send_bandwidth_bps;
-    }
-    EvolveTestState(transmit_bitrate_bps, suspended);
-  } while (!stop_event_.Wait(kPollIntervalMs));
+    suspended = stats.suspended;
+  }
+  if (num_audio_streams_ > 0 && sender_call_) {
+    // An audio send stream doesn't have bitrate stats, so the call send BW is
+    // currently used instead.
+    transmit_bitrate_bps = sender_call_->GetStats().send_bandwidth_bps;
+  }
+
+  EvolveTestState(transmit_bitrate_bps, suspended);
+
+  if (!last_round) {
+    pending_task_ = task_queue_->PostDelayedTask([this]() { PollStats(); },
+                                                 kPollIntervalMs);
+  }
 }
 
 void RampUpDownUpTester::ModifyReceiverBitrateConfig(
@@ -614,7 +638,7 @@
   std::vector<int> loss_rates = {0, 0, 0, 0};
   RampUpDownUpTester test(3, 0, 0, kStartBitrateBps,
                           RtpExtension::kAbsSendTimeUri, true, true, loss_rates,
-                          true);
+                          true, &task_queue_);
   RunBaseTest(&test);
 }
 
@@ -630,7 +654,7 @@
   std::vector<int> loss_rates = {0, 0, 0, 0};
   RampUpDownUpTester test(3, 0, 0, kStartBitrateBps,
                           RtpExtension::kTransportSequenceNumberUri, true,
-                          false, loss_rates, true);
+                          false, loss_rates, true, &task_queue_);
   RunBaseTest(&test);
 }
 
@@ -642,7 +666,7 @@
   std::vector<int> loss_rates = {20, 0, 0, 0};
   RampUpDownUpTester test(1, 0, 1, kStartBitrateBps,
                           RtpExtension::kTransportSequenceNumberUri, true,
-                          false, loss_rates, false);
+                          false, loss_rates, false, &task_queue_);
   RunBaseTest(&test);
 }
 
@@ -659,7 +683,7 @@
   std::vector<int> loss_rates = {0, 0, 0, 0};
   RampUpDownUpTester test(3, 1, 0, kStartBitrateBps,
                           RtpExtension::kTransportSequenceNumberUri, true,
-                          false, loss_rates, false);
+                          false, loss_rates, false, &task_queue_);
   RunBaseTest(&test);
 }
 
@@ -668,50 +692,50 @@
   std::vector<int> loss_rates = {0, 0, 0, 0};
   RampUpDownUpTester test(0, 1, 0, kStartBitrateBps,
                           RtpExtension::kTransportSequenceNumberUri, true,
-                          false, loss_rates, false);
+                          false, loss_rates, false, &task_queue_);
   RunBaseTest(&test);
 }
 
 TEST_F(RampUpTest, TOffsetSimulcastRedRtx) {
   RampUpTester test(3, 0, 0, 0, 0, RtpExtension::kTimestampOffsetUri, true,
-                    true, true);
+                    true, true, &task_queue_);
   RunBaseTest(&test);
 }
 
 TEST_F(RampUpTest, AbsSendTime) {
   RampUpTester test(1, 0, 0, 0, 0, RtpExtension::kAbsSendTimeUri, false, false,
-                    false);
+                    false, &task_queue_);
   RunBaseTest(&test);
 }
 
 TEST_F(RampUpTest, AbsSendTimeSimulcastRedRtx) {
   RampUpTester test(3, 0, 0, 0, 0, RtpExtension::kAbsSendTimeUri, true, true,
-                    true);
+                    true, &task_queue_);
   RunBaseTest(&test);
 }
 
 TEST_F(RampUpTest, TransportSequenceNumber) {
   RampUpTester test(1, 0, 0, 0, 0, RtpExtension::kTransportSequenceNumberUri,
-                    false, false, false);
+                    false, false, false, &task_queue_);
   RunBaseTest(&test);
 }
 
 TEST_F(RampUpTest, TransportSequenceNumberSimulcast) {
   RampUpTester test(3, 0, 0, 0, 0, RtpExtension::kTransportSequenceNumberUri,
-                    false, false, false);
+                    false, false, false, &task_queue_);
   RunBaseTest(&test);
 }
 
 TEST_F(RampUpTest, TransportSequenceNumberSimulcastRedRtx) {
   RampUpTester test(3, 0, 0, 0, 0, RtpExtension::kTransportSequenceNumberUri,
-                    true, true, true);
+                    true, true, true, &task_queue_);
   RunBaseTest(&test);
 }
 
 TEST_F(RampUpTest, AudioTransportSequenceNumber) {
   RampUpTester test(0, 1, 0, 300000, 10000,
                     RtpExtension::kTransportSequenceNumberUri, false, false,
-                    false);
+                    false, &task_queue_);
   RunBaseTest(&test);
 }
 }  // namespace webrtc
diff --git a/call/rampup_tests.h b/call/rampup_tests.h
index b7d4af5..e3ab5f2 100644
--- a/call/rampup_tests.h
+++ b/call/rampup_tests.h
@@ -42,7 +42,8 @@
                const std::string& extension_type,
                bool rtx,
                bool red,
-               bool report_perf_stats);
+               bool report_perf_stats,
+               test::SingleThreadedTaskQueueForTesting* task_queue);
   ~RampUpTester() override;
 
   size_t GetNumVideoStreams() const override;
@@ -101,8 +102,6 @@
       std::vector<FlexfecReceiveStream::Config>* receive_configs) override;
   void OnCallsCreated(Call* sender_call, Call* receiver_call) override;
 
-  static void BitrateStatsPollingThread(void* obj);
-
   const int start_bitrate_bps_;
   const int64_t min_run_time_ms_;
   int expected_bitrate_bps_;
@@ -114,7 +113,9 @@
   std::vector<uint32_t> video_rtx_ssrcs_;
   std::vector<uint32_t> audio_ssrcs_;
 
-  rtc::PlatformThread poller_thread_;
+ protected:
+  test::SingleThreadedTaskQueueForTesting* const task_queue_;
+  test::SingleThreadedTaskQueueForTesting::TaskId pending_task_ = -1;
 };
 
 class RampUpDownUpTester : public RampUpTester {
@@ -127,7 +128,8 @@
                      bool rtx,
                      bool red,
                      const std::vector<int>& loss_rates,
-                     bool report_perf_stats);
+                     bool report_perf_stats,
+                     test::SingleThreadedTaskQueueForTesting* task_queue);
   ~RampUpDownUpTester() override;
 
  protected: