Add DVQA support for scenarios with new participants joining

Bug: webrtc:12247
Change-Id: Id51a2ab34e0b802e11931cad13f48ce8eefddcae
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/196361
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Andrey Logvin <landrey@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32804}
diff --git a/api/test/video_quality_analyzer_interface.h b/api/test/video_quality_analyzer_interface.h
index c5370a7..4488e5a 100644
--- a/api/test/video_quality_analyzer_interface.h
+++ b/api/test/video_quality_analyzer_interface.h
@@ -138,6 +138,9 @@
       absl::string_view pc_label,
       const rtc::scoped_refptr<const RTCStatsReport>& report) override {}
 
+  // Will be called before test adds new participant in the middle of a call.
+  virtual void RegisterParticipantInCall(absl::string_view peer_name) {}
+
   // Tells analyzer that analysis complete and it should calculate final
   // statistics.
   virtual void Stop() {}
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
index 23f9433..04999c3 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
@@ -495,6 +495,40 @@
                     << ", code=" << error_code;
 }
 
+void DefaultVideoQualityAnalyzer::RegisterParticipantInCall(
+    absl::string_view peer_name) {
+  MutexLock lock1(&lock_);
+  MutexLock lock2(&comparison_lock_);
+  RTC_CHECK(!peers_->HasName(peer_name));
+  peers_->AddIfAbsent(peer_name);
+
+  // Ensure stats for receiving (for frames from other peers to this one)
+  // streams exists. Since in flight frames will be sent to the new peer
+  // as well. Sending stats (from this peer to others) will be added by
+  // DefaultVideoQualityAnalyzer::OnFrameCaptured.
+  for (auto& key_val : stream_to_sender_) {
+    InternalStatsKey key(key_val.first, key_val.second,
+                         peers_->index(peer_name));
+    const int64_t frames_count = captured_frames_in_flight_.size();
+    FrameCounters counters;
+    counters.captured = frames_count;
+    counters.pre_encoded = frames_count;
+    counters.encoded = frames_count;
+    stream_frame_counters_.insert({key, std::move(counters)});
+
+    stream_last_freeze_end_time_.insert({key, start_time_});
+  }
+  // Ensure, that frames states are handled correctly
+  // (e.g. dropped frames tracking).
+  for (auto& key_val : stream_states_) {
+    key_val.second.AddPeer();
+  }
+  // Register new peer for every frame in flight.
+  for (auto& key_val : captured_frames_in_flight_) {
+    key_val.second.AddPeer();
+  }
+}
+
 void DefaultVideoQualityAnalyzer::Stop() {
   {
     MutexLock lock(&lock_);
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
index a181133..f30e61b 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
@@ -221,6 +221,7 @@
   void OnDecoderError(absl::string_view peer_name,
                       uint16_t frame_id,
                       int32_t error_code) override;
+  void RegisterParticipantInCall(absl::string_view peer_name) override;
   void Stop() override;
   std::string GetStreamLabel(uint16_t frame_id) override;
   void OnStatsReports(
@@ -309,12 +310,18 @@
     size_t owner() const { return owner_; }
 
     void PushBack(uint16_t frame_id) { frame_ids_.PushBack(frame_id); }
-    // Crash if state is empty.
+    // Crash if state is empty. Guarantees that there can be no alive frames
+    // that are not in the owner queue
     uint16_t PopFront(size_t peer);
     bool IsEmpty(size_t peer) const { return frame_ids_.IsEmpty(peer); }
     // Crash if state is empty.
     uint16_t Front(size_t peer) const { return frame_ids_.Front(peer).value(); }
 
+    // When new peer is added - all current alive frames will be sent to it as
+    // well. So we need to register them as expected by copying owner_ head to
+    // the new head.
+    void AddPeer() { frame_ids_.AddHead(owner_); }
+
     size_t GetAliveFramesCount() { return frame_ids_.size(owner_); }
     uint16_t MarkNextAliveFrameAsDead();
 
@@ -379,6 +386,8 @@
     bool RemoveFrame();
     void SetFrameId(uint16_t id);
 
+    void AddPeer() { ++peers_count_; }
+
     std::vector<size_t> GetPeersWhichDidntReceive() const;
     bool HaveAllPeersReceived() const;
 
@@ -425,7 +434,7 @@
    private:
     const size_t stream_;
     const size_t owner_;
-    const size_t peers_count_;
+    size_t peers_count_;
     absl::optional<VideoFrame> frame_;
 
     // Frame events timestamp.
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
index 9e6e5e4..5fdd051 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
@@ -745,6 +745,114 @@
   EXPECT_EQ(analyzer.GetCpuUsagePercent(), cpu_usage);
 }
 
+TEST(DefaultVideoQualityAnalyzerTest, RuntimeParticipantsAdding) {
+  std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
+      test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
+                                       /*type=*/absl::nullopt,
+                                       /*num_squares=*/absl::nullopt);
+
+  constexpr char kAlice[] = "alice";
+  constexpr char kBob[] = "bob";
+  constexpr char kCharlie[] = "charlie";
+
+  DefaultVideoQualityAnalyzer analyzer(Clock::GetRealTimeClock(),
+                                       AnalyzerOptionsForTest());
+  analyzer.Start("test_case", {}, kAnalyzerMaxThreadsCount);
+
+  std::map<uint16_t, VideoFrame> captured_frames;
+  std::vector<uint16_t> frames_order;
+  analyzer.RegisterParticipantInCall(kAlice);
+  analyzer.RegisterParticipantInCall(kBob);
+  for (int i = 0; i < kMaxFramesInFlightPerStream; ++i) {
+    VideoFrame frame = NextFrame(frame_generator.get(), i);
+    frame.set_id(analyzer.OnFrameCaptured(kAlice, kStreamLabel, frame));
+    frames_order.push_back(frame.id());
+    captured_frames.insert({frame.id(), frame});
+    analyzer.OnFramePreEncode(kAlice, frame);
+    analyzer.OnFrameEncoded(kAlice, frame.id(), FakeEncode(frame),
+                            VideoQualityAnalyzerInterface::EncoderStats());
+  }
+
+  for (size_t i = 0; i < frames_order.size() / 2; ++i) {
+    uint16_t frame_id = frames_order.at(i);
+    VideoFrame received_frame = DeepCopy(captured_frames.at(frame_id));
+    analyzer.OnFramePreDecode(kBob, received_frame.id(),
+                              FakeEncode(received_frame));
+    analyzer.OnFrameDecoded(kBob, received_frame,
+                            VideoQualityAnalyzerInterface::DecoderStats());
+    analyzer.OnFrameRendered(kBob, received_frame);
+  }
+
+  analyzer.RegisterParticipantInCall(kCharlie);
+
+  for (size_t i = frames_order.size() / 2; i < frames_order.size(); ++i) {
+    uint16_t frame_id = frames_order.at(i);
+    VideoFrame bob_received_frame = DeepCopy(captured_frames.at(frame_id));
+    analyzer.OnFramePreDecode(kBob, bob_received_frame.id(),
+                              FakeEncode(bob_received_frame));
+    analyzer.OnFrameDecoded(kBob, bob_received_frame,
+                            VideoQualityAnalyzerInterface::DecoderStats());
+    analyzer.OnFrameRendered(kBob, bob_received_frame);
+
+    VideoFrame charlie_received_frame = DeepCopy(captured_frames.at(frame_id));
+    analyzer.OnFramePreDecode(kCharlie, charlie_received_frame.id(),
+                              FakeEncode(charlie_received_frame));
+    analyzer.OnFrameDecoded(kCharlie, charlie_received_frame,
+                            VideoQualityAnalyzerInterface::DecoderStats());
+    analyzer.OnFrameRendered(kCharlie, charlie_received_frame);
+  }
+
+  // Give analyzer some time to process frames on async thread. The computations
+  // have to be fast (heavy metrics are disabled!), so if doesn't fit 100ms it
+  // means we have an issue!
+  SleepMs(100);
+  analyzer.Stop();
+
+  AnalyzerStats stats = analyzer.GetAnalyzerStats();
+  EXPECT_EQ(stats.memory_overloaded_comparisons_done, 0);
+  EXPECT_EQ(stats.comparisons_done,
+            kMaxFramesInFlightPerStream + kMaxFramesInFlightPerStream / 2);
+
+  std::vector<StatsSample> frames_in_flight_sizes =
+      GetSortedSamples(stats.frames_in_flight_left_count);
+  EXPECT_EQ(frames_in_flight_sizes.back().value, 0)
+      << ToString(frames_in_flight_sizes);
+
+  FrameCounters frame_counters = analyzer.GetGlobalCounters();
+  EXPECT_EQ(frame_counters.captured, kMaxFramesInFlightPerStream);
+  EXPECT_EQ(frame_counters.received,
+            kMaxFramesInFlightPerStream + kMaxFramesInFlightPerStream / 2);
+  EXPECT_EQ(frame_counters.decoded,
+            kMaxFramesInFlightPerStream + kMaxFramesInFlightPerStream / 2);
+  EXPECT_EQ(frame_counters.rendered,
+            kMaxFramesInFlightPerStream + kMaxFramesInFlightPerStream / 2);
+  EXPECT_EQ(frame_counters.dropped, 0);
+
+  EXPECT_EQ(analyzer.GetKnownVideoStreams().size(), 2lu);
+  const StatsKey kAliceBobStats(kStreamLabel, kAlice, kBob);
+  const StatsKey kAliceCharlieStats(kStreamLabel, kAlice, kCharlie);
+  {
+    FrameCounters stream_conters =
+        analyzer.GetPerStreamCounters().at(kAliceBobStats);
+    EXPECT_EQ(stream_conters.captured, 10);
+    EXPECT_EQ(stream_conters.pre_encoded, 10);
+    EXPECT_EQ(stream_conters.encoded, 10);
+    EXPECT_EQ(stream_conters.received, 10);
+    EXPECT_EQ(stream_conters.decoded, 10);
+    EXPECT_EQ(stream_conters.rendered, 10);
+  }
+  {
+    FrameCounters stream_conters =
+        analyzer.GetPerStreamCounters().at(kAliceCharlieStats);
+    EXPECT_EQ(stream_conters.captured, 5);
+    EXPECT_EQ(stream_conters.pre_encoded, 5);
+    EXPECT_EQ(stream_conters.encoded, 5);
+    EXPECT_EQ(stream_conters.received, 5);
+    EXPECT_EQ(stream_conters.decoded, 5);
+    EXPECT_EQ(stream_conters.rendered, 5);
+  }
+}
+
 }  // namespace
 }  // namespace webrtc_pc_e2e
 }  // namespace webrtc
diff --git a/test/pc/e2e/analyzer/video/multi_head_queue.h b/test/pc/e2e/analyzer/video/multi_head_queue.h
index 52314a6..fc606d2 100644
--- a/test/pc/e2e/analyzer/video/multi_head_queue.h
+++ b/test/pc/e2e/analyzer/video/multi_head_queue.h
@@ -35,6 +35,9 @@
     }
   }
 
+  // Creates a copy of an existing head. Complexity O(MultiHeadQueue::size()).
+  void AddHead(size_t copy_index) { queues_.push_back(queues_[copy_index]); }
+
   // Add value to the end of the queue. Complexity O(readers_count).
   void PushBack(T value) {
     for (auto& queue : queues_) {
diff --git a/test/pc/e2e/analyzer/video/multi_head_queue_test.cc b/test/pc/e2e/analyzer/video/multi_head_queue_test.cc
index 3a4ab6c..0025d1e 100644
--- a/test/pc/e2e/analyzer/video/multi_head_queue_test.cc
+++ b/test/pc/e2e/analyzer/video/multi_head_queue_test.cc
@@ -98,6 +98,25 @@
   }
 }
 
+TEST(MultiHeadQueueTest, HeadCopy) {
+  MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(1);
+  for (size_t i = 0; i < 10; ++i) {
+    queue.PushBack(i);
+    EXPECT_EQ(queue.size(), i + 1);
+  }
+  queue.AddHead(0);
+  EXPECT_EQ(queue.readers_count(), 2u);
+  for (size_t i = 0; i < 10; ++i) {
+    absl::optional<size_t> value1 = queue.PopFront(0);
+    absl::optional<size_t> value2 = queue.PopFront(1);
+    EXPECT_EQ(queue.size(), 10 - i - 1);
+    ASSERT_TRUE(value1.has_value());
+    ASSERT_TRUE(value2.has_value());
+    EXPECT_EQ(value1.value(), i);
+    EXPECT_EQ(value2.value(), i);
+  }
+}
+
 }  // namespace
 }  // namespace webrtc_pc_e2e
 }  // namespace webrtc
diff --git a/test/pc/e2e/analyzer/video/video_quality_analyzer_injection_helper.h b/test/pc/e2e/analyzer/video/video_quality_analyzer_injection_helper.h
index 981a359..111aa34 100644
--- a/test/pc/e2e/analyzer/video/video_quality_analyzer_injection_helper.h
+++ b/test/pc/e2e/analyzer/video/video_quality_analyzer_injection_helper.h
@@ -46,6 +46,12 @@
       EncodedImageDataExtractor* extractor);
   ~VideoQualityAnalyzerInjectionHelper() override;
 
+  // Registers new call participant to the underlying video quality analyzer.
+  // The method should be called before the participant is actually added.
+  void RegisterParticipantInCall(absl::string_view peer_name) {
+    analyzer_->RegisterParticipantInCall(peer_name);
+  }
+
   // Wraps video encoder factory to give video quality analyzer access to frames
   // before encoding and encoded images after.
   std::unique_ptr<VideoEncoderFactory> WrapVideoEncoderFactory(