Move thread handling from source tracker.

This makes it simpler to use in more contexts.

Bug: b/364184684
Change-Id: I1b08ebd24e51ba1b3f85261eed503a78cd006fd8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/361480
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Jakob Ivarsson‎ <jakobi@webrtc.org>
Reviewed-by: Åsa Persson <asapersson@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#42956}
diff --git a/audio/audio_receive_stream.cc b/audio/audio_receive_stream.cc
index 44c95f3..bf2da5d 100644
--- a/audio/audio_receive_stream.cc
+++ b/audio/audio_receive_stream.cc
@@ -102,7 +102,6 @@
     std::unique_ptr<voe::ChannelReceiveInterface> channel_receive)
     : config_(config),
       audio_state_(audio_state),
-      source_tracker_(&env.clock()),
       channel_receive_(std::move(channel_receive)) {
   RTC_LOG(LS_INFO) << "AudioReceiveStreamImpl: " << config.rtp.remote_ssrc;
   RTC_DCHECK(config.decoder_factory);
@@ -114,11 +113,6 @@
   // Configure bandwidth estimation.
   channel_receive_->RegisterReceiverCongestionControlObjects(packet_router);
 
-  // When output is muted, ChannelReceive will directly notify the source
-  // tracker of "delivered" frames, so RtpReceiver information will continue to
-  // be updated.
-  channel_receive_->SetSourceTracker(&source_tracker_);
-
   // Complete configuration.
   // TODO(solenberg): Config NACK history window (which is a packet count),
   // using the actual packet size for the configured codec.
@@ -378,19 +372,13 @@
 
 std::vector<RtpSource> AudioReceiveStreamImpl::GetSources() const {
   RTC_DCHECK_RUN_ON(&worker_thread_checker_);
-  return source_tracker_.GetSources();
+  return channel_receive_->GetSources();
 }
 
 AudioMixer::Source::AudioFrameInfo
 AudioReceiveStreamImpl::GetAudioFrameWithInfo(int sample_rate_hz,
                                               AudioFrame* audio_frame) {
-  AudioMixer::Source::AudioFrameInfo audio_frame_info =
-      channel_receive_->GetAudioFrameWithInfo(sample_rate_hz, audio_frame);
-  if (audio_frame_info != AudioMixer::Source::AudioFrameInfo::kError &&
-      !audio_frame->packet_infos_.empty()) {
-    source_tracker_.OnFrameDelivered(audio_frame->packet_infos_);
-  }
-  return audio_frame_info;
+  return channel_receive_->GetAudioFrameWithInfo(sample_rate_hz, audio_frame);
 }
 
 int AudioReceiveStreamImpl::Ssrc() const {
diff --git a/audio/audio_receive_stream.h b/audio/audio_receive_stream.h
index 4eac8a7..cb1450b 100644
--- a/audio/audio_receive_stream.h
+++ b/audio/audio_receive_stream.h
@@ -25,7 +25,6 @@
 #include "audio/audio_state.h"
 #include "call/audio_receive_stream.h"
 #include "call/syncable.h"
-#include "modules/rtp_rtcp/source/source_tracker.h"
 #include "rtc_base/system/no_unique_address.h"
 
 namespace webrtc {
@@ -156,7 +155,6 @@
       SequenceChecker::kDetached};
   webrtc::AudioReceiveStreamInterface::Config config_;
   rtc::scoped_refptr<webrtc::AudioState> audio_state_;
-  SourceTracker source_tracker_;
   const std::unique_ptr<voe::ChannelReceiveInterface> channel_receive_;
   AudioSendStream* associated_send_stream_
       RTC_GUARDED_BY(packet_sequence_checker_) = nullptr;
diff --git a/audio/audio_receive_stream_unittest.cc b/audio/audio_receive_stream_unittest.cc
index a263ff9..0afda40 100644
--- a/audio/audio_receive_stream_unittest.cc
+++ b/audio/audio_receive_stream_unittest.cc
@@ -132,7 +132,6 @@
         .WillRepeatedly(Invoke([](const std::map<int, SdpAudioFormat>& codecs) {
           EXPECT_THAT(codecs, ::testing::IsEmpty());
         }));
-    EXPECT_CALL(*channel_receive_, SetSourceTracker(_));
     EXPECT_CALL(*channel_receive_, GetLocalSsrc())
         .WillRepeatedly(Return(kLocalSsrc));
 
diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc
index 7fcdf66..86c8cfb 100644
--- a/audio/channel_receive.cc
+++ b/audio/channel_receive.cc
@@ -171,7 +171,7 @@
 
   int PreferredSampleRate() const override;
 
-  void SetSourceTracker(SourceTracker* source_tracker) override;
+  std::vector<RtpSource> GetSources() const override;
 
   // Associate to a send channel.
   // Used for obtaining RTT for a receive-only channel.
@@ -240,7 +240,7 @@
   std::unique_ptr<ReceiveStatistics> rtp_receive_statistics_;
   std::unique_ptr<ModuleRtpRtcpImpl2> rtp_rtcp_;
   const uint32_t remote_ssrc_;
-  SourceTracker* source_tracker_ = nullptr;
+  SourceTracker source_tracker_ RTC_GUARDED_BY(&worker_thread_checker_);
 
   // Info for GetSyncInfo is updated on network or worker thread, and queried on
   // the worker thread.
@@ -325,20 +325,18 @@
     // Avoid inserting into NetEQ when we are not playing. Count the
     // packet as discarded.
 
-    // If we have a source_tracker_, tell it that the frame has been
-    // "delivered". Normally, this happens in AudioReceiveStreamInterface when
-    // audio frames are pulled out, but when playout is muted, nothing is
-    // pulling frames. The downside of this approach is that frames delivered
-    // this way won't be delayed for playout, and therefore will be
-    // unsynchronized with (a) audio delay when playing and (b) any audio/video
-    // synchronization. But the alternative is that muting playout also stops
-    // the SourceTracker from updating RtpSource information.
-    if (source_tracker_) {
-      RtpPacketInfos::vector_type packet_vector = {
-          RtpPacketInfo(rtpHeader, receive_time)};
-      source_tracker_->OnFrameDelivered(RtpPacketInfos(packet_vector));
-    }
-
+    // Tell source_tracker_ that the frame has been "delivered". Normally, this
+    // happens in AudioReceiveStreamInterface when audio frames are pulled out,
+    // but when playout is muted, nothing is pulling frames. The downside of
+    // this approach is that frames delivered this way won't be delayed for
+    // playout, and therefore will be unsynchronized with (a) audio delay when
+    // playing and (b) any audio/video synchronization. But the alternative is
+    // that muting playout also stops the SourceTracker from updating RtpSource
+    // information.
+    RtpPacketInfos::vector_type packet_vector = {
+        RtpPacketInfo(rtpHeader, receive_time)};
+    source_tracker_.OnFrameDelivered(RtpPacketInfos(packet_vector),
+                                     env_.clock().CurrentTime());
     return;
   }
 
@@ -482,7 +480,16 @@
     }
     packet_infos.push_back(std::move(new_packet_info));
   }
-  audio_frame->packet_infos_ = RtpPacketInfos(packet_infos);
+  audio_frame->packet_infos_ = RtpPacketInfos(std::move(packet_infos));
+  if (!audio_frame->packet_infos_.empty()) {
+    RtpPacketInfos infos_copy = audio_frame->packet_infos_;
+    Timestamp delivery_time = env_.clock().CurrentTime();
+    worker_thread_->PostTask(
+        SafeTask(worker_safety_.flag(), [this, infos_copy, delivery_time]() {
+          RTC_DCHECK_RUN_ON(&worker_thread_checker_);
+          source_tracker_.OnFrameDelivered(infos_copy, delivery_time);
+        }));
+  }
 
   ++audio_frame_interval_count_;
   if (audio_frame_interval_count_ >= kHistogramReportingInterval) {
@@ -514,10 +521,6 @@
                   acm_receiver_.last_output_sample_rate_hz());
 }
 
-void ChannelReceive::SetSourceTracker(SourceTracker* source_tracker) {
-  source_tracker_ = source_tracker;
-}
-
 ChannelReceive::ChannelReceive(
     const Environment& env,
     NetEqFactory* neteq_factory,
@@ -538,6 +541,7 @@
       worker_thread_(TaskQueueBase::Current()),
       rtp_receive_statistics_(ReceiveStatistics::Create(&env_.clock())),
       remote_ssrc_(remote_ssrc),
+      source_tracker_(&env_.clock()),
       acm_receiver_(env_,
                     AcmConfig(neteq_factory,
                               decoder_factory,
@@ -1102,6 +1106,11 @@
              : acm_receiver_.last_output_sample_rate_hz();
 }
 
+std::vector<RtpSource> ChannelReceive::GetSources() const {
+  RTC_DCHECK_RUN_ON(&worker_thread_checker_);
+  return source_tracker_.GetSources();
+}
+
 }  // namespace
 
 std::unique_ptr<ChannelReceiveInterface> CreateChannelReceive(
diff --git a/audio/channel_receive.h b/audio/channel_receive.h
index 0a11573..4e2048d 100644
--- a/audio/channel_receive.h
+++ b/audio/channel_receive.h
@@ -148,9 +148,7 @@
 
   virtual int PreferredSampleRate() const = 0;
 
-  // Sets the source tracker to notify about "delivered" packets when output is
-  // muted.
-  virtual void SetSourceTracker(SourceTracker* source_tracker) = 0;
+  virtual std::vector<RtpSource> GetSources() const = 0;
 
   // Associate to a send channel.
   // Used for obtaining RTT for a receive-only channel.
diff --git a/audio/mock_voe_channel_proxy.h b/audio/mock_voe_channel_proxy.h
index 1a62f02..fc1cecb 100644
--- a/audio/mock_voe_channel_proxy.h
+++ b/audio/mock_voe_channel_proxy.h
@@ -62,7 +62,7 @@
               (int sample_rate_hz, AudioFrame*),
               (override));
   MOCK_METHOD(int, PreferredSampleRate, (), (const, override));
-  MOCK_METHOD(void, SetSourceTracker, (SourceTracker*), (override));
+  MOCK_METHOD(std::vector<RtpSource>, GetSources, (), (const, override));
   MOCK_METHOD(void,
               SetAssociatedSendChannel,
               (const voe::ChannelSendInterface*),
diff --git a/modules/rtp_rtcp/source/source_tracker.cc b/modules/rtp_rtcp/source/source_tracker.cc
index cd881f1..4d1f406 100644
--- a/modules/rtp_rtcp/source/source_tracker.cc
+++ b/modules/rtp_rtcp/source/source_tracker.cc
@@ -17,42 +17,26 @@
 
 namespace webrtc {
 
-SourceTracker::SourceTracker(Clock* clock)
-    : worker_thread_(TaskQueueBase::Current()), clock_(clock) {
-  RTC_DCHECK(worker_thread_);
+SourceTracker::SourceTracker(Clock* clock) : clock_(clock) {
   RTC_DCHECK(clock_);
 }
 
-void SourceTracker::OnFrameDelivered(RtpPacketInfos packet_infos) {
+void SourceTracker::OnFrameDelivered(const RtpPacketInfos& packet_infos,
+                                     Timestamp delivery_time) {
+  TRACE_EVENT0("webrtc", "SourceTracker::OnFrameDelivered");
   if (packet_infos.empty()) {
     return;
   }
-
-  Timestamp now = clock_->CurrentTime();
-  if (worker_thread_->IsCurrent()) {
-    RTC_DCHECK_RUN_ON(worker_thread_);
-    OnFrameDeliveredInternal(now, packet_infos);
-  } else {
-    worker_thread_->PostTask(
-        SafeTask(worker_safety_.flag(),
-                 [this, packet_infos = std::move(packet_infos), now]() {
-                   RTC_DCHECK_RUN_ON(worker_thread_);
-                   OnFrameDeliveredInternal(now, packet_infos);
-                 }));
+  if (delivery_time.IsInfinite()) {
+    delivery_time = clock_->CurrentTime();
   }
-}
-
-void SourceTracker::OnFrameDeliveredInternal(
-    Timestamp now,
-    const RtpPacketInfos& packet_infos) {
-  TRACE_EVENT0("webrtc", "SourceTracker::OnFrameDelivered");
 
   for (const RtpPacketInfo& packet_info : packet_infos) {
     for (uint32_t csrc : packet_info.csrcs()) {
       SourceKey key(RtpSourceType::CSRC, csrc);
       SourceEntry& entry = UpdateEntry(key);
 
-      entry.timestamp = now;
+      entry.timestamp = delivery_time;
       entry.audio_level = packet_info.audio_level();
       entry.absolute_capture_time = packet_info.absolute_capture_time();
       entry.local_capture_clock_offset =
@@ -63,19 +47,17 @@
     SourceKey key(RtpSourceType::SSRC, packet_info.ssrc());
     SourceEntry& entry = UpdateEntry(key);
 
-    entry.timestamp = now;
+    entry.timestamp = delivery_time;
     entry.audio_level = packet_info.audio_level();
     entry.absolute_capture_time = packet_info.absolute_capture_time();
     entry.local_capture_clock_offset = packet_info.local_capture_clock_offset();
     entry.rtp_timestamp = packet_info.rtp_timestamp();
   }
 
-  PruneEntries(now);
+  PruneEntries(delivery_time);
 }
 
 std::vector<RtpSource> SourceTracker::GetSources() const {
-  RTC_DCHECK_RUN_ON(worker_thread_);
-
   PruneEntries(clock_->CurrentTime());
 
   std::vector<RtpSource> sources;
diff --git a/modules/rtp_rtcp/source/source_tracker.h b/modules/rtp_rtcp/source/source_tracker.h
index 9d39599..3be339f 100644
--- a/modules/rtp_rtcp/source/source_tracker.h
+++ b/modules/rtp_rtcp/source/source_tracker.h
@@ -19,8 +19,6 @@
 #include <vector>
 
 #include "api/rtp_packet_infos.h"
-#include "api/task_queue/pending_task_safety_flag.h"
-#include "api/task_queue/task_queue_base.h"
 #include "api/transport/rtp/rtp_source.h"
 #include "api/units/time_delta.h"
 #include "api/units/timestamp.h"
@@ -34,6 +32,7 @@
 //   - https://w3c.github.io/webrtc-pc/#dom-rtcrtpcontributingsource
 //   - https://w3c.github.io/webrtc-pc/#dom-rtcrtpsynchronizationsource
 //
+// This class is thread unsafe.
 class SourceTracker {
  public:
   // Amount of time before the entry associated with an update is removed. See:
@@ -49,7 +48,8 @@
 
   // Updates the source entries when a frame is delivered to the
   // RTCRtpReceiver's MediaStreamTrack.
-  void OnFrameDelivered(RtpPacketInfos packet_infos);
+  void OnFrameDelivered(const RtpPacketInfos& packet_infos,
+                        Timestamp delivery_time = Timestamp::MinusInfinity());
 
   // Returns an `RtpSource` for each unique SSRC and CSRC identifier updated in
   // the last `kTimeoutMs` milliseconds. Entries appear in reverse chronological
@@ -116,27 +116,21 @@
                                        SourceKeyHasher,
                                        SourceKeyComparator>;
 
-  void OnFrameDeliveredInternal(Timestamp now,
-                                const RtpPacketInfos& packet_infos)
-      RTC_RUN_ON(worker_thread_);
-
   // Updates an entry by creating it (if it didn't previously exist) and moving
   // it to the front of the list. Returns a reference to the entry.
-  SourceEntry& UpdateEntry(const SourceKey& key) RTC_RUN_ON(worker_thread_);
+  SourceEntry& UpdateEntry(const SourceKey& key);
 
   // Removes entries that have timed out. Marked as "const" so that we can do
   // pruning in getters.
-  void PruneEntries(Timestamp now) const RTC_RUN_ON(worker_thread_);
+  void PruneEntries(Timestamp now) const;
 
-  TaskQueueBase* const worker_thread_;
   Clock* const clock_;
 
   // Entries are stored in reverse chronological order (i.e. with the most
   // recently updated entries appearing first). Mutability is needed for timeout
   // pruning in const functions.
-  mutable SourceList list_ RTC_GUARDED_BY(worker_thread_);
-  mutable SourceMap map_ RTC_GUARDED_BY(worker_thread_);
-  ScopedTaskSafety worker_safety_;
+  mutable SourceList list_;
+  mutable SourceMap map_;
 };
 
 }  // namespace webrtc
diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc
index c6a20ef..4372fda 100644
--- a/video/video_receive_stream2.cc
+++ b/video/video_receive_stream2.cc
@@ -625,11 +625,11 @@
 }
 
 void VideoReceiveStream2::OnFrame(const VideoFrame& video_frame) {
-  source_tracker_.OnFrameDelivered(video_frame.packet_infos());
   config_.renderer->OnFrame(video_frame);
 
-  // TODO(bugs.webrtc.org/10739): we should set local capture clock offset for
-  // `video_frame.packet_infos`. But VideoFrame is const qualified here.
+  // TODO: bugs.webrtc.org/42220804 - we should set local capture clock offset
+  // for `packet_infos`.
+  RtpPacketInfos packet_infos = video_frame.packet_infos();
 
   // For frame delay metrics, calculated in `OnRenderedFrame`, to better reflect
   // user experience measurements must be done as close as possible to frame
@@ -640,7 +640,7 @@
   // rendered" callback from the renderer.
   VideoFrameMetaData frame_meta(video_frame, env_.clock().CurrentTime());
   call_->worker_thread()->PostTask(
-      SafeTask(task_safety_.flag(), [frame_meta, this]() {
+      SafeTask(task_safety_.flag(), [frame_meta, packet_infos, this]() {
         RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
         int64_t video_playout_ntp_ms;
         int64_t sync_offset_ms;
@@ -652,6 +652,8 @@
                                            estimated_freq_khz);
         }
         stats_proxy_.OnRenderedFrame(frame_meta);
+        source_tracker_.OnFrameDelivered(packet_infos,
+                                         frame_meta.decode_timestamp);
       }));
 
   webrtc::MutexLock lock(&pending_resolution_mutex_);
@@ -1047,6 +1049,7 @@
 }
 
 std::vector<webrtc::RtpSource> VideoReceiveStream2::GetSources() const {
+  RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
   return source_tracker_.GetSources();
 }
 
diff --git a/video/video_receive_stream2.h b/video/video_receive_stream2.h
index f161f63..2c7eb4c 100644
--- a/video/video_receive_stream2.h
+++ b/video/video_receive_stream2.h
@@ -264,7 +264,7 @@
   bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
   bool decoder_stopped_ RTC_GUARDED_BY(decode_sequence_checker_) = true;
 
-  SourceTracker source_tracker_;
+  SourceTracker source_tracker_ RTC_GUARDED_BY(worker_sequence_checker_);
   ReceiveStatisticsProxy stats_proxy_;
   // Shared by media and rtx stream receivers, since the latter has no RtpRtcp
   // module of its own.