[SourceTracker] Move state to the worker thread, remove mutex.

This is in preparation of using the state that SourceTracker manages
for more things than only getContributingSources. Audio levels reported
via getStats(), aren't consistent with levels reported via getCS.

Since more operations will be derived from the ST owned data, moving
the management of it away from the audio thread, reduces the potential
of contention.

Bug: webrtc:14029, webrtc:7517, webrtc:15119
Change-Id: I553f7e473316a1c61eeb43ded905a18242a04424
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/302280
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39943}
diff --git a/audio/BUILD.gn b/audio/BUILD.gn
index 1a06502..1db2eee 100644
--- a/audio/BUILD.gn
+++ b/audio/BUILD.gn
@@ -213,6 +213,7 @@
       "../test:mock_transformable_frame",
       "../test:mock_transport",
       "../test:rtp_test_utils",
+      "../test:run_loop",
       "../test:scoped_key_value_config",
       "../test:test_common",
       "../test:test_support",
diff --git a/audio/audio_receive_stream.cc b/audio/audio_receive_stream.cc
index 6d38b26..bde48aa 100644
--- a/audio/audio_receive_stream.cc
+++ b/audio/audio_receive_stream.cc
@@ -373,7 +373,8 @@
                                               AudioFrame* audio_frame) {
   AudioMixer::Source::AudioFrameInfo audio_frame_info =
       channel_receive_->GetAudioFrameWithInfo(sample_rate_hz, audio_frame);
-  if (audio_frame_info != AudioMixer::Source::AudioFrameInfo::kError) {
+  if (audio_frame_info != AudioMixer::Source::AudioFrameInfo::kError &&
+      !audio_frame->packet_infos_.empty()) {
     source_tracker_.OnFrameDelivered(audio_frame->packet_infos_);
   }
   return audio_frame_info;
diff --git a/audio/audio_receive_stream_unittest.cc b/audio/audio_receive_stream_unittest.cc
index 0ddbe29..451d5f9 100644
--- a/audio/audio_receive_stream_unittest.cc
+++ b/audio/audio_receive_stream_unittest.cc
@@ -29,6 +29,7 @@
 #include "test/gtest.h"
 #include "test/mock_audio_decoder_factory.h"
 #include "test/mock_transport.h"
+#include "test/run_loop.h"
 
 namespace webrtc {
 namespace test {
@@ -215,6 +216,7 @@
 }
 
 TEST(AudioReceiveStreamTest, ConstructDestruct) {
+  test::RunLoop loop;
   for (bool use_null_audio_processing : {false, true}) {
     ConfigHelper helper(use_null_audio_processing);
     auto recv_stream = helper.CreateAudioReceiveStream();
@@ -223,6 +225,7 @@
 }
 
 TEST(AudioReceiveStreamTest, ReceiveRtcpPacket) {
+  test::RunLoop loop;
   for (bool use_null_audio_processing : {false, true}) {
     ConfigHelper helper(use_null_audio_processing);
     auto recv_stream = helper.CreateAudioReceiveStream();
@@ -236,6 +239,7 @@
 }
 
 TEST(AudioReceiveStreamTest, GetStats) {
+  test::RunLoop loop;
   for (bool use_null_audio_processing : {false, true}) {
     ConfigHelper helper(use_null_audio_processing);
     auto recv_stream = helper.CreateAudioReceiveStream();
@@ -321,6 +325,7 @@
 }
 
 TEST(AudioReceiveStreamTest, SetGain) {
+  test::RunLoop loop;
   for (bool use_null_audio_processing : {false, true}) {
     ConfigHelper helper(use_null_audio_processing);
     auto recv_stream = helper.CreateAudioReceiveStream();
@@ -332,6 +337,7 @@
 }
 
 TEST(AudioReceiveStreamTest, StreamsShouldBeAddedToMixerOnceOnStart) {
+  test::RunLoop loop;
   for (bool use_null_audio_processing : {false, true}) {
     ConfigHelper helper1(use_null_audio_processing);
     ConfigHelper helper2(helper1.audio_mixer(), use_null_audio_processing);
@@ -366,6 +372,7 @@
 }
 
 TEST(AudioReceiveStreamTest, ReconfigureWithUpdatedConfig) {
+  test::RunLoop loop;
   for (bool use_null_audio_processing : {false, true}) {
     ConfigHelper helper(use_null_audio_processing);
     auto recv_stream = helper.CreateAudioReceiveStream();
@@ -393,6 +400,7 @@
 }
 
 TEST(AudioReceiveStreamTest, ReconfigureWithFrameDecryptor) {
+  test::RunLoop loop;
   for (bool use_null_audio_processing : {false, true}) {
     ConfigHelper helper(use_null_audio_processing);
     auto recv_stream = helper.CreateAudioReceiveStream();
diff --git a/modules/rtp_rtcp/source/source_tracker.cc b/modules/rtp_rtcp/source/source_tracker.cc
index 7a5cbac..13d848d 100644
--- a/modules/rtp_rtcp/source/source_tracker.cc
+++ b/modules/rtp_rtcp/source/source_tracker.cc
@@ -17,26 +17,37 @@
 
 namespace webrtc {
 
-constexpr int64_t SourceTracker::kTimeoutMs;
+SourceTracker::SourceTracker(Clock* clock)
+    : worker_thread_(TaskQueueBase::Current()), clock_(clock) {
+  RTC_DCHECK(worker_thread_);
+  RTC_DCHECK(clock_);
+}
 
-SourceTracker::SourceTracker(Clock* clock) : clock_(clock) {}
-
-void SourceTracker::OnFrameDelivered(const RtpPacketInfos& packet_infos) {
+void SourceTracker::OnFrameDelivered(RtpPacketInfos packet_infos) {
   if (packet_infos.empty()) {
     return;
   }
 
-  TRACE_EVENT0("webrtc", "SourceTracker::OnFrameDelivered");
+  Timestamp now = clock_->CurrentTime();
+  worker_thread_->PostTask(
+      SafeTask(worker_safety_.flag(),
+               [this, packet_infos = std::move(packet_infos), now]() {
+                 RTC_DCHECK_RUN_ON(worker_thread_);
+                 OnFrameDeliveredInternal(now, packet_infos);
+               }));
+}
 
-  int64_t now_ms = clock_->TimeInMilliseconds();
-  MutexLock lock_scope(&lock_);
+void SourceTracker::OnFrameDeliveredInternal(
+    Timestamp now,
+    const RtpPacketInfos& packet_infos) {
+  TRACE_EVENT0("webrtc", "SourceTracker::OnFrameDelivered");
 
   for (const RtpPacketInfo& packet_info : packet_infos) {
     for (uint32_t csrc : packet_info.csrcs()) {
       SourceKey key(RtpSourceType::CSRC, csrc);
       SourceEntry& entry = UpdateEntry(key);
 
-      entry.timestamp_ms = now_ms;
+      entry.timestamp = now;
       entry.audio_level = packet_info.audio_level();
       entry.absolute_capture_time = packet_info.absolute_capture_time();
       entry.local_capture_clock_offset =
@@ -47,30 +58,28 @@
     SourceKey key(RtpSourceType::SSRC, packet_info.ssrc());
     SourceEntry& entry = UpdateEntry(key);
 
-    entry.timestamp_ms = now_ms;
+    entry.timestamp = now;
     entry.audio_level = packet_info.audio_level();
     entry.absolute_capture_time = packet_info.absolute_capture_time();
     entry.local_capture_clock_offset = packet_info.local_capture_clock_offset();
     entry.rtp_timestamp = packet_info.rtp_timestamp();
   }
 
-  PruneEntries(now_ms);
+  PruneEntries(now);
 }
 
 std::vector<RtpSource> SourceTracker::GetSources() const {
+  RTC_DCHECK_RUN_ON(worker_thread_);
+
+  PruneEntries(clock_->CurrentTime());
+
   std::vector<RtpSource> sources;
-
-  int64_t now_ms = clock_->TimeInMilliseconds();
-  MutexLock lock_scope(&lock_);
-
-  PruneEntries(now_ms);
-
   for (const auto& pair : list_) {
     const SourceKey& key = pair.first;
     const SourceEntry& entry = pair.second;
 
     sources.emplace_back(
-        entry.timestamp_ms, key.source, key.source_type, entry.rtp_timestamp,
+        entry.timestamp.ms(), key.source, key.source_type, entry.rtp_timestamp,
         RtpSource::Extensions{
             .audio_level = entry.audio_level,
             .absolute_capture_time = entry.absolute_capture_time,
@@ -97,10 +106,9 @@
   return list_.front().second;
 }
 
-void SourceTracker::PruneEntries(int64_t now_ms) const {
-  int64_t prune_ms = now_ms - kTimeoutMs;
-
-  while (!list_.empty() && list_.back().second.timestamp_ms < prune_ms) {
+void SourceTracker::PruneEntries(Timestamp now) const {
+  Timestamp prune = now - kTimeout;
+  while (!list_.empty() && list_.back().second.timestamp < prune) {
     map_.erase(list_.back().first);
     list_.pop_back();
   }
diff --git a/modules/rtp_rtcp/source/source_tracker.h b/modules/rtp_rtcp/source/source_tracker.h
index f9e8354..30a5b8a 100644
--- a/modules/rtp_rtcp/source/source_tracker.h
+++ b/modules/rtp_rtcp/source/source_tracker.h
@@ -19,9 +19,11 @@
 
 #include "absl/types/optional.h"
 #include "api/rtp_packet_infos.h"
+#include "api/task_queue/pending_task_safety_flag.h"
+#include "api/task_queue/task_queue_base.h"
 #include "api/transport/rtp/rtp_source.h"
 #include "api/units/time_delta.h"
-#include "rtc_base/synchronization/mutex.h"
+#include "api/units/timestamp.h"
 #include "rtc_base/time_utils.h"
 #include "system_wrappers/include/clock.h"
 
@@ -36,7 +38,7 @@
  public:
   // Amount of time before the entry associated with an update is removed. See:
   // https://w3c.github.io/webrtc-pc/#dom-rtcrtpreceiver-getcontributingsources
-  static constexpr int64_t kTimeoutMs = 10000;  // 10 seconds
+  static constexpr TimeDelta kTimeout = TimeDelta::Seconds(10);
 
   explicit SourceTracker(Clock* clock);
 
@@ -47,7 +49,7 @@
 
   // Updates the source entries when a frame is delivered to the
   // RTCRtpReceiver's MediaStreamTrack.
-  void OnFrameDelivered(const RtpPacketInfos& packet_infos);
+  void OnFrameDelivered(RtpPacketInfos packet_infos);
 
   // Returns an `RtpSource` for each unique SSRC and CSRC identifier updated in
   // the last `kTimeoutMs` milliseconds. Entries appear in reverse chronological
@@ -83,11 +85,11 @@
     // Timestamp indicating the most recent time a frame from an RTP packet,
     // originating from this source, was delivered to the RTCRtpReceiver's
     // MediaStreamTrack. Its reference clock is the outer class's `clock_`.
-    int64_t timestamp_ms;
+    Timestamp timestamp = Timestamp::MinusInfinity();
 
     // Audio level from an RFC 6464 or RFC 6465 header extension received with
     // the most recent packet used to assemble the frame associated with
-    // `timestamp_ms`. May be absent. Only relevant for audio receivers. See the
+    // `timestamp`. May be absent. Only relevant for audio receivers. See the
     // specs for `RTCRtpContributingSource` for more info.
     absl::optional<uint8_t> audio_level;
 
@@ -104,8 +106,8 @@
     absl::optional<TimeDelta> local_capture_clock_offset;
 
     // RTP timestamp of the most recent packet used to assemble the frame
-    // associated with `timestamp_ms`.
-    uint32_t rtp_timestamp;
+    // associated with `timestamp`.
+    uint32_t rtp_timestamp = 0;
   };
 
   using SourceList = std::list<std::pair<const SourceKey, SourceEntry>>;
@@ -114,23 +116,27 @@
                                        SourceKeyHasher,
                                        SourceKeyComparator>;
 
+  void OnFrameDeliveredInternal(Timestamp now,
+                                const RtpPacketInfos& packet_infos)
+      RTC_RUN_ON(worker_thread_);
+
   // Updates an entry by creating it (if it didn't previously exist) and moving
   // it to the front of the list. Returns a reference to the entry.
-  SourceEntry& UpdateEntry(const SourceKey& key)
-      RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
+  SourceEntry& UpdateEntry(const SourceKey& key) RTC_RUN_ON(worker_thread_);
 
   // Removes entries that have timed out. Marked as "const" so that we can do
   // pruning in getters.
-  void PruneEntries(int64_t now_ms) const RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
+  void PruneEntries(Timestamp now) const RTC_RUN_ON(worker_thread_);
 
+  TaskQueueBase* const worker_thread_;
   Clock* const clock_;
-  mutable Mutex lock_;
 
   // Entries are stored in reverse chronological order (i.e. with the most
   // recently updated entries appearing first). Mutability is needed for timeout
   // pruning in const functions.
-  mutable SourceList list_ RTC_GUARDED_BY(lock_);
-  mutable SourceMap map_ RTC_GUARDED_BY(lock_);
+  mutable SourceList list_ RTC_GUARDED_BY(worker_thread_);
+  mutable SourceMap map_ RTC_GUARDED_BY(worker_thread_);
+  ScopedTaskSafety worker_safety_;
 };
 
 }  // namespace webrtc
diff --git a/modules/rtp_rtcp/source/source_tracker_unittest.cc b/modules/rtp_rtcp/source/source_tracker_unittest.cc
index b3e3102..c11142b 100644
--- a/modules/rtp_rtcp/source/source_tracker_unittest.cc
+++ b/modules/rtp_rtcp/source/source_tracker_unittest.cc
@@ -25,6 +25,7 @@
 #include "system_wrappers/include/ntp_time.h"
 #include "test/gmock.h"
 #include "test/gtest.h"
+#include "test/time_controller/simulated_time_controller.h"
 
 namespace webrtc {
 namespace {
@@ -121,29 +122,29 @@
     return RtpPacketInfos(std::move(packet_infos));
   }
 
-  int64_t GenerateClockAdvanceTimeMilliseconds() {
+  TimeDelta GenerateClockAdvanceTime() {
     double roll = std::uniform_real_distribution<double>(0.0, 1.0)(generator_);
 
     if (roll < 0.05) {
-      return 0;
+      return TimeDelta::Zero();
     }
 
     if (roll < 0.08) {
-      return SourceTracker::kTimeoutMs - 1;
+      return SourceTracker::kTimeout - TimeDelta::Millis(1);
     }
 
     if (roll < 0.11) {
-      return SourceTracker::kTimeoutMs;
+      return SourceTracker::kTimeout;
     }
 
     if (roll < 0.19) {
-      return std::uniform_int_distribution<int64_t>(
-          SourceTracker::kTimeoutMs,
-          SourceTracker::kTimeoutMs * 1000)(generator_);
+      return TimeDelta::Millis(std::uniform_int_distribution<int64_t>(
+          SourceTracker::kTimeout.ms(),
+          SourceTracker::kTimeout.ms() * 1000)(generator_));
     }
 
-    return std::uniform_int_distribution<int64_t>(
-        1, SourceTracker::kTimeoutMs - 1)(generator_);
+    return TimeDelta::Millis(std::uniform_int_distribution<int64_t>(
+        1, SourceTracker::kTimeout.ms() - 1)(generator_));
   }
 
  private:
@@ -209,6 +210,10 @@
         std::uniform_int_distribution<int64_t>()(generator_));
   }
 
+ protected:
+  GlobalSimulatedTimeController time_controller_{Timestamp::Seconds(1000)};
+
+ private:
   const uint32_t ssrcs_count_;
   const uint32_t csrcs_count_;
 
@@ -220,9 +225,8 @@
 TEST_P(SourceTrackerRandomTest, RandomOperations) {
   constexpr size_t kIterationsCount = 200;
 
-  SimulatedClock clock(1000000000000ULL);
-  SourceTracker actual_tracker(&clock);
-  ExpectedSourceTracker expected_tracker(&clock);
+  SourceTracker actual_tracker(time_controller_.GetClock());
+  ExpectedSourceTracker expected_tracker(time_controller_.GetClock());
 
   ASSERT_THAT(actual_tracker.GetSources(), IsEmpty());
   ASSERT_THAT(expected_tracker.GetSources(), IsEmpty());
@@ -233,8 +237,7 @@
     actual_tracker.OnFrameDelivered(packet_infos);
     expected_tracker.OnFrameDelivered(packet_infos);
 
-    clock.AdvanceTimeMilliseconds(GenerateClockAdvanceTimeMilliseconds());
-
+    time_controller_.AdvanceTime(GenerateClockAdvanceTime());
     ASSERT_THAT(actual_tracker.GetSources(),
                 ElementsAreArray(expected_tracker.GetSources()));
   }
@@ -246,8 +249,8 @@
                                  /*csrcs_count_=*/Values(0, 1, 3, 7)));
 
 TEST(SourceTrackerTest, StartEmpty) {
-  SimulatedClock clock(1000000000000ULL);
-  SourceTracker tracker(&clock);
+  GlobalSimulatedTimeController time_controller(Timestamp::Seconds(1000));
+  SourceTracker tracker(time_controller.GetClock());
 
   EXPECT_THAT(tracker.GetSources(), IsEmpty());
 }
@@ -269,8 +272,8 @@
   constexpr Timestamp kReceiveTime0 = Timestamp::Millis(60);
   constexpr Timestamp kReceiveTime1 = Timestamp::Millis(70);
 
-  SimulatedClock clock(1000000000000ULL);
-  SourceTracker tracker(&clock);
+  GlobalSimulatedTimeController time_controller(Timestamp::Seconds(1000));
+  SourceTracker tracker(time_controller.GetClock());
 
   tracker.OnFrameDelivered(RtpPacketInfos(
       {RtpPacketInfo(kSsrc1, {kCsrcs0, kCsrcs1}, kRtpTimestamp0, kReceiveTime0)
@@ -282,7 +285,7 @@
            .set_absolute_capture_time(kAbsoluteCaptureTime)
            .set_local_capture_clock_offset(kLocalCaptureClockOffset)}));
 
-  int64_t timestamp_ms = clock.TimeInMilliseconds();
+  Timestamp timestamp = time_controller.GetClock()->CurrentTime();
   constexpr RtpSource::Extensions extensions0 = {
       .audio_level = kAudioLevel0,
       .absolute_capture_time = kAbsoluteCaptureTime,
@@ -292,17 +295,20 @@
       .absolute_capture_time = kAbsoluteCaptureTime,
       .local_capture_clock_offset = kLocalCaptureClockOffset};
 
-  EXPECT_THAT(tracker.GetSources(),
-              ElementsAre(RtpSource(timestamp_ms, kSsrc2, RtpSourceType::SSRC,
-                                    kRtpTimestamp1, extensions1),
-                          RtpSource(timestamp_ms, kCsrcs2, RtpSourceType::CSRC,
-                                    kRtpTimestamp1, extensions1),
-                          RtpSource(timestamp_ms, kSsrc1, RtpSourceType::SSRC,
-                                    kRtpTimestamp0, extensions0),
-                          RtpSource(timestamp_ms, kCsrcs1, RtpSourceType::CSRC,
-                                    kRtpTimestamp0, extensions0),
-                          RtpSource(timestamp_ms, kCsrcs0, RtpSourceType::CSRC,
-                                    kRtpTimestamp0, extensions0)));
+  time_controller.AdvanceTime(TimeDelta::Zero());
+
+  EXPECT_THAT(
+      tracker.GetSources(),
+      ElementsAre(RtpSource(timestamp.ms(), kSsrc2, RtpSourceType::SSRC,
+                            kRtpTimestamp1, extensions1),
+                  RtpSource(timestamp.ms(), kCsrcs2, RtpSourceType::CSRC,
+                            kRtpTimestamp1, extensions1),
+                  RtpSource(timestamp.ms(), kSsrc1, RtpSourceType::SSRC,
+                            kRtpTimestamp0, extensions0),
+                  RtpSource(timestamp.ms(), kCsrcs1, RtpSourceType::CSRC,
+                            kRtpTimestamp0, extensions0),
+                  RtpSource(timestamp.ms(), kCsrcs0, RtpSourceType::CSRC,
+                            kRtpTimestamp0, extensions0)));
 }
 
 TEST(SourceTrackerTest, OnFrameDeliveredRecordsSourcesSameSsrc) {
@@ -324,8 +330,8 @@
   constexpr Timestamp kReceiveTime1 = Timestamp::Millis(70);
   constexpr Timestamp kReceiveTime2 = Timestamp::Millis(80);
 
-  SimulatedClock clock(1000000000000ULL);
-  SourceTracker tracker(&clock);
+  GlobalSimulatedTimeController time_controller(Timestamp::Seconds(1000));
+  SourceTracker tracker(time_controller.GetClock());
 
   tracker.OnFrameDelivered(RtpPacketInfos({
       RtpPacketInfo(kSsrc, {kCsrcs0, kCsrcs1}, kRtpTimestamp0, kReceiveTime0)
@@ -342,7 +348,8 @@
           .set_local_capture_clock_offset(kLocalCaptureClockOffset),
   }));
 
-  int64_t timestamp_ms = clock.TimeInMilliseconds();
+  time_controller.AdvanceTime(TimeDelta::Zero());
+  Timestamp timestamp = time_controller.GetClock()->CurrentTime();
   constexpr RtpSource::Extensions extensions0 = {
       .audio_level = kAudioLevel0,
       .absolute_capture_time = kAbsoluteCaptureTime,
@@ -356,15 +363,16 @@
       .absolute_capture_time = kAbsoluteCaptureTime,
       .local_capture_clock_offset = kLocalCaptureClockOffset};
 
-  EXPECT_THAT(tracker.GetSources(),
-              ElementsAre(RtpSource(timestamp_ms, kSsrc, RtpSourceType::SSRC,
-                                    kRtpTimestamp2, extensions2),
-                          RtpSource(timestamp_ms, kCsrcs0, RtpSourceType::CSRC,
-                                    kRtpTimestamp2, extensions2),
-                          RtpSource(timestamp_ms, kCsrcs2, RtpSourceType::CSRC,
-                                    kRtpTimestamp1, extensions1),
-                          RtpSource(timestamp_ms, kCsrcs1, RtpSourceType::CSRC,
-                                    kRtpTimestamp0, extensions0)));
+  EXPECT_THAT(
+      tracker.GetSources(),
+      ElementsAre(RtpSource(timestamp.ms(), kSsrc, RtpSourceType::SSRC,
+                            kRtpTimestamp2, extensions2),
+                  RtpSource(timestamp.ms(), kCsrcs0, RtpSourceType::CSRC,
+                            kRtpTimestamp2, extensions2),
+                  RtpSource(timestamp.ms(), kCsrcs2, RtpSourceType::CSRC,
+                            kRtpTimestamp1, extensions1),
+                  RtpSource(timestamp.ms(), kCsrcs1, RtpSourceType::CSRC,
+                            kRtpTimestamp0, extensions0)));
 }
 
 TEST(SourceTrackerTest, OnFrameDeliveredUpdatesSources) {
@@ -408,8 +416,8 @@
       .absolute_capture_time = kAbsoluteCaptureTime2,
       .local_capture_clock_offset = kLocalCaptureClockOffset2};
 
-  SimulatedClock clock(1000000000000ULL);
-  SourceTracker tracker(&clock);
+  GlobalSimulatedTimeController time_controller(Timestamp::Seconds(1000));
+  SourceTracker tracker(time_controller.GetClock());
 
   tracker.OnFrameDelivered(RtpPacketInfos(
       {RtpPacketInfo(kSsrc1, {kCsrcs0, kCsrcs1}, kRtpTimestamp0, kReceiveTime0)
@@ -417,40 +425,42 @@
            .set_absolute_capture_time(kAbsoluteCaptureTime0)
            .set_local_capture_clock_offset(kLocalCaptureClockOffset0)}));
 
-  int64_t timestamp_ms_0 = clock.TimeInMilliseconds();
+  time_controller.AdvanceTime(TimeDelta::Zero());
+  Timestamp timestamp_0 = time_controller.GetClock()->CurrentTime();
   EXPECT_THAT(
       tracker.GetSources(),
-      ElementsAre(RtpSource(timestamp_ms_0, kSsrc1, RtpSourceType::SSRC,
+      ElementsAre(RtpSource(timestamp_0.ms(), kSsrc1, RtpSourceType::SSRC,
                             kRtpTimestamp0, extensions0),
-                  RtpSource(timestamp_ms_0, kCsrcs1, RtpSourceType::CSRC,
+                  RtpSource(timestamp_0.ms(), kCsrcs1, RtpSourceType::CSRC,
                             kRtpTimestamp0, extensions0),
-                  RtpSource(timestamp_ms_0, kCsrcs0, RtpSourceType::CSRC,
+                  RtpSource(timestamp_0.ms(), kCsrcs0, RtpSourceType::CSRC,
                             kRtpTimestamp0, extensions0)));
 
   // Deliver packets with updated sources.
 
-  clock.AdvanceTimeMilliseconds(17);
+  time_controller.AdvanceTime(TimeDelta::Millis(17));
   tracker.OnFrameDelivered(RtpPacketInfos(
       {RtpPacketInfo(kSsrc1, {kCsrcs0, kCsrcs2}, kRtpTimestamp1, kReceiveTime1)
            .set_audio_level(kAudioLevel1)
            .set_absolute_capture_time(kAbsoluteCaptureTime1)
            .set_local_capture_clock_offset(kLocalCaptureClockOffset1)}));
 
-  int64_t timestamp_ms_1 = clock.TimeInMilliseconds();
+  time_controller.AdvanceTime(TimeDelta::Zero());
+  Timestamp timestamp_1 = time_controller.GetClock()->CurrentTime();
 
   EXPECT_THAT(
       tracker.GetSources(),
-      ElementsAre(RtpSource(timestamp_ms_1, kSsrc1, RtpSourceType::SSRC,
+      ElementsAre(RtpSource(timestamp_1.ms(), kSsrc1, RtpSourceType::SSRC,
                             kRtpTimestamp1, extensions1),
-                  RtpSource(timestamp_ms_1, kCsrcs2, RtpSourceType::CSRC,
+                  RtpSource(timestamp_1.ms(), kCsrcs2, RtpSourceType::CSRC,
                             kRtpTimestamp1, extensions1),
-                  RtpSource(timestamp_ms_1, kCsrcs0, RtpSourceType::CSRC,
+                  RtpSource(timestamp_1.ms(), kCsrcs0, RtpSourceType::CSRC,
                             kRtpTimestamp1, extensions1),
-                  RtpSource(timestamp_ms_0, kCsrcs1, RtpSourceType::CSRC,
+                  RtpSource(timestamp_0.ms(), kCsrcs1, RtpSourceType::CSRC,
                             kRtpTimestamp0, extensions0)));
 
   // Deliver more packets with update csrcs and a new ssrc.
-  clock.AdvanceTimeMilliseconds(17);
+  time_controller.AdvanceTime(TimeDelta::Millis(17));
 
   tracker.OnFrameDelivered(RtpPacketInfos(
       {RtpPacketInfo(kSsrc2, {kCsrcs0}, kRtpTimestamp2, kReceiveTime2)
@@ -458,19 +468,20 @@
            .set_absolute_capture_time(kAbsoluteCaptureTime2)
            .set_local_capture_clock_offset(kLocalCaptureClockOffset2)}));
 
-  int64_t timestamp_ms_2 = clock.TimeInMilliseconds();
+  time_controller.AdvanceTime(TimeDelta::Zero());
+  Timestamp timestamp_2 = time_controller.GetClock()->CurrentTime();
 
   EXPECT_THAT(
       tracker.GetSources(),
-      ElementsAre(RtpSource(timestamp_ms_2, kSsrc2, RtpSourceType::SSRC,
+      ElementsAre(RtpSource(timestamp_2.ms(), kSsrc2, RtpSourceType::SSRC,
                             kRtpTimestamp2, extensions2),
-                  RtpSource(timestamp_ms_2, kCsrcs0, RtpSourceType::CSRC,
+                  RtpSource(timestamp_2.ms(), kCsrcs0, RtpSourceType::CSRC,
                             kRtpTimestamp2, extensions2),
-                  RtpSource(timestamp_ms_1, kSsrc1, RtpSourceType::SSRC,
+                  RtpSource(timestamp_1.ms(), kSsrc1, RtpSourceType::SSRC,
                             kRtpTimestamp1, extensions1),
-                  RtpSource(timestamp_ms_1, kCsrcs2, RtpSourceType::CSRC,
+                  RtpSource(timestamp_1.ms(), kCsrcs2, RtpSourceType::CSRC,
                             kRtpTimestamp1, extensions1),
-                  RtpSource(timestamp_ms_0, kCsrcs1, RtpSourceType::CSRC,
+                  RtpSource(timestamp_0.ms(), kCsrcs1, RtpSourceType::CSRC,
                             kRtpTimestamp0, extensions0)));
 }
 
@@ -494,8 +505,8 @@
   constexpr Timestamp kReceiveTime0 = Timestamp::Millis(60);
   constexpr Timestamp kReceiveTime1 = Timestamp::Millis(61);
 
-  SimulatedClock clock(1000000000000ULL);
-  SourceTracker tracker(&clock);
+  GlobalSimulatedTimeController time_controller(Timestamp::Seconds(1000));
+  SourceTracker tracker(time_controller.GetClock());
 
   tracker.OnFrameDelivered(RtpPacketInfos(
       {RtpPacketInfo(kSsrc, {kCsrcs0, kCsrcs1}, kRtpTimestamp0, kReceiveTime0)
@@ -503,7 +514,7 @@
            .set_absolute_capture_time(kAbsoluteCaptureTime0)
            .set_local_capture_clock_offset(kLocalCaptureClockOffset0)}));
 
-  clock.AdvanceTimeMilliseconds(17);
+  time_controller.AdvanceTime(TimeDelta::Millis(17));
 
   tracker.OnFrameDelivered(RtpPacketInfos(
       {RtpPacketInfo(kSsrc, {kCsrcs0, kCsrcs2}, kRtpTimestamp1, kReceiveTime1)
@@ -511,9 +522,9 @@
            .set_absolute_capture_time(kAbsoluteCaptureTime1)
            .set_local_capture_clock_offset(kLocalCaptureClockOffset1)}));
 
-  int64_t timestamp_ms_1 = clock.TimeInMilliseconds();
+  Timestamp timestamp_1 = time_controller.GetClock()->CurrentTime();
 
-  clock.AdvanceTimeMilliseconds(SourceTracker::kTimeoutMs);
+  time_controller.AdvanceTime(SourceTracker::kTimeout);
 
   constexpr RtpSource::Extensions extensions1 = {
       .audio_level = kAudioLevel1,
@@ -522,11 +533,11 @@
 
   EXPECT_THAT(
       tracker.GetSources(),
-      ElementsAre(RtpSource(timestamp_ms_1, kSsrc, RtpSourceType::SSRC,
+      ElementsAre(RtpSource(timestamp_1.ms(), kSsrc, RtpSourceType::SSRC,
                             kRtpTimestamp1, extensions1),
-                  RtpSource(timestamp_ms_1, kCsrcs2, RtpSourceType::CSRC,
+                  RtpSource(timestamp_1.ms(), kCsrcs2, RtpSourceType::CSRC,
                             kRtpTimestamp1, extensions1),
-                  RtpSource(timestamp_ms_1, kCsrcs0, RtpSourceType::CSRC,
+                  RtpSource(timestamp_1.ms(), kCsrcs0, RtpSourceType::CSRC,
                             kRtpTimestamp1, extensions1)));
 }
 
diff --git a/pc/BUILD.gn b/pc/BUILD.gn
index 0bdaa27..48c8879 100644
--- a/pc/BUILD.gn
+++ b/pc/BUILD.gn
@@ -2288,6 +2288,7 @@
       "../rtc_base:ssl",
       "../test:test_main",
       "../test:test_support",
+      "../test/time_controller:time_controller",
       "//third_party/abseil-cpp/absl/algorithm:container",
       "//third_party/abseil-cpp/absl/strings",
       "//third_party/abseil-cpp/absl/types:optional",
@@ -2699,6 +2700,7 @@
       "../rtc_base:logging",
       "../rtc_base:macromagic",
       "../rtc_base:mdns_responder_interface",
+      "../rtc_base:null_socket_server",
       "../rtc_base:rtc_base_tests_utils",
       "../rtc_base:rtc_certificate_generator",
       "../rtc_base:rtc_event",
@@ -2720,6 +2722,7 @@
       "../test:scoped_key_value_config",
       "../test:test_support",
       "../test/pc/sctp:fake_sctp_transport",
+      "../test/time_controller",
     ]
     absl_deps = [
       "//third_party/abseil-cpp/absl/algorithm:container",
diff --git a/pc/slow_peer_connection_integration_test.cc b/pc/slow_peer_connection_integration_test.cc
index 004b795..fd9d341 100644
--- a/pc/slow_peer_connection_integration_test.cc
+++ b/pc/slow_peer_connection_integration_test.cc
@@ -67,7 +67,7 @@
     // Some things use a time of "0" as a special value, so we need to start out
     // the fake clock at a nonzero time.
     // TODO(deadbeef): Fix this.
-    AdvanceTime(webrtc::TimeDelta::Seconds(1));
+    AdvanceTime(webrtc::TimeDelta::Seconds(1000));
   }
 
   // Explicit handle.