Migrate CallStats and RtpStreamsSynchronizer timers over to RepeatingTask

Bug: none
Change-Id: Ib49a3de74c6d3a6d4ea158383a5e4b69a1e58ab9
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175000
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31252}
diff --git a/rtc_base/task_utils/repeating_task.cc b/rtc_base/task_utils/repeating_task.cc
index 4e460bb..71911e6 100644
--- a/rtc_base/task_utils/repeating_task.cc
+++ b/rtc_base/task_utils/repeating_task.cc
@@ -20,12 +20,14 @@
 RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue,
                                      TimeDelta first_delay)
     : task_queue_(task_queue),
-      next_run_time_(Timestamp::Micros(rtc::TimeMicros()) + first_delay) {}
+      next_run_time_(Timestamp::Micros(rtc::TimeMicros()) + first_delay) {
+  sequence_checker_.Detach();
+}
 
 RepeatingTaskBase::~RepeatingTaskBase() = default;
 
 bool RepeatingTaskBase::Run() {
-  RTC_DCHECK_RUN_ON(task_queue_);
+  RTC_DCHECK_RUN_ON(&sequence_checker_);
   // Return true to tell the TaskQueue to destruct this object.
   if (next_run_time_.IsPlusInfinity())
     return true;
@@ -51,6 +53,7 @@
 }
 
 void RepeatingTaskBase::Stop() {
+  RTC_DCHECK_RUN_ON(&sequence_checker_);
   RTC_DCHECK(next_run_time_.IsFinite());
   next_run_time_ = Timestamp::PlusInfinity();
 }
@@ -75,7 +78,6 @@
 
 void RepeatingTaskHandle::Stop() {
   if (repeating_task_) {
-    RTC_DCHECK_RUN_ON(repeating_task_->task_queue_);
     repeating_task_->Stop();
     repeating_task_ = nullptr;
   }
diff --git a/rtc_base/task_utils/repeating_task.h b/rtc_base/task_utils/repeating_task.h
index 1545d6f..75d03bf 100644
--- a/rtc_base/task_utils/repeating_task.h
+++ b/rtc_base/task_utils/repeating_task.h
@@ -20,7 +20,6 @@
 #include "api/units/time_delta.h"
 #include "api/units/timestamp.h"
 #include "rtc_base/synchronization/sequence_checker.h"
-#include "rtc_base/thread_checker.h"
 
 namespace webrtc {
 
@@ -31,18 +30,25 @@
  public:
   RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay);
   ~RepeatingTaskBase() override;
-  virtual TimeDelta RunClosure() = 0;
+
+  void Stop();
 
  private:
-  friend class ::webrtc::RepeatingTaskHandle;
+  virtual TimeDelta RunClosure() = 0;
 
   bool Run() final;
-  void Stop() RTC_RUN_ON(task_queue_);
 
   TaskQueueBase* const task_queue_;
   // This is always finite, except for the special case where it's PlusInfinity
   // to signal that the task should stop.
-  Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_);
+  Timestamp next_run_time_ RTC_GUARDED_BY(sequence_checker_);
+  // We use a SequenceChecker to check for correct usage instead of using
+  // RTC_DCHECK_RUN_ON(task_queue_). This is to work around a compatibility
+  // issue with some TQ implementations such as rtc::Thread that don't
+  // consistently set themselves as the 'current' TQ when running tasks.
+  // The SequenceChecker detects those implementations differently but gives
+  // the same effect as far as thread safety goes.
+  SequenceChecker sequence_checker_;
 };
 
 // The template closure pattern is based on rtc::ClosureTask.
@@ -61,9 +67,9 @@
         "");
   }
 
+ private:
   TimeDelta RunClosure() override { return closure_(); }
 
- private:
   typename std::remove_const<
       typename std::remove_reference<Closure>::type>::type closure_;
 };
diff --git a/video/call_stats2.cc b/video/call_stats2.cc
index ce68127..d190294 100644
--- a/video/call_stats2.cc
+++ b/video/call_stats2.cc
@@ -64,9 +64,10 @@
 
 }  // namespace
 
+constexpr TimeDelta CallStats::kUpdateInterval;
+
 CallStats::CallStats(Clock* clock, TaskQueueBase* task_queue)
     : clock_(clock),
-      last_process_time_(clock_->TimeInMilliseconds()),
       max_rtt_ms_(-1),
       avg_rtt_ms_(-1),
       sum_avg_rtt_ms_(0),
@@ -75,39 +76,29 @@
       task_queue_(task_queue) {
   RTC_DCHECK(task_queue_);
   process_thread_checker_.Detach();
-  task_queue_->PostDelayedTask(
-      ToQueuedTask(task_safety_, [this]() { RunTimer(); }), kUpdateIntervalMs);
+  repeating_task_ =
+      RepeatingTaskHandle::DelayedStart(task_queue_, kUpdateInterval, [this]() {
+        UpdateAndReport();
+        return kUpdateInterval;
+      });
 }
 
 CallStats::~CallStats() {
   RTC_DCHECK_RUN_ON(&construction_thread_checker_);
   RTC_DCHECK(observers_.empty());
 
+  repeating_task_.Stop();
+
   UpdateHistograms();
 }
 
-void CallStats::RunTimer() {
-  RTC_DCHECK_RUN_ON(&construction_thread_checker_);
-
-  UpdateAndReport();
-
-  uint32_t interval =
-      last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds();
-
-  task_queue_->PostDelayedTask(
-      ToQueuedTask(task_safety_, [this]() { RunTimer(); }), interval);
-}
-
 void CallStats::UpdateAndReport() {
   RTC_DCHECK_RUN_ON(&construction_thread_checker_);
 
-  int64_t now = clock_->TimeInMilliseconds();
-  last_process_time_ = now;
-
   // |avg_rtt_ms_| is allowed to be read on the construction thread since that's
   // the only thread that modifies the value.
   int64_t avg_rtt_ms = avg_rtt_ms_;
-  RemoveOldReports(now, &reports_);
+  RemoveOldReports(clock_->CurrentTime().ms(), &reports_);
   max_rtt_ms_ = GetMaxRttMs(reports_);
   avg_rtt_ms = GetNewAvgRttMs(reports_, avg_rtt_ms);
   {
diff --git a/video/call_stats2.h b/video/call_stats2.h
index 49d2db7..8f53358 100644
--- a/video/call_stats2.h
+++ b/video/call_stats2.h
@@ -14,6 +14,7 @@
 #include <list>
 #include <memory>
 
+#include "api/units/timestamp.h"
 #include "modules/include/module_common_types.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
 #include "rtc_base/constructor_magic.h"
@@ -21,6 +22,7 @@
 #include "rtc_base/synchronization/sequence_checker.h"
 #include "rtc_base/task_queue.h"
 #include "rtc_base/task_utils/pending_task_safety_flag.h"
+#include "rtc_base/task_utils/repeating_task.h"
 #include "system_wrappers/include/clock.h"
 
 namespace webrtc {
@@ -29,7 +31,7 @@
 class CallStats {
  public:
   // Time interval for updating the observers.
-  static constexpr int64_t kUpdateIntervalMs = 1000;
+  static constexpr TimeDelta kUpdateInterval = TimeDelta::Millis(1000);
 
   CallStats(Clock* clock, TaskQueueBase* task_queue);
   ~CallStats();
@@ -70,8 +72,6 @@
   void OnRttUpdate(int64_t rtt);
   int64_t LastProcessedRttFromProcessThread() const;
 
-  void RunTimer();
-
   void UpdateAndReport();
 
   // This method must only be called when the process thread is not
@@ -102,8 +102,10 @@
 
   Clock* const clock_;
 
-  // The last time 'Process' resulted in statistic update.
-  int64_t last_process_time_ RTC_GUARDED_BY(construction_thread_checker_);
+  // Used to regularly call UpdateAndReport().
+  RepeatingTaskHandle repeating_task_
+      RTC_GUARDED_BY(construction_thread_checker_);
+
   // The last RTT in the statistics update (zero if there is no valid estimate).
   int64_t max_rtt_ms_ RTC_GUARDED_BY(construction_thread_checker_);
 
diff --git a/video/call_stats2_unittest.cc b/video/call_stats2_unittest.cc
index 58af6fd..73fe4b4 100644
--- a/video/call_stats2_unittest.cc
+++ b/video/call_stats2_unittest.cc
@@ -96,12 +96,13 @@
       .Times(2)
       .WillOnce(InvokeWithoutArgs([this] {
         // Advance clock and verify we get an update.
-        fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs);
+        fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms());
       }))
       .WillRepeatedly(InvokeWithoutArgs([this] {
         AsyncSimulateRttUpdate(kRtt2);
         // Advance clock just too little to get an update.
-        fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs - 1);
+        fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms() -
+                                            1);
       }));
 
   // In case you're reading this and wondering how this number is arrived at,
@@ -256,7 +257,7 @@
       .Times(AnyNumber())
       .WillOnce(InvokeWithoutArgs([this] {
         EXPECT_EQ(kAvgRtt1, call_stats_.LastProcessedRtt());
-        fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs);
+        fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms());
         AsyncSimulateRttUpdate(kRttLow);
         AsyncSimulateRttUpdate(kRttHigh);
       }))
@@ -272,7 +273,7 @@
 
   // Set a first values and verify that LastProcessedRtt initially returns the
   // average rtt.
-  fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs);
+  fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateInterval.ms());
   AsyncSimulateRttUpdate(kRttLow);
   loop_.Run();
   EXPECT_EQ(kAvgRtt2, call_stats_.LastProcessedRtt());
@@ -292,7 +293,7 @@
   AsyncSimulateRttUpdate(kRtt);
   loop_.Run();
   fake_clock_.AdvanceTimeMilliseconds(metrics::kMinRunTimeInSeconds *
-                                      CallStats::kUpdateIntervalMs);
+                                      CallStats::kUpdateInterval.ms());
   AsyncSimulateRttUpdate(kRtt);
   loop_.Run();
 
diff --git a/video/rtp_streams_synchronizer2.cc b/video/rtp_streams_synchronizer2.cc
index 7e3bed1..49be355 100644
--- a/video/rtp_streams_synchronizer2.cc
+++ b/video/rtp_streams_synchronizer2.cc
@@ -23,7 +23,7 @@
 namespace {
 // Time interval for logging stats.
 constexpr int64_t kStatsLogIntervalMs = 10000;
-constexpr uint32_t kSyncIntervalMs = 1000;
+constexpr TimeDelta kSyncInterval = TimeDelta::Millis(1000);
 
 bool UpdateMeasurements(StreamSynchronization::Measurements* stream,
                         const Syncable::Info& info) {
@@ -34,19 +34,20 @@
       info.capture_time_ntp_secs, info.capture_time_ntp_frac,
       info.capture_time_source_clock, &new_rtcp_sr);
 }
+
 }  // namespace
 
 RtpStreamsSynchronizer::RtpStreamsSynchronizer(TaskQueueBase* main_queue,
                                                Syncable* syncable_video)
     : task_queue_(main_queue),
       syncable_video_(syncable_video),
-      last_sync_time_(rtc::TimeNanos()),
       last_stats_log_ms_(rtc::TimeMillis()) {
   RTC_DCHECK(syncable_video);
 }
 
 RtpStreamsSynchronizer::~RtpStreamsSynchronizer() {
   RTC_DCHECK_RUN_ON(&main_checker_);
+  repeating_task_.Stop();
 }
 
 void RtpStreamsSynchronizer::ConfigureSync(Syncable* syncable_audio) {
@@ -58,52 +59,32 @@
 
   syncable_audio_ = syncable_audio;
   sync_.reset(nullptr);
-  if (!syncable_audio_)
+  if (!syncable_audio_) {
+    repeating_task_.Stop();
     return;
+  }
 
   sync_.reset(
       new StreamSynchronization(syncable_video_->id(), syncable_audio_->id()));
-  QueueTimer();
-}
 
-void RtpStreamsSynchronizer::QueueTimer() {
-  RTC_DCHECK_RUN_ON(&main_checker_);
-  if (timer_running_)
+  if (repeating_task_.Running())
     return;
 
-  timer_running_ = true;
-  uint32_t delay = kSyncIntervalMs - (rtc::TimeNanos() - last_sync_time_) /
-                                         rtc::kNumNanosecsPerMillisec;
-  if (delay > kSyncIntervalMs) {
-    // TODO(tommi): |linux_chromium_tsan_rel_ng| bot has shown a failure when
-    // running WebRtcBrowserTest.CallAndModifyStream, indicating that the
-    // underlying clock is not reliable. Possibly there's a fake clock being
-    // used as the tests are flaky. Look into and fix.
-    RTC_LOG(LS_ERROR) << "Unexpected timer value: " << delay;
-    delay = kSyncIntervalMs;
-  }
-
-  RTC_DCHECK_LE(delay, kSyncIntervalMs);
-  task_queue_->PostDelayedTask(ToQueuedTask(task_safety_,
-                                            [this] {
-                                              RTC_DCHECK_RUN_ON(&main_checker_);
-                                              timer_running_ = false;
-                                              UpdateDelay();
-                                            }),
-                               delay);
+  repeating_task_ =
+      RepeatingTaskHandle::DelayedStart(task_queue_, kSyncInterval, [this]() {
+        UpdateDelay();
+        return kSyncInterval;
+      });
 }
 
 void RtpStreamsSynchronizer::UpdateDelay() {
   RTC_DCHECK_RUN_ON(&main_checker_);
-  last_sync_time_ = rtc::TimeNanos();
 
   if (!syncable_audio_)
     return;
 
   RTC_DCHECK(sync_.get());
 
-  QueueTimer();
-
   bool log_stats = false;
   const int64_t now_ms = rtc::TimeMillis();
   if (now_ms - last_stats_log_ms_ > kStatsLogIntervalMs) {
diff --git a/video/rtp_streams_synchronizer2.h b/video/rtp_streams_synchronizer2.h
index 83dd0fb..6a522e8 100644
--- a/video/rtp_streams_synchronizer2.h
+++ b/video/rtp_streams_synchronizer2.h
@@ -15,7 +15,7 @@
 
 #include "rtc_base/synchronization/sequence_checker.h"
 #include "rtc_base/task_queue.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
+#include "rtc_base/task_utils/repeating_task.h"
 #include "video/stream_synchronization.h"
 
 namespace webrtc {
@@ -45,7 +45,6 @@
                                double* estimated_freq_khz) const;
 
  private:
-  void QueueTimer();
   void UpdateDelay();
 
   TaskQueueBase* const task_queue_;
@@ -65,12 +64,8 @@
       RTC_GUARDED_BY(main_checker_);
   StreamSynchronization::Measurements video_measurement_
       RTC_GUARDED_BY(main_checker_);
-  int64_t last_sync_time_ RTC_GUARDED_BY(&main_checker_);
+  RepeatingTaskHandle repeating_task_ RTC_GUARDED_BY(main_checker_);
   int64_t last_stats_log_ms_ RTC_GUARDED_BY(&main_checker_);
-  bool timer_running_ RTC_GUARDED_BY(main_checker_) = false;
-
-  // Used to signal destruction to potentially pending tasks.
-  ScopedTaskSafety task_safety_;
 };
 
 }  // namespace internal
diff --git a/video/video_stream_encoder.h b/video/video_stream_encoder.h
index 5c72167..13b2bdf 100644
--- a/video/video_stream_encoder.h
+++ b/video/video_stream_encoder.h
@@ -38,12 +38,12 @@
 #include "rtc_base/rate_statistics.h"
 #include "rtc_base/synchronization/sequence_checker.h"
 #include "rtc_base/task_queue.h"
+#include "rtc_base/thread_checker.h"
 #include "system_wrappers/include/clock.h"
 #include "video/adaptation/video_stream_encoder_resource_manager.h"
 #include "video/encoder_bitrate_adjuster.h"
 #include "video/frame_encode_metadata_writer.h"
 #include "video/video_source_sink_controller.h"
-
 namespace webrtc {
 
 // VideoStreamEncoder represent a video encoder that accepts raw video frames as