Add support for dynamic processing mode in PacedSender.

Behind a default-disabled field trial.

Bug: webrtc:10809
Change-Id: If5d9b69721bd67e59e68b1026e3797e9a1b0a760
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/159783
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29802}
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index fad9018..56eed92 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -32,11 +32,16 @@
                          RtcEventLog* event_log,
                          const WebRtcKeyValueConfig* field_trials,
                          ProcessThread* process_thread)
-    : pacing_controller_(clock,
+    : process_mode_((field_trials != nullptr &&
+                     field_trials->Lookup("WebRTC-Pacer-DynamicProcess")
+                             .find("Enabled") == 0)
+                        ? PacingController::ProcessMode::kDynamic
+                        : PacingController::ProcessMode::kPeriodic),
+      pacing_controller_(clock,
                          static_cast<PacingController::PacketSender*>(this),
                          event_log,
                          field_trials,
-                         PacingController::ProcessMode::kPeriodic),
+                         process_mode_),
       clock_(clock),
       packet_router_(packet_router),
       process_thread_(process_thread) {
@@ -45,8 +50,9 @@
 }
 
 PacedSender::~PacedSender() {
-  if (process_thread_)
+  if (process_thread_) {
     process_thread_->DeRegisterModule(&module_proxy_);
+  }
 }
 
 void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) {
@@ -62,8 +68,9 @@
 
   // Tell the process thread to call our TimeUntilNextProcess() method to get
   // a new (longer) estimate for when to call Process().
-  if (process_thread_)
+  if (process_thread_) {
     process_thread_->WakeUp(&module_proxy_);
+  }
 }
 
 void PacedSender::Resume() {
@@ -74,31 +81,44 @@
 
   // Tell the process thread to call our TimeUntilNextProcess() method to
   // refresh the estimate for when to call Process().
-  if (process_thread_)
+  if (process_thread_) {
     process_thread_->WakeUp(&module_proxy_);
+  }
 }
 
 void PacedSender::SetCongestionWindow(DataSize congestion_window_size) {
-  rtc::CritScope cs(&critsect_);
-  pacing_controller_.SetCongestionWindow(congestion_window_size);
+  {
+    rtc::CritScope cs(&critsect_);
+    pacing_controller_.SetCongestionWindow(congestion_window_size);
+  }
+  MaybeWakupProcessThread();
 }
 
 void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
-  rtc::CritScope cs(&critsect_);
-  pacing_controller_.UpdateOutstandingData(outstanding_data);
+  {
+    rtc::CritScope cs(&critsect_);
+    pacing_controller_.UpdateOutstandingData(outstanding_data);
+  }
+  MaybeWakupProcessThread();
 }
 
 void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) {
-  rtc::CritScope cs(&critsect_);
-  pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
+  {
+    rtc::CritScope cs(&critsect_);
+    pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
+  }
+  MaybeWakupProcessThread();
 }
 
 void PacedSender::EnqueuePackets(
     std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
-  rtc::CritScope cs(&critsect_);
-  for (auto& packet : packets) {
-    pacing_controller_.EnqueuePacket(std::move(packet));
+  {
+    rtc::CritScope cs(&critsect_);
+    for (auto& packet : packets) {
+      pacing_controller_.EnqueuePacket(std::move(packet));
+    }
   }
+  MaybeWakupProcessThread();
 }
 
 void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
@@ -144,9 +164,21 @@
   RTC_DCHECK(!process_thread || process_thread == process_thread_);
 }
 
+void PacedSender::MaybeWakupProcessThread() {
+  // Tell the process thread to call our TimeUntilNextProcess() method to get
+  // a new time for when to call Process().
+  if (process_thread_ &&
+      process_mode_ == PacingController::ProcessMode::kDynamic) {
+    process_thread_->WakeUp(&module_proxy_);
+  }
+}
+
 void PacedSender::SetQueueTimeLimit(TimeDelta limit) {
-  rtc::CritScope cs(&critsect_);
-  pacing_controller_.SetQueueTimeLimit(limit);
+  {
+    rtc::CritScope cs(&critsect_);
+    pacing_controller_.SetQueueTimeLimit(limit);
+  }
+  MaybeWakupProcessThread();
 }
 
 void PacedSender::SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h
index 3539c53..06a6c26 100644
--- a/modules/pacing/paced_sender.h
+++ b/modules/pacing/paced_sender.h
@@ -134,9 +134,10 @@
   // Called when the prober is associated with a process thread.
   void ProcessThreadAttached(ProcessThread* process_thread) override;
 
- private:
-  // Methods implementing PacedSenderController:PacketSender.
+  // In dynamic process mode, refreshes the next process time.
+  void MaybeWakupProcessThread();
 
+  // Methods implementing PacedSenderController:PacketSender.
   void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
                      const PacedPacketInfo& cluster_info) override
       RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
@@ -163,6 +164,7 @@
   } module_proxy_{this};
 
   rtc::CriticalSection critsect_;
+  const PacingController::ProcessMode process_mode_;
   PacingController pacing_controller_ RTC_GUARDED_BY(critsect_);
 
   Clock* const clock_;
diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc
index feb6c07..23f1d60 100644
--- a/modules/pacing/paced_sender_unittest.cc
+++ b/modules/pacing/paced_sender_unittest.cc
@@ -39,7 +39,6 @@
 namespace webrtc {
 namespace test {
 
-
 // Mock callback implementing the raw api.
 class MockCallback : public PacketRouter {
  public:
@@ -51,69 +50,88 @@
       std::vector<std::unique_ptr<RtpPacketToSend>>(size_t target_size_bytes));
 };
 
-std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
-  auto packet = std::make_unique<RtpPacketToSend>(nullptr);
-  packet->set_packet_type(type);
-  switch (type) {
-    case RtpPacketToSend::Type::kAudio:
-      packet->SetSsrc(kAudioSsrc);
-      break;
-    case RtpPacketToSend::Type::kVideo:
-      packet->SetSsrc(kVideoSsrc);
-      break;
-    case RtpPacketToSend::Type::kRetransmission:
-    case RtpPacketToSend::Type::kPadding:
-      packet->SetSsrc(kVideoRtxSsrc);
-      break;
-    case RtpPacketToSend::Type::kForwardErrorCorrection:
-      packet->SetSsrc(kFlexFecSsrc);
-      break;
+class PacedSenderTest
+    : public ::testing::TestWithParam<PacingController::ProcessMode> {
+ public:
+  PacedSenderTest() : clock_(0), paced_module_(nullptr) {}
+
+  void SetUp() override {
+    EXPECT_CALL(process_thread_, RegisterModule)
+        .WillOnce(SaveArg<0>(&paced_module_));
+
+    pacer_ = std::make_unique<PacedSender>(&clock_, &callback_, nullptr,
+                                           nullptr, &process_thread_);
+    EXPECT_CALL(process_thread_, DeRegisterModule(paced_module_)).Times(1);
   }
 
-  packet->SetPayloadSize(kDefaultPacketSize);
-  return packet;
-}
+ protected:
+  std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
+    auto packet = std::make_unique<RtpPacketToSend>(nullptr);
+    packet->set_packet_type(type);
+    switch (type) {
+      case RtpPacketToSend::Type::kAudio:
+        packet->SetSsrc(kAudioSsrc);
+        break;
+      case RtpPacketToSend::Type::kVideo:
+        packet->SetSsrc(kVideoSsrc);
+        break;
+      case RtpPacketToSend::Type::kRetransmission:
+      case RtpPacketToSend::Type::kPadding:
+        packet->SetSsrc(kVideoRtxSsrc);
+        break;
+      case RtpPacketToSend::Type::kForwardErrorCorrection:
+        packet->SetSsrc(kFlexFecSsrc);
+        break;
+    }
 
-TEST(PacedSenderTest, PacesPackets) {
-  SimulatedClock clock(0);
-  MockCallback callback;
-  MockProcessThread process_thread;
-  Module* paced_module = nullptr;
-  EXPECT_CALL(process_thread, RegisterModule(_, _))
-      .WillOnce(SaveArg<0>(&paced_module));
-  PacedSender pacer(&clock, &callback, nullptr, nullptr, &process_thread);
-  EXPECT_CALL(process_thread, DeRegisterModule(paced_module)).Times(1);
+    packet->SetPayloadSize(kDefaultPacketSize);
+    return packet;
+  }
 
+  SimulatedClock clock_;
+  MockCallback callback_;
+  MockProcessThread process_thread_;
+  Module* paced_module_;
+  std::unique_ptr<PacedSender> pacer_;
+};
+
+TEST_P(PacedSenderTest, PacesPackets) {
   // Insert a number of packets, covering one second.
   static constexpr size_t kPacketsToSend = 42;
-  pacer.SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend),
-                       DataRate::Zero());
+  pacer_->SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend),
+                         DataRate::Zero());
   std::vector<std::unique_ptr<RtpPacketToSend>> packets;
   for (size_t i = 0; i < kPacketsToSend; ++i) {
     packets.emplace_back(BuildRtpPacket(RtpPacketToSend::Type::kVideo));
   }
-  pacer.EnqueuePackets(std::move(packets));
+  pacer_->EnqueuePackets(std::move(packets));
 
   // Expect all of them to be sent.
   size_t packets_sent = 0;
-  clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess());
-  EXPECT_CALL(callback, SendPacket)
+  clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess());
+  EXPECT_CALL(callback_, SendPacket)
       .WillRepeatedly(
           [&](std::unique_ptr<RtpPacketToSend> packet,
               const PacedPacketInfo& cluster_info) { ++packets_sent; });
 
-  const Timestamp start_time = clock.CurrentTime();
+  const Timestamp start_time = clock_.CurrentTime();
 
   while (packets_sent < kPacketsToSend) {
-    clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess());
-    paced_module->Process();
+    clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess());
+    paced_module_->Process();
   }
 
   // Packets should be sent over a period of close to 1s. Expect a little lower
   // than this since initial probing is a bit quicker.
-  TimeDelta duration = clock.CurrentTime() - start_time;
+  TimeDelta duration = clock_.CurrentTime() - start_time;
   EXPECT_GT(duration, TimeDelta::ms(900));
 }
 
+INSTANTIATE_TEST_SUITE_P(
+    WithAndWithoutDynamicProcess,
+    PacedSenderTest,
+    ::testing::Values(PacingController::ProcessMode::kPeriodic,
+                      PacingController::ProcessMode::kDynamic));
+
 }  // namespace test
 }  // namespace webrtc