Allow DVQA analyse selft stream for peer

Bug: b/195652126
Change-Id: Ie65e238028b932866a39aeec3797ac576c5f6c1c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/227769
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34656}
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
index 606ee01..9f285b5 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
@@ -180,7 +180,7 @@
     // Ensure stats for this stream exists.
     MutexLock lock(&comparison_lock_);
     for (size_t i = 0; i < peers_count; ++i) {
-      if (i == peer_index) {
+      if (i == peer_index && !options_.enable_receive_own_stream) {
         continue;
       }
       InternalStatsKey stats_key(stream_index, peer_index, i);
@@ -205,7 +205,7 @@
     stream_to_sender_[stream_index] = peer_index;
     frame_counters_.captured++;
     for (size_t i = 0; i < peers_->size(); ++i) {
-      if (i != peer_index) {
+      if (i != peer_index || options_.enable_receive_own_stream) {
         InternalStatsKey key(stream_index, peer_index, i);
         stream_frame_counters_[key].captured++;
       }
@@ -214,7 +214,8 @@
     auto state_it = stream_states_.find(stream_index);
     if (state_it == stream_states_.end()) {
       stream_states_.emplace(stream_index,
-                             StreamState(peer_index, peers_->size()));
+                             StreamState(peer_index, peers_->size(),
+                                         options_.enable_receive_own_stream));
     }
     StreamState* state = &stream_states_.at(stream_index);
     state->PushBack(frame_id);
@@ -225,7 +226,7 @@
       // still in flight, it means that this stream wasn't rendered for long
       // time and we need to process existing frame as dropped.
       for (size_t i = 0; i < peers_->size(); ++i) {
-        if (i == peer_index) {
+        if (i == peer_index && !options_.enable_receive_own_stream) {
           continue;
         }
 
@@ -248,7 +249,8 @@
     captured_frames_in_flight_.emplace(
         frame_id,
         FrameInFlight(stream_index, frame,
-                      /*captured_time=*/Now(), peer_index, peers_->size()));
+                      /*captured_time=*/Now(), peer_index, peers_->size(),
+                      options_.enable_receive_own_stream));
     // Set frame id on local copy of the frame
     captured_frames_in_flight_.at(frame_id).SetFrameId(frame_id);
 
@@ -287,7 +289,7 @@
   frame_counters_.pre_encoded++;
   size_t peer_index = peers_->index(peer_name);
   for (size_t i = 0; i < peers_->size(); ++i) {
-    if (i != peer_index) {
+    if (i != peer_index || options_.enable_receive_own_stream) {
       InternalStatsKey key(it->second.stream(), peer_index, i);
       stream_frame_counters_.at(key).pre_encoded++;
     }
@@ -318,7 +320,7 @@
     frame_counters_.encoded++;
     size_t peer_index = peers_->index(peer_name);
     for (size_t i = 0; i < peers_->size(); ++i) {
-      if (i != peer_index) {
+      if (i != peer_index || options_.enable_receive_own_stream) {
         InternalStatsKey key(it->second.stream(), peer_index, i);
         stream_frame_counters_.at(key).encoded++;
       }
@@ -965,7 +967,7 @@
 
 std::string DefaultVideoQualityAnalyzer::StatsKeyToMetricName(
     const StatsKey& key) const {
-  if (peers_->size() <= 2) {
+  if (peers_->size() <= 2 && key.sender != key.receiver) {
     return key.stream_label;
   }
   return key.ToString();
@@ -1026,7 +1028,12 @@
       other_size = cur_size;
     }
   }
-  if (owner_size > other_size) {
+  // Pops frame from owner queue if owner's queue is the longest and one of
+  // next conditions is true:
+  // 1. If `enable_receive_own_stream_` and `peer` == `owner_`
+  // 2. If !`enable_receive_own_stream_`
+  if (owner_size > other_size &&
+      (!enable_receive_own_stream_ || peer == owner_)) {
     absl::optional<uint16_t> alive_frame_id = frame_ids_.PopFront(owner_);
     RTC_DCHECK(alive_frame_id.has_value());
     RTC_DCHECK_EQ(frame_id.value(), alive_frame_id.value());
@@ -1077,8 +1084,11 @@
   std::vector<size_t> out;
   for (size_t i = 0; i < peers_count_; ++i) {
     auto it = receiver_stats_.find(i);
-    if (i != owner_ && it != receiver_stats_.end() &&
-        it->second.rendered_time.IsInfinite()) {
+    bool should_current_peer_receive =
+        i != owner_ || enable_receive_own_stream_;
+    if (should_current_peer_receive &&
+        (it == receiver_stats_.end() ||
+         it->second.rendered_time.IsInfinite())) {
       out.push_back(i);
     }
   }
@@ -1087,7 +1097,8 @@
 
 bool DefaultVideoQualityAnalyzer::FrameInFlight::HaveAllPeersReceived() const {
   for (size_t i = 0; i < peers_count_; ++i) {
-    if (i == owner_) {
+    // Skip `owner_` only if peer can't receive its own stream.
+    if (i == owner_ && !enable_receive_own_stream_) {
       continue;
     }
 
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
index b8e312a..f5f3920 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
@@ -183,6 +183,8 @@
   // receivers per stream.
   size_t max_frames_in_flight_per_stream_count =
       kDefaultMaxFramesInFlightPerStream;
+  // If true, the analyzer will expect peers to receive their own video streams.
+  bool enable_receive_own_stream = false;
 };
 
 class DefaultVideoQualityAnalyzer : public VideoQualityAnalyzerInterface {
@@ -304,8 +306,12 @@
   // Represents a current state of video stream.
   class StreamState {
    public:
-    StreamState(size_t owner, size_t peers_count)
-        : owner_(owner), frame_ids_(peers_count) {}
+    StreamState(size_t owner,
+                size_t peers_count,
+                bool enable_receive_own_stream)
+        : owner_(owner),
+          enable_receive_own_stream_(enable_receive_own_stream),
+          frame_ids_(peers_count) {}
 
     size_t owner() const { return owner_; }
 
@@ -331,6 +337,7 @@
    private:
     // Index of the owner. Owner's queue in `frame_ids_` will keep alive frames.
     const size_t owner_;
+    const bool enable_receive_own_stream_;
     // To correctly determine dropped frames we have to know sequence of frames
     // in each stream so we will keep a list of frame ids inside the stream.
     // This list is represented by multi head queue of frame ids with separate
@@ -373,10 +380,12 @@
                   VideoFrame frame,
                   Timestamp captured_time,
                   size_t owner,
-                  size_t peers_count)
+                  size_t peers_count,
+                  bool enable_receive_own_stream)
         : stream_(stream),
           owner_(owner),
           peers_count_(peers_count),
+          enable_receive_own_stream_(enable_receive_own_stream),
           frame_(std::move(frame)),
           captured_time_(captured_time) {}
 
@@ -435,6 +444,7 @@
     const size_t stream_;
     const size_t owner_;
     size_t peers_count_;
+    const bool enable_receive_own_stream_;
     absl::optional<VideoFrame> frame_;
 
     // Frame events timestamp.
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
index 5a99f97..46c8c34 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
@@ -963,6 +963,92 @@
   EXPECT_EQ(frame_counters.rendered, 2);
 }
 
+TEST(DefaultVideoQualityAnalyzerTest, FrameCanBeReceivedBySender) {
+  std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
+      test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
+                                       /*type=*/absl::nullopt,
+                                       /*num_squares=*/absl::nullopt);
+
+  DefaultVideoQualityAnalyzerOptions options = AnalyzerOptionsForTest();
+  options.enable_receive_own_stream = true;
+  DefaultVideoQualityAnalyzer analyzer(Clock::GetRealTimeClock(), options);
+  analyzer.Start("test_case",
+                 std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
+                 kAnalyzerMaxThreadsCount);
+
+  VideoFrame frame = NextFrame(frame_generator.get(), 1);
+
+  frame.set_id(analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
+  analyzer.OnFramePreEncode(kSenderPeerName, frame);
+  analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
+                          VideoQualityAnalyzerInterface::EncoderStats());
+
+  // Receive by 2nd peer.
+  VideoFrame received_frame = DeepCopy(frame);
+  analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
+                            FakeEncode(received_frame));
+  analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
+                          VideoQualityAnalyzerInterface::DecoderStats());
+  analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
+
+  // Check that we still have that frame in flight.
+  AnalyzerStats analyzer_stats = analyzer.GetAnalyzerStats();
+  std::vector<StatsSample> frames_in_flight_sizes =
+      GetSortedSamples(analyzer_stats.frames_in_flight_left_count);
+  EXPECT_EQ(frames_in_flight_sizes.back().value, 1)
+      << "Expected that frame is still in flight, "
+      << "because it wasn't received by sender"
+      << ToString(frames_in_flight_sizes);
+
+  // Receive by sender
+  received_frame = DeepCopy(frame);
+  analyzer.OnFramePreDecode(kSenderPeerName, received_frame.id(),
+                            FakeEncode(received_frame));
+  analyzer.OnFrameDecoded(kSenderPeerName, received_frame,
+                          VideoQualityAnalyzerInterface::DecoderStats());
+  analyzer.OnFrameRendered(kSenderPeerName, received_frame);
+
+  // Give analyzer some time to process frames on async thread. The computations
+  // have to be fast (heavy metrics are disabled!), so if doesn't fit 100ms it
+  // means we have an issue!
+  SleepMs(100);
+  analyzer.Stop();
+
+  analyzer_stats = analyzer.GetAnalyzerStats();
+  EXPECT_EQ(analyzer_stats.comparisons_done, 2);
+
+  frames_in_flight_sizes =
+      GetSortedSamples(analyzer_stats.frames_in_flight_left_count);
+  EXPECT_EQ(frames_in_flight_sizes.back().value, 0)
+      << ToString(frames_in_flight_sizes);
+
+  FrameCounters frame_counters = analyzer.GetGlobalCounters();
+  EXPECT_EQ(frame_counters.captured, 1);
+  EXPECT_EQ(frame_counters.rendered, 2);
+
+  EXPECT_EQ(analyzer.GetStats().size(), 2lu);
+  {
+    FrameCounters stream_conters = analyzer.GetPerStreamCounters().at(
+        StatsKey(kStreamLabel, kSenderPeerName, kReceiverPeerName));
+    EXPECT_EQ(stream_conters.captured, 1);
+    EXPECT_EQ(stream_conters.pre_encoded, 1);
+    EXPECT_EQ(stream_conters.encoded, 1);
+    EXPECT_EQ(stream_conters.received, 1);
+    EXPECT_EQ(stream_conters.decoded, 1);
+    EXPECT_EQ(stream_conters.rendered, 1);
+  }
+  {
+    FrameCounters stream_conters = analyzer.GetPerStreamCounters().at(
+        StatsKey(kStreamLabel, kSenderPeerName, kSenderPeerName));
+    EXPECT_EQ(stream_conters.captured, 1);
+    EXPECT_EQ(stream_conters.pre_encoded, 1);
+    EXPECT_EQ(stream_conters.encoded, 1);
+    EXPECT_EQ(stream_conters.received, 1);
+    EXPECT_EQ(stream_conters.decoded, 1);
+    EXPECT_EQ(stream_conters.rendered, 1);
+  }
+}
+
 }  // namespace
 }  // namespace webrtc_pc_e2e
 }  // namespace webrtc