Replace the implementation of `GetContributingSources()` on the video side.

This change replaces the `ContributingSources`-implementation of `GetContributingSources()` and `GetSynchronizationSources()` on the video side with the spec-compliant `SourceTracker`-implementation.

The most noticeable impact is that the per-frame dictionaries are now updated when frames are delivered to the RTCRtpReceiver's MediaStreamTrack rather than when RTP packets are received on the network.

Bug: webrtc:10545
Change-Id: I895b5790280ac94c1501801d226c643633c67349
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/143177
Reviewed-by: Niels Moller <nisse@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Commit-Queue: Chen Xing <chxg@google.com>
Cr-Commit-Position: refs/heads/master@{#28386}
diff --git a/media/engine/webrtc_video_engine_unittest.cc b/media/engine/webrtc_video_engine_unittest.cc
index f7b2a2f..bb989f4 100644
--- a/media/engine/webrtc_video_engine_unittest.cc
+++ b/media/engine/webrtc_video_engine_unittest.cc
@@ -60,6 +60,8 @@
 #include "test/gmock.h"
 
 using ::testing::Field;
+using ::testing::IsEmpty;
+using ::testing::SizeIs;
 using webrtc::BitrateConstraints;
 using webrtc::RtpExtension;
 
@@ -7514,137 +7516,44 @@
                           false);
 }
 
-class WebRtcVideoFakeClock {
- public:
-  WebRtcVideoFakeClock() {
-    fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(1));  // avoid time=0
-  }
-  rtc::ScopedFakeClock fake_clock_;
-};
+TEST_F(WebRtcVideoChannelBaseTest, GetSources) {
+  EXPECT_THAT(channel_->GetSources(kSsrc), IsEmpty());
 
-// The fake clock needs to be initialized before the call, and not
-// destroyed until after all threads spawned by the test have been stopped.
-// This mixin ensures that.
-class WebRtcVideoChannelTestWithClock : public WebRtcVideoFakeClock,
-                                        public WebRtcVideoChannelBaseTest {};
-
-TEST_F(WebRtcVideoChannelTestWithClock, GetSources) {
-  uint8_t data1[] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00,
-                     0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
-
-  EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
-
-  rtc::CopyOnWriteBuffer packet1(data1, sizeof(data1));
-  rtc::SetBE32(packet1.data() + 8, kSsrc);
-  channel_->SetSink(kDefaultReceiveSsrc, NULL);
+  EXPECT_TRUE(channel_->SetSink(kDefaultReceiveSsrc, &renderer_));
   EXPECT_TRUE(SetDefaultCodec());
   EXPECT_TRUE(SetSend(true));
-  EXPECT_EQ(0, renderer_.num_rendered_frames());
-  channel_->OnPacketReceived(packet1, /*packet_time_us=*/-1);
+  EXPECT_EQ(renderer_.num_rendered_frames(), 0);
 
-  std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
-  EXPECT_EQ(1u, sources.size());
-  EXPECT_EQ(webrtc::RtpSourceType::SSRC, sources[0].source_type());
-  int64_t timestamp1 = sources[0].timestamp_ms();
+  // Send and receive one frame.
+  SendFrame();
+  EXPECT_FRAME_WAIT(1, kVideoWidth, kVideoHeight, kTimeout);
 
-  // a new packet.
-  int64_t timeDeltaMs = 1;
-  fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(timeDeltaMs));
-  channel_->OnPacketReceived(packet1, /*packet_time_us=*/-1);
-  int64_t timestamp2 = channel_->GetSources(kSsrc)[0].timestamp_ms();
-  EXPECT_EQ(timestamp2, timestamp1 + timeDeltaMs);
+  EXPECT_THAT(channel_->GetSources(kSsrc - 1), IsEmpty());
+  EXPECT_THAT(channel_->GetSources(kSsrc), SizeIs(1));
+  EXPECT_THAT(channel_->GetSources(kSsrc + 1), IsEmpty());
 
-  // It only keeps 10s of history.
-  fake_clock_.AdvanceTime(webrtc::TimeDelta::seconds(10));
-  fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(1));
-  EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
-}
+  webrtc::RtpSource source = channel_->GetSources(kSsrc)[0];
+  EXPECT_EQ(source.source_id(), kSsrc);
+  EXPECT_EQ(source.source_type(), webrtc::RtpSourceType::SSRC);
+  int64_t rtp_timestamp_1 = source.rtp_timestamp();
+  int64_t timestamp_ms_1 = source.timestamp_ms();
 
-TEST_F(WebRtcVideoChannelTestWithClock, GetContributingSources) {
-  uint8_t data1[] = {0x81, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-                     0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+  // Send and receive another frame.
+  SendFrame();
+  EXPECT_FRAME_WAIT(2, kVideoWidth, kVideoHeight, kTimeout);
 
-  uint32_t kCsrc = 4321u;
-  EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
-  EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
+  EXPECT_THAT(channel_->GetSources(kSsrc - 1), IsEmpty());
+  EXPECT_THAT(channel_->GetSources(kSsrc), SizeIs(1));
+  EXPECT_THAT(channel_->GetSources(kSsrc + 1), IsEmpty());
 
-  rtc::CopyOnWriteBuffer packet1(data1, sizeof(data1));
-  rtc::SetBE32(packet1.data() + 8, kSsrc);
-  rtc::SetBE32(packet1.data() + 12, kCsrc);
-  channel_->SetSink(kDefaultReceiveSsrc, NULL);
-  EXPECT_TRUE(SetDefaultCodec());
-  EXPECT_TRUE(SetSend(true));
-  EXPECT_EQ(0, renderer_.num_rendered_frames());
-  channel_->OnPacketReceived(packet1, /*packet_time_us=*/-1);
+  source = channel_->GetSources(kSsrc)[0];
+  EXPECT_EQ(source.source_id(), kSsrc);
+  EXPECT_EQ(source.source_type(), webrtc::RtpSourceType::SSRC);
+  int64_t rtp_timestamp_2 = source.rtp_timestamp();
+  int64_t timestamp_ms_2 = source.timestamp_ms();
 
-  {
-    ASSERT_EQ(2u, channel_->GetSources(kSsrc).size());
-    EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
-    std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
-    EXPECT_EQ(sources[0].timestamp_ms(), sources[1].timestamp_ms());
-    // 1 SSRC and 1 CSRC.
-    EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
-                return source.source_type() == webrtc::RtpSourceType::SSRC;
-              }));
-    EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
-                return source.source_type() == webrtc::RtpSourceType::CSRC;
-              }));
-  }
-  int64_t timestamp1 = channel_->GetSources(kSsrc)[0].timestamp_ms();
-
-  // a new packet with only ssrc (i.e no csrc).
-  uint8_t data2[] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00,
-                     0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
-  rtc::CopyOnWriteBuffer packet2(data2, sizeof(data2));
-  rtc::SetBE32(packet2.data() + 8, kSsrc);
-
-  int64_t timeDeltaMs = 1;
-  fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(timeDeltaMs));
-  channel_->OnPacketReceived(packet2, /*packet_time_us=*/-1);
-
-  {
-    ASSERT_EQ(2u, channel_->GetSources(kSsrc).size());
-    EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
-    std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
-    EXPECT_NE(sources[0].timestamp_ms(), sources[1].timestamp_ms());
-    // 1 SSRC and 1 CSRC.
-    EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
-                return source.source_type() == webrtc::RtpSourceType::SSRC;
-              }));
-    EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
-                return source.source_type() == webrtc::RtpSourceType::CSRC;
-              }));
-    auto ssrcSource =
-        absl::c_find_if(sources, [](const webrtc::RtpSource& source) {
-          return source.source_type() == webrtc::RtpSourceType::SSRC;
-        });
-    auto csrcSource =
-        absl::c_find_if(sources, [](const webrtc::RtpSource& source) {
-          return source.source_type() == webrtc::RtpSourceType::CSRC;
-        });
-
-    EXPECT_EQ(ssrcSource->timestamp_ms(), timestamp1 + timeDeltaMs);
-    EXPECT_EQ(csrcSource->timestamp_ms(), timestamp1);
-  }
-
-  // It only keeps 10s of history.
-  fake_clock_.AdvanceTime(webrtc::TimeDelta::seconds(10));
-
-  {
-    ASSERT_EQ(1u, channel_->GetSources(kSsrc).size());
-    EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
-    std::vector<webrtc::RtpSource> sources = channel_->GetSources(kSsrc);
-    EXPECT_EQ(1, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
-                return source.source_type() == webrtc::RtpSourceType::SSRC;
-              }));
-    EXPECT_EQ(0, absl::c_count_if(sources, [](const webrtc::RtpSource& source) {
-                return source.source_type() == webrtc::RtpSourceType::CSRC;
-              }));
-  }
-
-  fake_clock_.AdvanceTime(webrtc::TimeDelta::ms(1));
-  EXPECT_EQ(0u, channel_->GetSources(kSsrc).size());
-  EXPECT_EQ(0u, channel_->GetSources(kCsrc).size());
+  EXPECT_GT(rtp_timestamp_2, rtp_timestamp_1);
+  EXPECT_GT(timestamp_ms_2, timestamp_ms_1);
 }
 
 TEST_F(WebRtcVideoChannelTest, SetsRidsOnSendStream) {
diff --git a/video/rtp_video_stream_receiver.cc b/video/rtp_video_stream_receiver.cc
index ea0f663..237a4c2 100644
--- a/video/rtp_video_stream_receiver.cc
+++ b/video/rtp_video_stream_receiver.cc
@@ -307,7 +307,7 @@
     return absl::nullopt;
   }
   {
-    rtc::CritScope lock(&rtp_sources_lock_);
+    rtc::CritScope lock(&sync_info_lock_);
     if (!last_received_rtp_timestamp_ || !last_received_rtp_system_time_ms_) {
       return absl::nullopt;
     }
@@ -423,14 +423,9 @@
     // TODO(nisse): Exclude out-of-order packets?
     int64_t now_ms = clock_->TimeInMilliseconds();
     {
-      rtc::CritScope cs(&rtp_sources_lock_);
+      rtc::CritScope cs(&sync_info_lock_);
       last_received_rtp_timestamp_ = packet.Timestamp();
       last_received_rtp_system_time_ms_ = now_ms;
-
-      std::vector<uint32_t> csrcs = packet.Csrcs();
-      contributing_sources_.Update(now_ms, csrcs,
-                                   /* audio level */ absl::nullopt,
-                                   packet.Timestamp());
     }
     // Periodically log the RTP header of incoming packets.
     if (now_ms - last_packet_log_ms_ > kPacketLogIntervalMs) {
@@ -894,22 +889,4 @@
                              sprop_decoder.pps_nalu());
 }
 
-std::vector<webrtc::RtpSource> RtpVideoStreamReceiver::GetSources() const {
-  int64_t now_ms = rtc::TimeMillis();
-  std::vector<RtpSource> sources;
-  {
-    rtc::CritScope cs(&rtp_sources_lock_);
-    sources = contributing_sources_.GetSources(now_ms);
-    if (last_received_rtp_system_time_ms_ >=
-        now_ms - ContributingSources::kHistoryMs) {
-      RTC_DCHECK(last_received_rtp_timestamp_.has_value());
-      sources.emplace_back(*last_received_rtp_system_time_ms_,
-                           config_.rtp.remote_ssrc, RtpSourceType::SSRC,
-                           /* audio_level */ absl::nullopt,
-                           *last_received_rtp_timestamp_);
-    }
-  }
-  return sources;
-}
-
 }  // namespace webrtc
diff --git a/video/rtp_video_stream_receiver.h b/video/rtp_video_stream_receiver.h
index d574184..3202a31 100644
--- a/video/rtp_video_stream_receiver.h
+++ b/video/rtp_video_stream_receiver.h
@@ -175,8 +175,6 @@
   void AddSecondarySink(RtpPacketSinkInterface* sink);
   void RemoveSecondarySink(const RtpPacketSinkInterface* sink);
 
-  std::vector<webrtc::RtpSource> GetSources() const;
-
  private:
   // Used for buffering RTCP feedback messages and sending them all together.
   // Note:
@@ -298,14 +296,13 @@
   std::vector<RtpPacketSinkInterface*> secondary_sinks_
       RTC_GUARDED_BY(worker_task_checker_);
 
-  // Info for GetSources and GetSyncInfo is updated on network or worker thread,
-  // queried on the worker thread.
-  rtc::CriticalSection rtp_sources_lock_;
-  ContributingSources contributing_sources_ RTC_GUARDED_BY(&rtp_sources_lock_);
+  // Info for GetSyncInfo is updated on network or worker thread, and queried on
+  // the worker thread.
+  rtc::CriticalSection sync_info_lock_;
   absl::optional<uint32_t> last_received_rtp_timestamp_
-      RTC_GUARDED_BY(rtp_sources_lock_);
+      RTC_GUARDED_BY(sync_info_lock_);
   absl::optional<int64_t> last_received_rtp_system_time_ms_
-      RTC_GUARDED_BY(rtp_sources_lock_);
+      RTC_GUARDED_BY(sync_info_lock_);
 
   // Used to validate the buffered frame decryptor is always run on the correct
   // thread.
diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc
index c8f14d8..b69a0be 100644
--- a/video/video_receive_stream.cc
+++ b/video/video_receive_stream.cc
@@ -191,6 +191,7 @@
                      "DecodingThread",
                      rtc::kHighestPriority),
       call_stats_(call_stats),
+      source_tracker_(clock_),
       stats_proxy_(&config_, clock_),
       rtp_receive_statistics_(
           ReceiveStatistics::Create(clock_, &stats_proxy_, &stats_proxy_)),
@@ -503,6 +504,7 @@
   }
   config_.renderer->OnFrame(video_frame);
 
+  source_tracker_.OnFrameDelivered(video_frame.packet_infos());
   // TODO(tommi): OnRenderFrame grabs a lock too.
   stats_proxy_.OnRenderedFrame(video_frame);
 }
@@ -742,7 +744,7 @@
 }
 
 std::vector<webrtc::RtpSource> VideoReceiveStream::GetSources() const {
-  return rtp_video_stream_receiver_.GetSources();
+  return source_tracker_.GetSources();
 }
 
 }  // namespace internal
diff --git a/video/video_receive_stream.h b/video/video_receive_stream.h
index 9d18218..3acba46 100644
--- a/video/video_receive_stream.h
+++ b/video/video_receive_stream.h
@@ -20,6 +20,7 @@
 #include "call/syncable.h"
 #include "call/video_receive_stream.h"
 #include "modules/rtp_rtcp/include/flexfec_receiver.h"
+#include "modules/rtp_rtcp/source/source_tracker.h"
 #include "modules/video_coding/frame_buffer2.h"
 #include "modules/video_coding/video_coding_impl.h"
 #include "rtc_base/synchronization/sequence_checker.h"
@@ -164,6 +165,7 @@
   bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
   bool decoder_stopped_ RTC_GUARDED_BY(decode_queue_) = true;
 
+  SourceTracker source_tracker_;
   ReceiveStatisticsProxy stats_proxy_;
   // Shared by media and rtx stream receivers, since the latter has no RtpRtcp
   // module of its own.
diff --git a/video/video_receive_stream_unittest.cc b/video/video_receive_stream_unittest.cc
index d8a845e..005b077 100644
--- a/video/video_receive_stream_unittest.cc
+++ b/video/video_receive_stream_unittest.cc
@@ -8,6 +8,7 @@
  *  be found in the AUTHORS file in the root of the source tree.
  */
 
+#include <algorithm>
 #include <utility>
 #include <vector>
 
@@ -37,7 +38,10 @@
 namespace {
 
 using ::testing::_;
+using ::testing::ElementsAreArray;
 using ::testing::Invoke;
+using ::testing::IsEmpty;
+using ::testing::SizeIs;
 
 constexpr int kDefaultTimeOutMs = 50;
 
@@ -107,14 +111,14 @@
     null_decoder.decoder_factory = &null_decoder_factory_;
     config_.decoders.push_back(null_decoder);
 
-    Clock* clock = Clock::GetRealTimeClock();
-    timing_ = new VCMTiming(clock);
+    clock_ = Clock::GetRealTimeClock();
+    timing_ = new VCMTiming(clock_);
 
     video_receive_stream_ =
         absl::make_unique<webrtc::internal::VideoReceiveStream>(
             task_queue_factory_.get(), &rtp_stream_receiver_controller_,
             kDefaultNumCpuCores, &packet_router_, config_.Copy(),
-            process_thread_.get(), &call_stats_, clock, timing_);
+            process_thread_.get(), &call_stats_, clock_, timing_);
   }
 
  protected:
@@ -131,6 +135,7 @@
   PacketRouter packet_router_;
   RtpStreamReceiverController rtp_stream_receiver_controller_;
   std::unique_ptr<webrtc::internal::VideoReceiveStream> video_receive_stream_;
+  Clock* clock_;
   VCMTiming* timing_;
 };
 
@@ -243,13 +248,13 @@
     fake_decoder.video_format = SdpVideoFormat("VP8");
     fake_decoder.decoder_factory = &fake_decoder_factory_;
     config_.decoders.push_back(fake_decoder);
-    Clock* clock = Clock::GetRealTimeClock();
-    timing_ = new VCMTiming(clock);
+    clock_ = Clock::GetRealTimeClock();
+    timing_ = new VCMTiming(clock_);
 
     video_receive_stream_.reset(new webrtc::internal::VideoReceiveStream(
         task_queue_factory_.get(), &rtp_stream_receiver_controller_,
         kDefaultNumCpuCores, &packet_router_, config_.Copy(),
-        process_thread_.get(), &call_stats_, clock, timing_));
+        process_thread_.get(), &call_stats_, clock_, timing_));
   }
 
  protected:
@@ -263,6 +268,7 @@
   PacketRouter packet_router_;
   RtpStreamReceiverController rtp_stream_receiver_controller_;
   std::unique_ptr<webrtc::internal::VideoReceiveStream> video_receive_stream_;
+  Clock* clock_;
   VCMTiming* timing_;
 };
 
@@ -320,7 +326,85 @@
   video_receive_stream_->OnCompleteFrame(std::move(test_frame));
   EXPECT_TRUE(fake_renderer_.WaitForRenderedFrame(kDefaultTimeOutMs));
 
-  EXPECT_EQ(fake_renderer_.packet_infos().size(), 3U);
+  EXPECT_THAT(fake_renderer_.packet_infos(), ElementsAreArray(packet_infos));
+}
+
+TEST_F(VideoReceiveStreamTestWithFakeDecoder, RenderedFrameUpdatesGetSources) {
+  constexpr uint32_t kSsrc = 1111;
+  constexpr uint32_t kCsrc = 9001;
+  constexpr uint32_t kRtpTimestamp = 12345;
+
+  // Prepare one video frame with per-packet information.
+  auto test_frame = absl::make_unique<FrameObjectFake>();
+  test_frame->SetPayloadType(99);
+  test_frame->id.picture_id = 0;
+  RtpPacketInfos packet_infos;
+  {
+    RtpPacketInfos::vector_type infos;
+
+    RtpPacketInfo info;
+    info.set_ssrc(kSsrc);
+    info.set_csrcs({kCsrc});
+    info.set_rtp_timestamp(kRtpTimestamp);
+
+    info.set_receive_time_ms(clock_->TimeInMilliseconds() - 5000);
+    infos.push_back(info);
+
+    info.set_receive_time_ms(clock_->TimeInMilliseconds() - 3000);
+    infos.push_back(info);
+
+    info.set_receive_time_ms(clock_->TimeInMilliseconds() - 2000);
+    infos.push_back(info);
+
+    info.set_receive_time_ms(clock_->TimeInMilliseconds() - 4000);
+    infos.push_back(info);
+
+    packet_infos = RtpPacketInfos(std::move(infos));
+  }
+  test_frame->SetPacketInfos(packet_infos);
+
+  // Start receive stream.
+  video_receive_stream_->Start();
+  EXPECT_THAT(video_receive_stream_->GetSources(), IsEmpty());
+
+  // Render one video frame.
+  int64_t timestamp_ms_min = clock_->TimeInMilliseconds();
+  video_receive_stream_->OnCompleteFrame(std::move(test_frame));
+  EXPECT_TRUE(fake_renderer_.WaitForRenderedFrame(kDefaultTimeOutMs));
+  int64_t timestamp_ms_max = clock_->TimeInMilliseconds();
+
+  // Verify that the per-packet information is passed to the renderer.
+  EXPECT_THAT(fake_renderer_.packet_infos(), ElementsAreArray(packet_infos));
+
+  // Verify that the per-packet information also updates |GetSources()|.
+  std::vector<RtpSource> sources = video_receive_stream_->GetSources();
+  ASSERT_THAT(sources, SizeIs(2));
+  {
+    auto it = std::find_if(sources.begin(), sources.end(),
+                           [](const RtpSource& source) {
+                             return source.source_type() == RtpSourceType::SSRC;
+                           });
+    ASSERT_NE(it, sources.end());
+
+    EXPECT_EQ(it->source_id(), kSsrc);
+    EXPECT_EQ(it->source_type(), RtpSourceType::SSRC);
+    EXPECT_EQ(it->rtp_timestamp(), kRtpTimestamp);
+    EXPECT_GE(it->timestamp_ms(), timestamp_ms_min);
+    EXPECT_LE(it->timestamp_ms(), timestamp_ms_max);
+  }
+  {
+    auto it = std::find_if(sources.begin(), sources.end(),
+                           [](const RtpSource& source) {
+                             return source.source_type() == RtpSourceType::CSRC;
+                           });
+    ASSERT_NE(it, sources.end());
+
+    EXPECT_EQ(it->source_id(), kCsrc);
+    EXPECT_EQ(it->source_type(), RtpSourceType::CSRC);
+    EXPECT_EQ(it->rtp_timestamp(), kRtpTimestamp);
+    EXPECT_GE(it->timestamp_ms(), timestamp_ms_min);
+    EXPECT_LE(it->timestamp_ms(), timestamp_ms_max);
+  }
 }
 
 }  // namespace webrtc