Fixes issue with excessive stats updating in TaskQueuePacedSender.

TaskQueuePacedSender::MaybeUpdateStats() is intended to be called when
packets are sent or by a sequence of "scheduled" calls. There should
only be one scheduled call in flight at a time - and that one
reschedules itself if needed when it runs.

A bug however caused the "schedules task in flight" flag to
incorrectly be set to false, leading to more and more schedules tasks
being alive - eating CPU cycles.

This CL fixes that and also makes sure the queue time properly goes
down to zero before the next idle interval check, even if there are no
more packets to send.

Bug: webrtc:10809
Change-Id: I4e13fcf95619a43dcaf0ed38bce9684a5b0d8d5e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176330
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31390}
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index 77f21be..07e265b 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -403,7 +403,9 @@
     if (target_send_time.IsMinusInfinity()) {
       target_send_time = now;
     } else if (now < target_send_time) {
-      // We are too early, abort and regroup!
+      // We are too early, but if queue is empty still allow draining some debt.
+      TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
+      UpdateBudgetWithElapsedTime(elapsed_time);
       return;
     }
 
diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc
index d058e03..e817f1b 100644
--- a/modules/pacing/task_queue_paced_sender.cc
+++ b/modules/pacing/task_queue_paced_sender.cc
@@ -185,6 +185,11 @@
   return GetStats().oldest_packet_wait_time;
 }
 
+void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
+  rtc::CritScope cs(&stats_crit_);
+  current_stats_ = stats;
+}
+
 void TaskQueuePacedSender::MaybeProcessPackets(
     Timestamp scheduled_process_time) {
   RTC_DCHECK_RUN_ON(&task_queue_);
@@ -232,40 +237,61 @@
 
 void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) {
   if (is_shutdown_) {
+    if (is_scheduled_call) {
+      stats_update_scheduled_ = false;
+    }
     return;
   }
 
   Timestamp now = clock_->CurrentTime();
-  if (!is_scheduled_call &&
-      now - last_stats_time_ < kMinTimeBetweenStatsUpdates) {
-    // Too frequent unscheduled stats update, return early.
-    return;
+  if (is_scheduled_call) {
+    // Allow scheduled task to process packets to clear up an remaining debt
+    // level in an otherwise empty queue.
+    pacing_controller_.ProcessPackets();
+  } else {
+    if (now - last_stats_time_ < kMinTimeBetweenStatsUpdates) {
+      // Too frequent unscheduled stats update, return early.
+      return;
+    }
   }
 
-  rtc::CritScope cs(&stats_crit_);
-  current_stats_.expected_queue_time = pacing_controller_.ExpectedQueueTime();
-  current_stats_.first_sent_packet_time =
-      pacing_controller_.FirstSentPacketTime();
-  current_stats_.oldest_packet_wait_time =
-      pacing_controller_.OldestPacketWaitTime();
-  current_stats_.queue_size = pacing_controller_.QueueSizeData();
+  Stats new_stats;
+  new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
+  new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
+  new_stats.oldest_packet_wait_time = pacing_controller_.OldestPacketWaitTime();
+  new_stats.queue_size = pacing_controller_.QueueSizeData();
+  OnStatsUpdated(new_stats);
+
   last_stats_time_ = now;
 
   bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 &&
                        pacing_controller_.CurrentBufferLevel().IsZero();
 
   // If there's anything interesting to get from the pacer and this is a
-  // scheduled call (no scheduled call in flight), post a new scheduled stats
+  // scheduled call (or no scheduled call in flight), post a new scheduled stats
   // update.
-  if (!pacer_drained && (is_scheduled_call || !stats_update_scheduled_)) {
-    task_queue_.PostDelayedTask(
-        [this]() {
-          RTC_DCHECK_RUN_ON(&task_queue_);
-          MaybeUpdateStats(true);
-        },
-        kMaxTimeBetweenStatsUpdates.ms<uint32_t>());
-    stats_update_scheduled_ = true;
-  } else {
+  if (!pacer_drained) {
+    if (!stats_update_scheduled_) {
+      // There is no pending delayed task to update stats, add one.
+      // Treat this call as being scheduled in order to bootstrap scheduling
+      // loop.
+      stats_update_scheduled_ = true;
+      is_scheduled_call = true;
+    }
+
+    // Only if on the scheduled call loop do we want to schedule a new delayed
+    // task.
+    if (is_scheduled_call) {
+      task_queue_.PostDelayedTask(
+          [this]() {
+            RTC_DCHECK_RUN_ON(&task_queue_);
+            MaybeUpdateStats(true);
+          },
+          kMaxTimeBetweenStatsUpdates.ms<uint32_t>());
+    }
+  } else if (is_scheduled_call) {
+    // This is a scheduled call, signing out since there's nothing interesting
+    // left to check.
     stats_update_scheduled_ = false;
   }
 }
diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h
index 5e6a177..c4ee546 100644
--- a/modules/pacing/task_queue_paced_sender.h
+++ b/modules/pacing/task_queue_paced_sender.h
@@ -104,7 +104,8 @@
   // specified by SetPacingRates() if needed to achieve this goal.
   void SetQueueTimeLimit(TimeDelta limit) override;
 
- private:
+ protected:
+  // Exposed as protected for test.
   struct Stats {
     Stats()
         : oldest_packet_wait_time(TimeDelta::Zero()),
@@ -115,7 +116,9 @@
     TimeDelta expected_queue_time;
     absl::optional<Timestamp> first_sent_packet_time;
   };
+  virtual void OnStatsUpdated(const Stats& stats);
 
+ private:
   // Check if it is time to send packets, or schedule a delayed task if not.
   // Use Timestamp::MinusInfinity() to indicate that this call has _not_
   // been scheduled by the pacing controller. If this is the case, check if
diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc
index 50fceea..876cd96 100644
--- a/modules/pacing/task_queue_paced_sender_unittest.cc
+++ b/modules/pacing/task_queue_paced_sender_unittest.cc
@@ -48,6 +48,38 @@
               (DataSize target_size),
               (override));
 };
+
+class StatsUpdateObserver {
+ public:
+  StatsUpdateObserver() = default;
+  virtual ~StatsUpdateObserver() = default;
+
+  virtual void OnStatsUpdated() = 0;
+};
+
+class TaskQueuePacedSenderForTest : public TaskQueuePacedSender {
+ public:
+  TaskQueuePacedSenderForTest(
+      Clock* clock,
+      PacketRouter* packet_router,
+      RtcEventLog* event_log,
+      const WebRtcKeyValueConfig* field_trials,
+      TaskQueueFactory* task_queue_factory,
+      TimeDelta hold_back_window = PacingController::kMinSleepTime)
+      : TaskQueuePacedSender(clock,
+                             packet_router,
+                             event_log,
+                             field_trials,
+                             task_queue_factory,
+                             hold_back_window) {}
+
+  void OnStatsUpdated(const Stats& stats) override {
+    ++num_stats_updates_;
+    TaskQueuePacedSender::OnStatsUpdated(stats);
+  }
+
+  size_t num_stats_updates_ = 0;
+};
 }  // namespace
 
 namespace test {
@@ -88,11 +120,11 @@
   TEST(TaskQueuePacedSenderTest, PacesPackets) {
     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
     MockPacketRouter packet_router;
-    TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
-                               /*event_log=*/nullptr,
-                               /*field_trials=*/nullptr,
-                               time_controller.GetTaskQueueFactory(),
-                               PacingController::kMinSleepTime);
+    TaskQueuePacedSenderForTest pacer(
+        time_controller.GetClock(), &packet_router,
+        /*event_log=*/nullptr,
+        /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+        PacingController::kMinSleepTime);
 
     // Insert a number of packets, covering one second.
     static constexpr size_t kPacketsToSend = 42;
@@ -127,11 +159,11 @@
   TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
     MockPacketRouter packet_router;
-    TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
-                               /*event_log=*/nullptr,
-                               /*field_trials=*/nullptr,
-                               time_controller.GetTaskQueueFactory(),
-                               PacingController::kMinSleepTime);
+    TaskQueuePacedSenderForTest pacer(
+        time_controller.GetClock(), &packet_router,
+        /*event_log=*/nullptr,
+        /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+        PacingController::kMinSleepTime);
 
     // Insert a number of packets to be sent 200ms apart.
     const size_t kPacketsPerSecond = 5;
@@ -178,11 +210,11 @@
   TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
     MockPacketRouter packet_router;
-    TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
-                               /*event_log=*/nullptr,
-                               /*field_trials=*/nullptr,
-                               time_controller.GetTaskQueueFactory(),
-                               PacingController::kMinSleepTime);
+    TaskQueuePacedSenderForTest pacer(
+        time_controller.GetClock(), &packet_router,
+        /*event_log=*/nullptr,
+        /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+        PacingController::kMinSleepTime);
 
     const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
     const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@@ -210,11 +242,11 @@
     const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
     MockPacketRouter packet_router;
-    TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
-                               /*event_log=*/nullptr,
-                               /*field_trials=*/nullptr,
-                               time_controller.GetTaskQueueFactory(),
-                               kCoalescingWindow);
+    TaskQueuePacedSenderForTest pacer(
+        time_controller.GetClock(), &packet_router,
+        /*event_log=*/nullptr,
+        /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+        kCoalescingWindow);
 
     // Set rates so one packet adds one ms of buffer level.
     const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@@ -246,11 +278,11 @@
     const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
     GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
     MockPacketRouter packet_router;
-    TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
-                               /*event_log=*/nullptr,
-                               /*field_trials=*/nullptr,
-                               time_controller.GetTaskQueueFactory(),
-                               kCoalescingWindow);
+    TaskQueuePacedSenderForTest pacer(
+        time_controller.GetClock(), &packet_router,
+        /*event_log=*/nullptr,
+        /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+        kCoalescingWindow);
 
     // Set rates so one packet adds one ms of buffer level.
     const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@@ -273,5 +305,102 @@
     time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
   }
 
+  TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) {
+    const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
+    GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+    MockPacketRouter packet_router;
+    TaskQueuePacedSenderForTest pacer(
+        time_controller.GetClock(), &packet_router,
+        /*event_log=*/nullptr,
+        /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+        kCoalescingWindow);
+    const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300);
+    pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+
+    const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
+
+    // Nothing inserted, no stats updates yet.
+    EXPECT_EQ(pacer.num_stats_updates_, 0u);
+
+    // Insert one packet, stats should be updated.
+    pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
+    time_controller.AdvanceTime(TimeDelta::Zero());
+    EXPECT_EQ(pacer.num_stats_updates_, 1u);
+
+    // Advance time half of the min stats update interval, and trigger a
+    // refresh - stats should not be updated yet.
+    time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2);
+    pacer.EnqueuePackets({});
+    time_controller.AdvanceTime(TimeDelta::Zero());
+    EXPECT_EQ(pacer.num_stats_updates_, 1u);
+
+    // Advance time the next half, now stats update is triggered.
+    time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2);
+    pacer.EnqueuePackets({});
+    time_controller.AdvanceTime(TimeDelta::Zero());
+    EXPECT_EQ(pacer.num_stats_updates_, 2u);
+  }
+
+  TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) {
+    const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
+    GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+    MockPacketRouter packet_router;
+    TaskQueuePacedSenderForTest pacer(
+        time_controller.GetClock(), &packet_router,
+        /*event_log=*/nullptr,
+        /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+        kCoalescingWindow);
+
+    // Set rates so one packet adds 10ms of buffer level.
+    const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+    const TimeDelta kPacketPacingTime = TimeDelta::Millis(10);
+    const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
+    const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
+    const TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33);
+
+    // Nothing inserted, no stats updates yet.
+    size_t num_expected_stats_updates = 0;
+    EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
+    pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+    time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates);
+    // Updating pacing rates refreshes stats.
+    EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
+
+    // Record time when we insert first packet, this triggers the scheduled
+    // stats updating.
+    Clock* const clock = time_controller.GetClock();
+    const Timestamp start_time = clock->CurrentTime();
+
+    while (clock->CurrentTime() - start_time <=
+           kMaxTimeBetweenStatsUpdates - kPacketPacingTime) {
+      // Enqueue packet, expect stats update.
+      pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
+      time_controller.AdvanceTime(TimeDelta::Zero());
+      EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
+
+      // Advance time to halfway through pacing time, expect another stats
+      // update.
+      time_controller.AdvanceTime(kPacketPacingTime / 2);
+      pacer.EnqueuePackets({});
+      time_controller.AdvanceTime(TimeDelta::Zero());
+      EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
+
+      // Advance time the rest of the way.
+      time_controller.AdvanceTime(kPacketPacingTime / 2);
+    }
+
+    // At this point, the pace queue is drained so there is no more intersting
+    // update to be made - but there is still as schduled task that should run
+    // |kMaxTimeBetweenStatsUpdates| after the first update.
+    time_controller.AdvanceTime(start_time + kMaxTimeBetweenStatsUpdates -
+                                clock->CurrentTime());
+    EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
+
+    // Advance time a significant time - don't expect any more calls as stats
+    // updating does not happen when queue is drained.
+    time_controller.AdvanceTime(TimeDelta::Millis(400));
+    EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
+  }
+
 }  // namespace test
 }  // namespace webrtc