Use separate queue for alive frames when self view is enabled in DVQA

Bug: b/195652126
Change-Id: Ief1c6ba5216147e0dbfe280e7c001902e1a4d6fc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/229100
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34790}
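
For readers of this patch, a minimal standalone sketch (illustrative only, not part of the change; the `QueueIndexing` name is invented) of the index mapping the diff below introduces: when self view is enabled, queue #0 is reserved for alive frames and every peer's queue is shifted by one; otherwise the owner's own queue doubles as the alive-frames queue.

#include <cassert>
#include <cstddef>

// Illustrative only: mirrors the mapping added to StreamState below.
struct QueueIndexing {
  size_t owner;
  bool enable_receive_own_stream;

  size_t PeerQueueIndex(size_t peer_index) const {
    // With self view enabled, queue #0 tracks alive frames, so peer queues
    // start at index 1.
    return enable_receive_own_stream ? peer_index + 1 : peer_index;
  }

  size_t AliveFramesQueueIndex() const {
    // Without self view, the owner never receives its own stream, so its
    // queue is free to track alive frames.
    return enable_receive_own_stream ? 0 : owner;
  }
};

int main() {
  QueueIndexing with_self_view{/*owner=*/0, /*enable_receive_own_stream=*/true};
  assert(with_self_view.AliveFramesQueueIndex() == 0);
  assert(with_self_view.PeerQueueIndex(0) == 1);  // Sender's own receive queue.

  QueueIndexing without_self_view{/*owner=*/0, /*enable_receive_own_stream=*/false};
  assert(without_self_view.AliveFramesQueueIndex() == 0);  // Owner's queue.
  assert(without_self_view.PeerQueueIndex(1) == 1);
  return 0;
}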
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
index 04fdaca..f93e66e 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
@@ -790,26 +790,25 @@
 }
 
 uint16_t DefaultVideoQualityAnalyzer::StreamState::PopFront(size_t peer) {
-  absl::optional<uint16_t> frame_id = frame_ids_.PopFront(peer);
+  size_t peer_queue = GetPeerQueueIndex(peer);
+  size_t alive_frames_queue = GetAliveFramesQueueIndex();
+  absl::optional<uint16_t> frame_id = frame_ids_.PopFront(peer_queue);
   RTC_DCHECK(frame_id.has_value());
 
   // If the alive frames queue is longer than all others, then also pop the
   // frame from it, because that frame has been received by all receivers.
-  size_t owner_size = frame_ids_.size(owner_);
+  size_t alive_size = frame_ids_.size(alive_frames_queue);
   size_t other_size = 0;
   for (size_t i = 0; i < frame_ids_.readers_count(); ++i) {
     size_t cur_size = frame_ids_.size(i);
-    if (i != owner_ && cur_size > other_size) {
+    if (i != alive_frames_queue && cur_size > other_size) {
       other_size = cur_size;
     }
   }
-  // Pops frame from owner queue if owner's queue is the longest and one of
-  // next conditions is true:
-  // 1. If `enable_receive_own_stream_` and `peer` == `owner_`
-  // 2. If !`enable_receive_own_stream_`
-  if (owner_size > other_size &&
-      (!enable_receive_own_stream_ || peer == owner_)) {
-    absl::optional<uint16_t> alive_frame_id = frame_ids_.PopFront(owner_);
+  // Pop the frame from the alive frames queue if it is the longest one.
+  if (alive_size > other_size) {
+    absl::optional<uint16_t> alive_frame_id =
+        frame_ids_.PopFront(alive_frames_queue);
     RTC_DCHECK(alive_frame_id.has_value());
     RTC_DCHECK_EQ(frame_id.value(), alive_frame_id.value());
   }
@@ -818,7 +817,8 @@
 }
 
 uint16_t DefaultVideoQualityAnalyzer::StreamState::MarkNextAliveFrameAsDead() {
-  absl::optional<uint16_t> frame_id = frame_ids_.PopFront(owner_);
+  absl::optional<uint16_t> frame_id =
+      frame_ids_.PopFront(GetAliveFramesQueueIndex());
   RTC_DCHECK(frame_id.has_value());
   return frame_id.value();
 }
@@ -840,6 +840,31 @@
   return MaybeGetValue(last_rendered_frame_time_, peer);
 }
 
+size_t DefaultVideoQualityAnalyzer::StreamState::GetPeerQueueIndex(
+    size_t peer_index) const {
+  // When the sender isn't expected to receive its own stream, the owner's
+  // queue is used to track alive frames. Otherwise queue #0 is used to
+  // track alive frames and all peers' queues are shifted by 1.
+  // It means that when `enable_receive_own_stream_` is true the peer's
+  // queue has index `peer_index` + 1, and when `enable_receive_own_stream_`
+  // is false the peer's queue has index `peer_index`.
+  if (!enable_receive_own_stream_) {
+    return peer_index;
+  }
+  return peer_index + 1;
+}
+
+size_t DefaultVideoQualityAnalyzer::StreamState::GetAliveFramesQueueIndex()
+    const {
+  // When the sender isn't expected to receive its own stream, the owner's
+  // queue is used to track alive frames. Otherwise queue #0 is used to
+  // track alive frames and all peers' queues are shifted by 1.
+  if (!enable_receive_own_stream_) {
+    return owner_;
+  }
+  return 0;
+}
+
 bool DefaultVideoQualityAnalyzer::FrameInFlight::RemoveFrame() {
   if (!frame_) {
     return false;
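
As a reading aid (not part of the patch), here is a hedged sketch of the PopFront bookkeeping above, using plain std::deque heads in place of MultiHeadQueue; the `PopFrontForPeer` helper and its parameters are invented for illustration.

#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Illustrative stand-in for MultiHeadQueue<uint16_t>: one deque per head.
// Mimics StreamState::PopFront: the alive-frames head is popped only once
// every other head has already consumed the frame.
uint16_t PopFrontForPeer(std::vector<std::deque<uint16_t>>& heads,
                         size_t peer_queue,
                         size_t alive_queue) {
  uint16_t frame_id = heads[peer_queue].front();
  heads[peer_queue].pop_front();

  size_t longest_other = 0;
  for (size_t i = 0; i < heads.size(); ++i) {
    if (i != alive_queue && heads[i].size() > longest_other) {
      longest_other = heads[i].size();
    }
  }
  // The alive head is longer than every receiver head only when the frame at
  // its front has been popped by all receivers, so it is no longer "alive".
  if (heads[alive_queue].size() > longest_other) {
    uint16_t alive_frame_id = heads[alive_queue].front();
    heads[alive_queue].pop_front();
    assert(alive_frame_id == frame_id);
  }
  return frame_id;
}

int main() {
  // Self view enabled: head 0 tracks alive frames, heads 1 and 2 are peers.
  std::vector<std::deque<uint16_t>> heads(3, std::deque<uint16_t>{10, 11});
  PopFrontForPeer(heads, /*peer_queue=*/1, /*alive_queue=*/0);  // Alive head keeps 10.
  assert(heads[0].size() == 2);
  PopFrontForPeer(heads, /*peer_queue=*/2, /*alive_queue=*/0);  // Now 10 is gone everywhere.
  assert(heads[0].size() == 1 && heads[0].front() == 11);
  return 0;
}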
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
index 57b202e..04fe5f2 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
@@ -104,7 +104,8 @@
         : owner_(owner),
           enable_receive_own_stream_(enable_receive_own_stream),
           stream_started_time_(stream_started_time),
-          frame_ids_(peers_count) {}
+          frame_ids_(enable_receive_own_stream ? peers_count + 1
+                                               : peers_count) {}
 
     size_t owner() const { return owner_; }
     Timestamp stream_started_time() const { return stream_started_time_; }
@@ -113,22 +114,37 @@
     // Crash if state is empty. Guarantees that there can be no alive frames
     // that are not in the owner queue
     uint16_t PopFront(size_t peer);
-    bool IsEmpty(size_t peer) const { return frame_ids_.IsEmpty(peer); }
+    bool IsEmpty(size_t peer) const {
+      return frame_ids_.IsEmpty(GetPeerQueueIndex(peer));
+    }
     // Crash if state is empty.
-    uint16_t Front(size_t peer) const { return frame_ids_.Front(peer).value(); }
+    uint16_t Front(size_t peer) const {
+      return frame_ids_.Front(GetPeerQueueIndex(peer)).value();
+    }
 
     // When new peer is added - all current alive frames will be sent to it as
     // well. So we need to register them as expected by copying owner_ head to
     // the new head.
-    void AddPeer() { frame_ids_.AddHead(owner_); }
+    void AddPeer() { frame_ids_.AddHead(GetAliveFramesQueueIndex()); }
 
-    size_t GetAliveFramesCount() { return frame_ids_.size(owner_); }
+    size_t GetAliveFramesCount() const {
+      return frame_ids_.size(GetAliveFramesQueueIndex());
+    }
     uint16_t MarkNextAliveFrameAsDead();
 
     void SetLastRenderedFrameTime(size_t peer, Timestamp time);
     absl::optional<Timestamp> last_rendered_frame_time(size_t peer) const;
 
    private:
+    // Returns the index of the `frame_ids_` queue which is used for the
+    // specified `peer_index`.
+    size_t GetPeerQueueIndex(size_t peer_index) const;
+
+    // Returns the index of the `frame_ids_` queue which is used to track
+    // alive frames for this stream. A frame is alive if its VideoFrame
+    // payload is still stored in `captured_frames_in_flight_`.
+    size_t GetAliveFramesQueueIndex() const;
+
     // Index of the owner. Owner's queue in `frame_ids_` will keep alive frames.
     const size_t owner_;
     const bool enable_receive_own_stream_;
@@ -145,10 +161,6 @@
     // If we received frame with id frame_id3, then we will pop frame_id1 and
    // frame_id2 and consider those frames as dropped and then compare received
     // frame with the one from `captured_frames_in_flight_` with id frame_id3.
-    //
-    // To track alive frames (frames that contains frame's payload in
-    // `captured_frames_in_flight_`) the head which corresponds to `owner_` will
-    // be used. So that head will point to the first alive frame in frames list.
     MultiHeadQueue<uint16_t> frame_ids_;
     std::map<size_t, Timestamp> last_rendered_frame_time_;
   };
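
To illustrate the header change above (sizing `frame_ids_` with one extra head and copying the alive-frames head in AddPeer), a hypothetical toy model follows; `ToyStreamState` and its members are invented names, not the real class.

#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical toy model of the StreamState bookkeeping declared above:
// one deque per reader, plus an extra deque for alive frames when the
// sender also receives its own stream.
struct ToyStreamState {
  ToyStreamState(size_t owner, size_t peers_count, bool receive_own_stream)
      : owner_(owner),
        receive_own_stream_(receive_own_stream),
        heads_(receive_own_stream ? peers_count + 1 : peers_count) {}

  size_t AliveQueue() const { return receive_own_stream_ ? 0 : owner_; }

  void PushFrame(uint16_t frame_id) {
    for (auto& head : heads_) head.push_back(frame_id);
  }

  // Mirrors AddPeer(): a newly added peer expects every currently alive
  // frame, so its head starts as a copy of the alive-frames head.
  void AddPeer() { heads_.push_back(heads_[AliveQueue()]); }

  size_t AliveFramesCount() const { return heads_[AliveQueue()].size(); }

  size_t owner_;
  bool receive_own_stream_;
  std::vector<std::deque<uint16_t>> heads_;
};

int main() {
  ToyStreamState state(/*owner=*/0, /*peers_count=*/2, /*receive_own_stream=*/true);
  assert(state.heads_.size() == 3);  // 2 peers + dedicated alive-frames head.
  state.PushFrame(1);
  state.PushFrame(2);
  state.AddPeer();
  assert(state.heads_.back().size() == 2);  // New peer expects both alive frames.
  assert(state.AliveFramesCount() == 2);
  return 0;
}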
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
index 50d62f4..523bb0a 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
@@ -965,7 +965,8 @@
   EXPECT_EQ(frame_counters.rendered, 2);
 }
 
-TEST(DefaultVideoQualityAnalyzerTest, FrameCanBeReceivedBySender) {
+TEST(DefaultVideoQualityAnalyzerTest,
+     FrameCanBeReceivedBySenderAfterItWasReceivedByReceiver) {
   std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
       test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
                                        /*type=*/absl::nullopt,
@@ -978,37 +979,45 @@
                  std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
                  kAnalyzerMaxThreadsCount);
 
-  VideoFrame frame = NextFrame(frame_generator.get(), 1);
-
-  frame.set_id(analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
-  analyzer.OnFramePreEncode(kSenderPeerName, frame);
-  analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
-                          VideoQualityAnalyzerInterface::EncoderStats());
+  std::vector<VideoFrame> frames;
+  for (int i = 0; i < 3; ++i) {
+    VideoFrame frame = NextFrame(frame_generator.get(), 1);
+    frame.set_id(
+        analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
+    frames.push_back(frame);
+    analyzer.OnFramePreEncode(kSenderPeerName, frame);
+    analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
+                            VideoQualityAnalyzerInterface::EncoderStats());
+  }
 
   // Receive by 2nd peer.
-  VideoFrame received_frame = DeepCopy(frame);
-  analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
-                            FakeEncode(received_frame));
-  analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
-                          VideoQualityAnalyzerInterface::DecoderStats());
-  analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
+  for (VideoFrame& frame : frames) {
+    VideoFrame received_frame = DeepCopy(frame);
+    analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
+                              FakeEncode(received_frame));
+    analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
+                            VideoQualityAnalyzerInterface::DecoderStats());
+    analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
+  }
 
   // Check that we still have that frame in flight.
   AnalyzerStats analyzer_stats = analyzer.GetAnalyzerStats();
   std::vector<StatsSample> frames_in_flight_sizes =
       GetSortedSamples(analyzer_stats.frames_in_flight_left_count);
-  EXPECT_EQ(frames_in_flight_sizes.back().value, 1)
+  EXPECT_EQ(frames_in_flight_sizes.back().value, 3)
       << "Expected that frame is still in flight, "
       << "because it wasn't received by sender"
       << ToString(frames_in_flight_sizes);
 
   // Receive by sender
-  received_frame = DeepCopy(frame);
-  analyzer.OnFramePreDecode(kSenderPeerName, received_frame.id(),
-                            FakeEncode(received_frame));
-  analyzer.OnFrameDecoded(kSenderPeerName, received_frame,
-                          VideoQualityAnalyzerInterface::DecoderStats());
-  analyzer.OnFrameRendered(kSenderPeerName, received_frame);
+  for (VideoFrame& frame : frames) {
+    VideoFrame received_frame = DeepCopy(frame);
+    analyzer.OnFramePreDecode(kSenderPeerName, received_frame.id(),
+                              FakeEncode(received_frame));
+    analyzer.OnFrameDecoded(kSenderPeerName, received_frame,
+                            VideoQualityAnalyzerInterface::DecoderStats());
+    analyzer.OnFrameRendered(kSenderPeerName, received_frame);
+  }
 
   // Give analyzer some time to process frames on async thread. The computations
   // have to be fast (heavy metrics are disabled!), so if it doesn't fit 100ms it
@@ -1017,7 +1026,7 @@
   analyzer.Stop();
 
   analyzer_stats = analyzer.GetAnalyzerStats();
-  EXPECT_EQ(analyzer_stats.comparisons_done, 2);
+  EXPECT_EQ(analyzer_stats.comparisons_done, 6);
 
   frames_in_flight_sizes =
       GetSortedSamples(analyzer_stats.frames_in_flight_left_count);
@@ -1025,29 +1034,124 @@
       << ToString(frames_in_flight_sizes);
 
   FrameCounters frame_counters = analyzer.GetGlobalCounters();
-  EXPECT_EQ(frame_counters.captured, 1);
-  EXPECT_EQ(frame_counters.rendered, 2);
+  EXPECT_EQ(frame_counters.captured, 3);
+  EXPECT_EQ(frame_counters.rendered, 6);
 
   EXPECT_EQ(analyzer.GetStats().size(), 2lu);
   {
     FrameCounters stream_conters = analyzer.GetPerStreamCounters().at(
         StatsKey(kStreamLabel, kSenderPeerName, kReceiverPeerName));
-    EXPECT_EQ(stream_conters.captured, 1);
-    EXPECT_EQ(stream_conters.pre_encoded, 1);
-    EXPECT_EQ(stream_conters.encoded, 1);
-    EXPECT_EQ(stream_conters.received, 1);
-    EXPECT_EQ(stream_conters.decoded, 1);
-    EXPECT_EQ(stream_conters.rendered, 1);
+    EXPECT_EQ(stream_conters.captured, 3);
+    EXPECT_EQ(stream_conters.pre_encoded, 3);
+    EXPECT_EQ(stream_conters.encoded, 3);
+    EXPECT_EQ(stream_conters.received, 3);
+    EXPECT_EQ(stream_conters.decoded, 3);
+    EXPECT_EQ(stream_conters.rendered, 3);
   }
   {
     FrameCounters stream_conters = analyzer.GetPerStreamCounters().at(
         StatsKey(kStreamLabel, kSenderPeerName, kSenderPeerName));
-    EXPECT_EQ(stream_conters.captured, 1);
-    EXPECT_EQ(stream_conters.pre_encoded, 1);
-    EXPECT_EQ(stream_conters.encoded, 1);
-    EXPECT_EQ(stream_conters.received, 1);
-    EXPECT_EQ(stream_conters.decoded, 1);
-    EXPECT_EQ(stream_conters.rendered, 1);
+    EXPECT_EQ(stream_conters.captured, 3);
+    EXPECT_EQ(stream_conters.pre_encoded, 3);
+    EXPECT_EQ(stream_conters.encoded, 3);
+    EXPECT_EQ(stream_conters.received, 3);
+    EXPECT_EQ(stream_conters.decoded, 3);
+    EXPECT_EQ(stream_conters.rendered, 3);
+  }
+}
+
+TEST(DefaultVideoQualityAnalyzerTest,
+     FrameCanBeReceivedByReceiverAfterItWasReceivedBySender) {
+  std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
+      test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
+                                       /*type=*/absl::nullopt,
+                                       /*num_squares=*/absl::nullopt);
+
+  DefaultVideoQualityAnalyzerOptions options = AnalyzerOptionsForTest();
+  options.enable_receive_own_stream = true;
+  DefaultVideoQualityAnalyzer analyzer(Clock::GetRealTimeClock(), options);
+  analyzer.Start("test_case",
+                 std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
+                 kAnalyzerMaxThreadsCount);
+
+  std::vector<VideoFrame> frames;
+  for (int i = 0; i < 3; ++i) {
+    VideoFrame frame = NextFrame(frame_generator.get(), 1);
+    frame.set_id(
+        analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
+    frames.push_back(frame);
+    analyzer.OnFramePreEncode(kSenderPeerName, frame);
+    analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
+                            VideoQualityAnalyzerInterface::EncoderStats());
+  }
+
+  // Receive by sender
+  for (VideoFrame& frame : frames) {
+    VideoFrame received_frame = DeepCopy(frame);
+    analyzer.OnFramePreDecode(kSenderPeerName, received_frame.id(),
+                              FakeEncode(received_frame));
+    analyzer.OnFrameDecoded(kSenderPeerName, received_frame,
+                            VideoQualityAnalyzerInterface::DecoderStats());
+    analyzer.OnFrameRendered(kSenderPeerName, received_frame);
+  }
+
+  // Check that we still have that frame in flight.
+  AnalyzerStats analyzer_stats = analyzer.GetAnalyzerStats();
+  std::vector<StatsSample> frames_in_flight_sizes =
+      GetSortedSamples(analyzer_stats.frames_in_flight_left_count);
+  EXPECT_EQ(frames_in_flight_sizes.back().value, 3)
+      << "Expected that frame is still in flight, "
+      << "because it wasn't received by sender"
+      << ToString(frames_in_flight_sizes);
+
+  // Receive by 2nd peer.
+  for (VideoFrame& frame : frames) {
+    VideoFrame received_frame = DeepCopy(frame);
+    analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
+                              FakeEncode(received_frame));
+    analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
+                            VideoQualityAnalyzerInterface::DecoderStats());
+    analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
+  }
+
+  // Give the analyzer some time to process frames on the async thread. The
+  // computations have to be fast (heavy metrics are disabled!), so if they
+  // don't fit in 100ms it means we have an issue!
+  SleepMs(100);
+  analyzer.Stop();
+
+  analyzer_stats = analyzer.GetAnalyzerStats();
+  EXPECT_EQ(analyzer_stats.comparisons_done, 6);
+
+  frames_in_flight_sizes =
+      GetSortedSamples(analyzer_stats.frames_in_flight_left_count);
+  EXPECT_EQ(frames_in_flight_sizes.back().value, 0)
+      << ToString(frames_in_flight_sizes);
+
+  FrameCounters frame_counters = analyzer.GetGlobalCounters();
+  EXPECT_EQ(frame_counters.captured, 3);
+  EXPECT_EQ(frame_counters.rendered, 6);
+
+  EXPECT_EQ(analyzer.GetStats().size(), 2lu);
+  {
+    FrameCounters stream_conters = analyzer.GetPerStreamCounters().at(
+        StatsKey(kStreamLabel, kSenderPeerName, kReceiverPeerName));
+    EXPECT_EQ(stream_conters.captured, 3);
+    EXPECT_EQ(stream_conters.pre_encoded, 3);
+    EXPECT_EQ(stream_conters.encoded, 3);
+    EXPECT_EQ(stream_conters.received, 3);
+    EXPECT_EQ(stream_conters.decoded, 3);
+    EXPECT_EQ(stream_conters.rendered, 3);
+  }
+  {
+    FrameCounters stream_conters = analyzer.GetPerStreamCounters().at(
+        StatsKey(kStreamLabel, kSenderPeerName, kSenderPeerName));
+    EXPECT_EQ(stream_conters.captured, 3);
+    EXPECT_EQ(stream_conters.pre_encoded, 3);
+    EXPECT_EQ(stream_conters.encoded, 3);
+    EXPECT_EQ(stream_conters.received, 3);
+    EXPECT_EQ(stream_conters.decoded, 3);
+    EXPECT_EQ(stream_conters.rendered, 3);
   }
 }
 
diff --git a/test/pc/e2e/analyzer/video/multi_head_queue.h b/test/pc/e2e/analyzer/video/multi_head_queue.h
index 0af6a19..1486780 100644
--- a/test/pc/e2e/analyzer/video/multi_head_queue.h
+++ b/test/pc/e2e/analyzer/video/multi_head_queue.h
@@ -36,6 +36,8 @@
   }
 
   // Creates a copy of an existing head. Complexity O(MultiHeadQueue::size()).
+  // `copy_index` - index of the queue that will be used as a source for
+  //     copying.
   void AddHead(size_t copy_index) { queues_.push_back(queues_[copy_index]); }
 
   // Add value to the end of the queue. Complexity O(readers_count).
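
The AddHead documentation above can be illustrated with a hypothetical simplification of MultiHeadQueue; `TinyMultiHeadQueue` and its method names are invented to show the documented semantics only, not the real API.

#include <cassert>
#include <deque>
#include <vector>

// Hypothetical simplification of MultiHeadQueue<T>: each head is its own
// copy of the queue, AddHead clones the head at `copy_index`, and PushBack
// appends the value to every head.
template <typename T>
class TinyMultiHeadQueue {
 public:
  explicit TinyMultiHeadQueue(size_t readers_count) : queues_(readers_count) {}

  void AddHead(size_t copy_index) { queues_.push_back(queues_[copy_index]); }

  void PushBack(const T& value) {
    for (auto& queue : queues_) queue.push_back(value);
  }

  size_t size(size_t index) const { return queues_[index].size(); }
  size_t readers_count() const { return queues_.size(); }

 private:
  std::vector<std::deque<T>> queues_;
};

int main() {
  TinyMultiHeadQueue<int> queue(/*readers_count=*/2);
  queue.PushBack(42);
  queue.AddHead(/*copy_index=*/0);  // New head starts with {42}.
  queue.PushBack(43);
  assert(queue.readers_count() == 3);
  assert(queue.size(2) == 2);
  return 0;
}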