[DVQA] Introduce FramesStorage to centralize frames management

Instead of capping the number of in-flight frames per stream
(max_frames_in_flight_per_stream_count), DefaultVideoQualityAnalyzer now keeps
captured frames in a dedicated FramesStorage which evicts a frame once
max_frames_storage_duration (3 seconds by default) has elapsed since its
capture time.

Bug: b/271542055, webrtc:14995
Change-Id: I881801b6f79e940404ab80ac28db8df2a04dcaef
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/298048
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Jeremy Leconte <jleconte@google.com>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39609}
diff --git a/test/pc/e2e/analyzer/video/BUILD.gn b/test/pc/e2e/analyzer/video/BUILD.gn
index 5c489e8..91af64e 100644
--- a/test/pc/e2e/analyzer/video/BUILD.gn
+++ b/test/pc/e2e/analyzer/video/BUILD.gn
@@ -313,6 +313,7 @@
     "../../../../../rtc_base:stringutils",
     "../../../../../rtc_base/synchronization:mutex",
     "../../../../../system_wrappers",
+    "dvqa:frames_storage",
   ]
   absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
 }
@@ -326,6 +327,7 @@
 
   deps = [
     "../../../../../api/numerics",
+    "../../../../../api/units:time_delta",
     "../../../../../api/units:timestamp",
     "../../../../../rtc_base:checks",
     "../../../../../rtc_base:stringutils",
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
index 389cb09..96269a6 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
@@ -133,6 +133,7 @@
     : options_(options),
       clock_(clock),
       metrics_logger_(metrics_logger),
+      frames_storage_(options.max_frames_storage_duration, clock_),
       frames_comparator_(clock, cpu_measurer_, options) {
   RTC_CHECK(metrics_logger_);
 }
@@ -241,13 +242,16 @@
             it->second.GetStatsForPeer(i));
       }
 
+      frames_storage_.Remove(it->second.id());
       captured_frames_in_flight_.erase(it);
     }
     captured_frames_in_flight_.emplace(
-        frame_id, FrameInFlight(stream_index, frame, captured_time,
+        frame_id, FrameInFlight(stream_index, frame_id, captured_time,
                                 std::move(frame_receivers_indexes)));
-    // Set frame id on local copy of the frame
-    captured_frames_in_flight_.at(frame_id).SetFrameId(frame_id);
+    // Store a local copy of the frame with `frame_id` set.
+    VideoFrame local_frame(frame);
+    local_frame.set_id(frame_id);
+    frames_storage_.Add(std::move(local_frame), captured_time);
 
     // Update history stream<->frame mapping
     for (auto it = stream_to_frame_id_history_.begin();
@@ -257,20 +261,6 @@
     stream_to_frame_id_history_[stream_index].insert(frame_id);
     stream_to_frame_id_full_history_[stream_index].push_back(frame_id);
 
-    // If state has too many frames that are in flight => remove the oldest
-    // queued frame in order to avoid to use too much memory.
-    if (state->GetAliveFramesCount() >
-        options_.max_frames_in_flight_per_stream_count) {
-      uint16_t frame_id_to_remove = state->MarkNextAliveFrameAsDead();
-      auto it = captured_frames_in_flight_.find(frame_id_to_remove);
-      RTC_CHECK(it != captured_frames_in_flight_.end())
-          << "Frame with ID " << frame_id_to_remove
-          << " is expected to be in flight, but hasn't been found in "
-          << "|captured_frames_in_flight_|";
-      bool is_removed = it->second.RemoveFrame();
-      RTC_DCHECK(is_removed)
-          << "Invalid stream state: alive frame is removed already";
-    }
     if (options_.report_infra_metrics) {
       analyzer_stats_.on_frame_captured_processing_time_ms.AddSample(
           (Now() - captured_time).ms<double>());
@@ -518,7 +508,7 @@
 
   // Find corresponding captured frame.
   FrameInFlight* frame_in_flight = &frame_it->second;
-  absl::optional<VideoFrame> captured_frame = frame_in_flight->frame();
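+  // May be absl::nullopt if the frame has already been evicted from
+  // `frames_storage_`.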
+  absl::optional<VideoFrame> captured_frame = frames_storage_.Get(frame.id());
 
   const size_t stream_index = frame_in_flight->stream();
   StreamState* state = &stream_states_.at(stream_index);
@@ -566,6 +556,7 @@
       frame_in_flight->GetStatsForPeer(peer_index));
 
   if (frame_it->second.HaveAllPeersReceived()) {
+    frames_storage_.Remove(frame_it->second.id());
     captured_frames_in_flight_.erase(frame_it);
   }
 
@@ -720,6 +711,7 @@
     // is no FrameInFlight for the received encoded image.
     if (frame_in_flight.HasEncodedTime() &&
         frame_in_flight.HaveAllPeersReceived()) {
+      frames_storage_.Remove(frame_in_flight.id());
       it = captured_frames_in_flight_.erase(it);
     } else {
       it++;
@@ -1049,6 +1041,7 @@
     }
 
     if (next_frame_it->second.HaveAllPeersReceived()) {
+      frames_storage_.Remove(next_frame_it->second.id());
       captured_frames_in_flight_.erase(next_frame_it);
     }
   }
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
index c4bf324..34b1fc8 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
@@ -36,6 +36,7 @@
 #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_internal_shared_objects.h"
 #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_shared_objects.h"
 #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h"
+#include "test/pc/e2e/analyzer/video/dvqa/frames_storage.h"
 #include "test/pc/e2e/analyzer/video/names_collection.h"
 
 namespace webrtc {
@@ -165,6 +166,7 @@
   // Mapping from stream label to unique size_t value to use in stats and avoid
   // extra string copying.
   NamesCollection streams_ RTC_GUARDED_BY(mutex_);
+  FramesStorage frames_storage_ RTC_GUARDED_BY(mutex_);
   // Frames that were captured by all streams and still aren't rendered on
   // receivers or deemed dropped. Frame with id X can be removed from this map
   // if:
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frame_in_flight.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frame_in_flight.cc
index bee5638..b0ed041 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frame_in_flight.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frame_in_flight.cc
@@ -37,29 +37,14 @@
 }  // namespace
 
 FrameInFlight::FrameInFlight(size_t stream,
-                             VideoFrame frame,
+                             uint16_t frame_id,
                              Timestamp captured_time,
                              std::set<size_t> expected_receivers)
     : stream_(stream),
       expected_receivers_(std::move(expected_receivers)),
-      frame_(std::move(frame)),
+      frame_id_(frame_id),
       captured_time_(captured_time) {}
 
-bool FrameInFlight::RemoveFrame() {
-  if (!frame_) {
-    return false;
-  }
-  frame_ = absl::nullopt;
-  return true;
-}
-
-void FrameInFlight::SetFrameId(uint16_t id) {
-  if (frame_) {
-    frame_->set_id(id);
-  }
-  frame_id_ = id;
-}
-
 std::vector<size_t> FrameInFlight::GetPeersWhichDidntReceive() const {
   std::vector<size_t> out;
   for (size_t peer : expected_receivers_) {
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frame_in_flight.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frame_in_flight.h
index 8322eb7..06552c7 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frame_in_flight.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frame_in_flight.h
@@ -66,22 +66,16 @@
 class FrameInFlight {
  public:
   FrameInFlight(size_t stream,
-                VideoFrame frame,
+                uint16_t frame_id,
                 Timestamp captured_time,
                 std::set<size_t> expected_receivers);
 
   size_t stream() const { return stream_; }
-  // Returns internal copy of source `VideoFrame` or `absl::nullopt` if it was
-  // removed before.
-  const absl::optional<VideoFrame>& frame() const { return frame_; }
+
+  uint16_t id() const { return frame_id_; }
 
   Timestamp captured_time() const { return captured_time_; }
 
-  // Removes internal copy of the source `VideoFrame` to free up extra memory.
-  // Returns was frame removed or not.
-  bool RemoveFrame();
-  void SetFrameId(uint16_t id);
-
   void AddExpectedReceiver(size_t peer) { expected_receivers_.insert(peer); }
 
   void RemoveExpectedReceiver(size_t peer) { expected_receivers_.erase(peer); }
@@ -162,7 +156,6 @@
   // any peer or can be safely deleted. It is responsibility of the user of this
   // object to decide when it should be deleted.
   std::set<size_t> expected_receivers_;
-  absl::optional<VideoFrame> frame_;
   // Store frame id separately because `frame_` can be removed when we have too
  // much memory consumption.
   uint16_t frame_id_ = VideoFrame::kNotSetId;
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frames_comparator_test.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frames_comparator_test.cc
index d6732e1..0024e79 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frames_comparator_test.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_frames_comparator_test.cc
@@ -36,14 +36,11 @@
 
 using StatsSample = ::webrtc::SamplesStatsCounter::StatsSample;
 
-constexpr int kMaxFramesInFlightPerStream = 10;
-
 DefaultVideoQualityAnalyzerOptions AnalyzerOptionsForTest() {
   DefaultVideoQualityAnalyzerOptions options;
   options.compute_psnr = false;
   options.compute_ssim = false;
   options.adjust_cropping_before_comparing_frames = false;
-  options.max_frames_in_flight_per_stream_count = kMaxFramesInFlightPerStream;
   return options;
 }
 
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_metric_names_test.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_metric_names_test.cc
index f5029ac..072c2ef 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_metric_names_test.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_metric_names_test.cc
@@ -43,7 +43,6 @@
 using ::webrtc::test::Unit;
 
 constexpr int kAnalyzerMaxThreadsCount = 1;
-constexpr int kMaxFramesInFlightPerStream = 10;
 constexpr int kFrameWidth = 320;
 constexpr int kFrameHeight = 240;
 
@@ -52,7 +51,6 @@
   options.compute_psnr = true;
   options.compute_ssim = true;
   options.adjust_cropping_before_comparing_frames = false;
-  options.max_frames_in_flight_per_stream_count = kMaxFramesInFlightPerStream;
   options.report_detailed_frame_stats = true;
   return options;
 }
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_shared_objects.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_shared_objects.h
index 175f777..17e7e0e 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_shared_objects.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_shared_objects.h
@@ -22,15 +22,14 @@
 
 #include "absl/types/optional.h"
 #include "api/numerics/samples_stats_counter.h"
+#include "api/units/time_delta.h"
 #include "api/units/timestamp.h"
 #include "rtc_base/strings/string_builder.h"
 
 namespace webrtc {
 
 // WebRTC will request a key frame after 3 seconds if no frames were received.
-// We assume max frame rate ~60 fps, so 270 frames will cover max freeze without
-// key frame request.
-constexpr size_t kDefaultMaxFramesInFlightPerStream = 270;
+constexpr TimeDelta kDefaultMaxFramesStorageDuration = TimeDelta::Seconds(3);
 
 class SamplesRateCounter {
  public:
@@ -270,11 +269,9 @@
   // significantly slows down the comparison, so turn it on only when it is
   // needed.
   bool adjust_cropping_before_comparing_frames = false;
-  // Amount of frames that are queued in the DefaultVideoQualityAnalyzer from
-  // the point they were captured to the point they were rendered on all
-  // receivers per stream.
-  size_t max_frames_in_flight_per_stream_count =
-      kDefaultMaxFramesInFlightPerStream;
+  // Amount of time for which DefaultVideoQualityAnalyzer will store frames
+  // which were captured but not yet rendered on all receivers per stream.
+  TimeDelta max_frames_storage_duration = kDefaultMaxFramesStorageDuration;
   // If true, the analyzer will expect peers to receive their own video streams.
   bool enable_receive_own_stream = false;
 };
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc
index eee69a7..bcdf16d 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc
@@ -101,13 +101,6 @@
   return &it->second;
 }
 
-uint16_t StreamState::MarkNextAliveFrameAsDead() {
-  absl::optional<uint16_t> frame_id =
-      frame_ids_.PopFront(kAliveFramesQueueIndex);
-  RTC_DCHECK(frame_id.has_value());
-  return frame_id.value();
-}
-
 void StreamState::SetLastRenderedFrameTime(size_t peer, Timestamp time) {
   auto it = last_rendered_frame_time_.find(peer);
   if (it == last_rendered_frame_time_.end()) {
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
index f0dc4cd..190432c 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
@@ -69,8 +69,6 @@
   size_t GetAliveFramesCount() const {
     return frame_ids_.size(kAliveFramesQueueIndex);
   }
-  // Returns frame id of the frame which was marked as dead.
-  uint16_t MarkNextAliveFrameAsDead();
 
   void SetLastRenderedFrameTime(size_t peer, Timestamp time);
   absl::optional<Timestamp> last_rendered_frame_time(size_t peer) const;
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc
index 9c4d584..1b2c59b 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc
@@ -95,33 +95,5 @@
   EXPECT_EQ(state.GetAliveFramesCount(), 1lu);
 }
 
-TEST(StreamStateTest, MarkNextAliveFrameAsDeadDecreseAliveFramesCount) {
-  StreamState state(/*sender=*/0,
-                    /*receivers=*/std::set<size_t>{1, 2}, Timestamp::Seconds(1),
-                    Clock::GetRealTimeClock());
-  state.PushBack(/*frame_id=*/1);
-  state.PushBack(/*frame_id=*/2);
-
-  EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
-
-  state.MarkNextAliveFrameAsDead();
-
-  EXPECT_EQ(state.GetAliveFramesCount(), 1lu);
-}
-
-TEST(StreamStateTest, MarkNextAliveFrameAsDeadDoesntAffectFrontFrameForPeer) {
-  StreamState state(/*sender=*/0,
-                    /*receivers=*/std::set<size_t>{1, 2}, Timestamp::Seconds(1),
-                    Clock::GetRealTimeClock());
-  state.PushBack(/*frame_id=*/1);
-  state.PushBack(/*frame_id=*/2);
-
-  EXPECT_EQ(state.Front(/*peer=*/1), 1);
-
-  state.MarkNextAliveFrameAsDead();
-
-  EXPECT_EQ(state.Front(/*peer=*/1), 1);
-}
-
 }  // namespace
 }  // namespace webrtc
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
index 6a459c5..534e56c 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_test.cc
@@ -47,7 +47,7 @@
 using StatsSample = ::webrtc::SamplesStatsCounter::StatsSample;
 
 constexpr int kAnalyzerMaxThreadsCount = 1;
-constexpr int kMaxFramesInFlightPerStream = 10;
+constexpr TimeDelta kMaxFramesInFlightStorageDuration = TimeDelta::Seconds(3);
 constexpr int kFrameWidth = 320;
 constexpr int kFrameHeight = 240;
 constexpr double kMaxSsim = 1;
@@ -60,7 +60,7 @@
   options.compute_psnr = false;
   options.compute_ssim = false;
   options.adjust_cropping_before_comparing_frames = false;
-  options.max_frames_in_flight_per_stream_count = kMaxFramesInFlightPerStream;
+  options.max_frames_storage_duration = kMaxFramesInFlightStorageDuration;
   return options;
 }
 
@@ -204,196 +204,6 @@
   }
 }
 
-TEST(DefaultVideoQualityAnalyzerTest,
-     MemoryOverloadedAndThenAllFramesReceived) {
-  std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
-      test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
-                                       /*type=*/absl::nullopt,
-                                       /*num_squares=*/absl::nullopt);
-
-  DefaultVideoQualityAnalyzer analyzer(Clock::GetRealTimeClock(),
-                                       test::GetGlobalMetricsLogger(),
-                                       AnalyzerOptionsForTest());
-  analyzer.Start("test_case",
-                 std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
-                 kAnalyzerMaxThreadsCount);
-
-  std::map<uint16_t, VideoFrame> captured_frames;
-  std::vector<uint16_t> frames_order;
-  for (int i = 0; i < kMaxFramesInFlightPerStream * 2; ++i) {
-    VideoFrame frame = NextFrame(frame_generator.get(), i);
-    frame.set_id(
-        analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
-    frames_order.push_back(frame.id());
-    captured_frames.insert({frame.id(), frame});
-    analyzer.OnFramePreEncode(kSenderPeerName, frame);
-    analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
-                            VideoQualityAnalyzerInterface::EncoderStats(),
-                            false);
-  }
-
-  for (const uint16_t& frame_id : frames_order) {
-    VideoFrame received_frame = DeepCopy(captured_frames.at(frame_id));
-    analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
-                              FakeEncode(received_frame));
-    analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
-                            VideoQualityAnalyzerInterface::DecoderStats());
-    analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
-  }
-
-  // Give analyzer some time to process frames on async thread. The computations
-  // have to be fast (heavy metrics are disabled!), so if doesn't fit 100ms it
-  // means we have an issue!
-  SleepMs(100);
-  analyzer.Stop();
-
-  AnalyzerStats stats = analyzer.GetAnalyzerStats();
-  EXPECT_EQ(stats.memory_overloaded_comparisons_done,
-            kMaxFramesInFlightPerStream);
-  EXPECT_EQ(stats.comparisons_done, kMaxFramesInFlightPerStream * 2);
-  FrameCounters frame_counters = analyzer.GetGlobalCounters();
-  EXPECT_EQ(frame_counters.captured, kMaxFramesInFlightPerStream * 2);
-  EXPECT_EQ(frame_counters.rendered, kMaxFramesInFlightPerStream * 2);
-  EXPECT_EQ(frame_counters.dropped, 0);
-}
-
-TEST(DefaultVideoQualityAnalyzerTest,
-     FillMaxMemoryReceiveAllMemoryOverloadedAndThenAllFramesReceived) {
-  std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
-      test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
-                                       /*type=*/absl::nullopt,
-                                       /*num_squares=*/absl::nullopt);
-
-  DefaultVideoQualityAnalyzer analyzer(Clock::GetRealTimeClock(),
-                                       test::GetGlobalMetricsLogger(),
-                                       AnalyzerOptionsForTest());
-  analyzer.Start("test_case",
-                 std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
-                 kAnalyzerMaxThreadsCount);
-
-  std::map<uint16_t, VideoFrame> captured_frames;
-  std::vector<uint16_t> frames_order;
-  // Feel analyzer's memory up to limit
-  for (int i = 0; i < kMaxFramesInFlightPerStream; ++i) {
-    VideoFrame frame = NextFrame(frame_generator.get(), i);
-    frame.set_id(
-        analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
-    frames_order.push_back(frame.id());
-    captured_frames.insert({frame.id(), frame});
-    analyzer.OnFramePreEncode(kSenderPeerName, frame);
-    analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
-                            VideoQualityAnalyzerInterface::EncoderStats(),
-                            false);
-  }
-
-  // Receive all frames.
-  for (const uint16_t& frame_id : frames_order) {
-    VideoFrame received_frame = DeepCopy(captured_frames.at(frame_id));
-    analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
-                              FakeEncode(received_frame));
-    analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
-                            VideoQualityAnalyzerInterface::DecoderStats());
-    analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
-  }
-  frames_order.clear();
-
-  // Give analyzer some time to process frames on async thread. The computations
-  // have to be fast (heavy metrics are disabled!), so if doesn't fit 100ms it
-  // means we have an issue!
-  SleepMs(100);
-
-  // Overload analyzer's memory up to limit
-  for (int i = 0; i < 2 * kMaxFramesInFlightPerStream; ++i) {
-    VideoFrame frame = NextFrame(frame_generator.get(), i);
-    frame.set_id(
-        analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
-    frames_order.push_back(frame.id());
-    captured_frames.insert({frame.id(), frame});
-    analyzer.OnFramePreEncode(kSenderPeerName, frame);
-    analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
-                            VideoQualityAnalyzerInterface::EncoderStats(),
-                            false);
-  }
-
-  // Receive all frames.
-  for (const uint16_t& frame_id : frames_order) {
-    VideoFrame received_frame = DeepCopy(captured_frames.at(frame_id));
-    analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
-                              FakeEncode(received_frame));
-    analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
-                            VideoQualityAnalyzerInterface::DecoderStats());
-    analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
-  }
-
-  // Give analyzer some time to process frames on async thread. The computations
-  // have to be fast (heavy metrics are disabled!), so if doesn't fit 100ms it
-  // means we have an issue!
-  SleepMs(100);
-  analyzer.Stop();
-
-  AnalyzerStats stats = analyzer.GetAnalyzerStats();
-  EXPECT_EQ(stats.memory_overloaded_comparisons_done,
-            kMaxFramesInFlightPerStream);
-  EXPECT_EQ(stats.comparisons_done, kMaxFramesInFlightPerStream * 3);
-  FrameCounters frame_counters = analyzer.GetGlobalCounters();
-  EXPECT_EQ(frame_counters.captured, kMaxFramesInFlightPerStream * 3);
-  EXPECT_EQ(frame_counters.rendered, kMaxFramesInFlightPerStream * 3);
-  EXPECT_EQ(frame_counters.dropped, 0);
-}
-
-TEST(DefaultVideoQualityAnalyzerTest,
-     MemoryOverloadedHalfDroppedAndThenHalfFramesReceived) {
-  std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
-      test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
-                                       /*type=*/absl::nullopt,
-                                       /*num_squares=*/absl::nullopt);
-
-  DefaultVideoQualityAnalyzer analyzer(Clock::GetRealTimeClock(),
-                                       test::GetGlobalMetricsLogger(),
-                                       AnalyzerOptionsForTest());
-  analyzer.Start("test_case",
-                 std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
-                 kAnalyzerMaxThreadsCount);
-
-  std::map<uint16_t, VideoFrame> captured_frames;
-  std::vector<uint16_t> frames_order;
-  for (int i = 0; i < kMaxFramesInFlightPerStream * 2; ++i) {
-    VideoFrame frame = NextFrame(frame_generator.get(), i);
-    frame.set_id(
-        analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
-    frames_order.push_back(frame.id());
-    captured_frames.insert({frame.id(), frame});
-    analyzer.OnFramePreEncode(kSenderPeerName, frame);
-    analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
-                            VideoQualityAnalyzerInterface::EncoderStats(),
-                            false);
-  }
-
-  for (size_t i = kMaxFramesInFlightPerStream; i < frames_order.size(); ++i) {
-    uint16_t frame_id = frames_order.at(i);
-    VideoFrame received_frame = DeepCopy(captured_frames.at(frame_id));
-    analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
-                              FakeEncode(received_frame));
-    analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
-                            VideoQualityAnalyzerInterface::DecoderStats());
-    analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
-  }
-
-  // Give analyzer some time to process frames on async thread. The computations
-  // have to be fast (heavy metrics are disabled!), so if doesn't fit 100ms it
-  // means we have an issue!
-  SleepMs(100);
-  analyzer.Stop();
-
-  AnalyzerStats stats = analyzer.GetAnalyzerStats();
-  EXPECT_EQ(stats.memory_overloaded_comparisons_done, 0);
-  EXPECT_EQ(stats.comparisons_done, kMaxFramesInFlightPerStream * 2);
-  FrameCounters frame_counters = analyzer.GetGlobalCounters();
-  EXPECT_EQ(frame_counters.captured, kMaxFramesInFlightPerStream * 2);
-  EXPECT_EQ(frame_counters.rendered, kMaxFramesInFlightPerStream);
-  EXPECT_EQ(frame_counters.dropped, kMaxFramesInFlightPerStream);
-}
-
 TEST(DefaultVideoQualityAnalyzerTest, NormalScenario) {
   std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
       test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
@@ -409,7 +219,7 @@
 
   std::map<uint16_t, VideoFrame> captured_frames;
   std::vector<uint16_t> frames_order;
-  for (int i = 0; i < kMaxFramesInFlightPerStream; ++i) {
+  for (int i = 0; i < 10; ++i) {
     VideoFrame frame = NextFrame(frame_generator.get(), i);
     frame.set_id(
         analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
@@ -439,7 +249,7 @@
 
   AnalyzerStats stats = analyzer.GetAnalyzerStats();
   EXPECT_EQ(stats.memory_overloaded_comparisons_done, 0);
-  EXPECT_EQ(stats.comparisons_done, kMaxFramesInFlightPerStream);
+  EXPECT_EQ(stats.comparisons_done, 10);
 
   std::vector<StatsSample> frames_in_flight_sizes =
       GetSortedSamples(stats.frames_in_flight_left_count);
@@ -447,11 +257,11 @@
       << ToString(frames_in_flight_sizes);
 
   FrameCounters frame_counters = analyzer.GetGlobalCounters();
-  EXPECT_EQ(frame_counters.captured, kMaxFramesInFlightPerStream);
-  EXPECT_EQ(frame_counters.received, kMaxFramesInFlightPerStream / 2);
-  EXPECT_EQ(frame_counters.decoded, kMaxFramesInFlightPerStream / 2);
-  EXPECT_EQ(frame_counters.rendered, kMaxFramesInFlightPerStream / 2);
-  EXPECT_EQ(frame_counters.dropped, kMaxFramesInFlightPerStream / 2);
+  EXPECT_EQ(frame_counters.captured, 10);
+  EXPECT_EQ(frame_counters.received, 5);
+  EXPECT_EQ(frame_counters.decoded, 5);
+  EXPECT_EQ(frame_counters.rendered, 5);
+  EXPECT_EQ(frame_counters.dropped, 5);
 }
 
 TEST(DefaultVideoQualityAnalyzerTest, OneFrameReceivedTwice) {
@@ -525,7 +335,7 @@
 
   std::map<uint16_t, VideoFrame> captured_frames;
   std::vector<uint16_t> frames_order;
-  for (int i = 0; i < kMaxFramesInFlightPerStream; ++i) {
+  for (int i = 0; i < 10; ++i) {
     VideoFrame frame = NextFrame(frame_generator.get(), i);
     frame.set_id(analyzer.OnFrameCaptured(kAlice, kStreamLabel, frame));
     frames_order.push_back(frame.id());
@@ -571,14 +381,14 @@
 
   AnalyzerStats analyzer_stats = analyzer.GetAnalyzerStats();
   EXPECT_EQ(analyzer_stats.memory_overloaded_comparisons_done, 0);
-  EXPECT_EQ(analyzer_stats.comparisons_done, kMaxFramesInFlightPerStream * 2);
+  EXPECT_EQ(analyzer_stats.comparisons_done, 20);
 
   FrameCounters frame_counters = analyzer.GetGlobalCounters();
-  EXPECT_EQ(frame_counters.captured, kMaxFramesInFlightPerStream);
-  EXPECT_EQ(frame_counters.received, kMaxFramesInFlightPerStream);
-  EXPECT_EQ(frame_counters.decoded, kMaxFramesInFlightPerStream);
-  EXPECT_EQ(frame_counters.rendered, kMaxFramesInFlightPerStream);
-  EXPECT_EQ(frame_counters.dropped, kMaxFramesInFlightPerStream);
+  EXPECT_EQ(frame_counters.captured, 10);
+  EXPECT_EQ(frame_counters.received, 10);
+  EXPECT_EQ(frame_counters.decoded, 10);
+  EXPECT_EQ(frame_counters.rendered, 10);
+  EXPECT_EQ(frame_counters.dropped, 10);
 
   VideoStreamsInfo streams_info = analyzer.GetKnownStreams();
   EXPECT_EQ(streams_info.GetStreams(), std::set<std::string>{kStreamLabel});
@@ -707,8 +517,6 @@
   analyzer_options.compute_psnr = true;
   analyzer_options.compute_ssim = true;
   analyzer_options.adjust_cropping_before_comparing_frames = false;
-  analyzer_options.max_frames_in_flight_per_stream_count =
-      kMaxFramesInFlightPerStream;
   DefaultVideoQualityAnalyzer analyzer(Clock::GetRealTimeClock(),
                                        test::GetGlobalMetricsLogger(),
                                        analyzer_options);
@@ -716,7 +524,7 @@
                  std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
                  kAnalyzerMaxThreadsCount);
 
-  for (int i = 0; i < kMaxFramesInFlightPerStream; ++i) {
+  for (int i = 0; i < 10; ++i) {
     VideoFrame frame = NextFrame(frame_generator.get(), i);
     frame.set_id(
         analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
@@ -741,7 +549,7 @@
 
   AnalyzerStats stats = analyzer.GetAnalyzerStats();
   EXPECT_EQ(stats.memory_overloaded_comparisons_done, 0);
-  EXPECT_EQ(stats.comparisons_done, kMaxFramesInFlightPerStream);
+  EXPECT_EQ(stats.comparisons_done, 10);
 
   std::vector<StatsSample> frames_in_flight_sizes =
       GetSortedSamples(stats.frames_in_flight_left_count);
@@ -768,8 +576,6 @@
   analyzer_options.compute_psnr = true;
   analyzer_options.compute_ssim = true;
   analyzer_options.adjust_cropping_before_comparing_frames = true;
-  analyzer_options.max_frames_in_flight_per_stream_count =
-      kMaxFramesInFlightPerStream;
   DefaultVideoQualityAnalyzer analyzer(Clock::GetRealTimeClock(),
                                        test::GetGlobalMetricsLogger(),
                                        analyzer_options);
@@ -777,7 +583,7 @@
                  std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
                  kAnalyzerMaxThreadsCount);
 
-  for (int i = 0; i < kMaxFramesInFlightPerStream; ++i) {
+  for (int i = 0; i < 10; ++i) {
     VideoFrame frame = NextFrame(frame_generator.get(), i);
     frame.set_id(
         analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
@@ -808,7 +614,7 @@
 
   AnalyzerStats stats = analyzer.GetAnalyzerStats();
   EXPECT_EQ(stats.memory_overloaded_comparisons_done, 0);
-  EXPECT_EQ(stats.comparisons_done, kMaxFramesInFlightPerStream);
+  EXPECT_EQ(stats.comparisons_done, 10);
 
   std::vector<StatsSample> frames_in_flight_sizes =
       GetSortedSamples(stats.frames_in_flight_left_count);
@@ -839,7 +645,7 @@
 
   std::map<uint16_t, VideoFrame> captured_frames;
   std::vector<uint16_t> frames_order;
-  for (int i = 0; i < kMaxFramesInFlightPerStream; ++i) {
+  for (int i = 0; i < 10; ++i) {
     VideoFrame frame = NextFrame(frame_generator.get(), i);
     frame.set_id(
         analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
@@ -2426,5 +2232,64 @@
   // TODO(bugs.webrtc.org/14995): Assert on harmonic fps
 }
 
+TEST_F(DefaultVideoQualityAnalyzerSimulatedTimeTest,
+       MemoryOverloadedAndThenAllFramesReceived) {
+  std::unique_ptr<test::FrameGeneratorInterface> frame_generator =
+      test::CreateSquareFrameGenerator(kFrameWidth, kFrameHeight,
+                                       /*type=*/absl::nullopt,
+                                       /*num_squares=*/absl::nullopt);
+
+  DefaultVideoQualityAnalyzer analyzer(
+      GetClock(), test::GetGlobalMetricsLogger(), AnalyzerOptionsForTest());
+  analyzer.Start("test_case",
+                 std::vector<std::string>{kSenderPeerName, kReceiverPeerName},
+                 kAnalyzerMaxThreadsCount);
+
+  std::map<uint16_t, VideoFrame> captured_frames;
+  std::vector<uint16_t> frames_order;
+  for (int i = 0; i < 5; ++i) {
+    VideoFrame frame = NextFrame(frame_generator.get(), i);
+    frame.set_id(
+        analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
+    frames_order.push_back(frame.id());
+    captured_frames.insert({frame.id(), frame});
+    analyzer.OnFramePreEncode(kSenderPeerName, frame);
+    analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
+                            VideoQualityAnalyzerInterface::EncoderStats(),
+                            false);
+  }
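+  // Let more than the storage duration pass so that the analyzer's stored
+  // copies of the first 5 frames expire.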
+  AdvanceTime(kMaxFramesInFlightStorageDuration + TimeDelta::Millis(1));
+  for (int i = 0; i < 5; ++i) {
+    VideoFrame frame = NextFrame(frame_generator.get(), i);
+    frame.set_id(
+        analyzer.OnFrameCaptured(kSenderPeerName, kStreamLabel, frame));
+    frames_order.push_back(frame.id());
+    captured_frames.insert({frame.id(), frame});
+    analyzer.OnFramePreEncode(kSenderPeerName, frame);
+    analyzer.OnFrameEncoded(kSenderPeerName, frame.id(), FakeEncode(frame),
+                            VideoQualityAnalyzerInterface::EncoderStats(),
+                            false);
+  }
+
+  for (const uint16_t& frame_id : frames_order) {
+    VideoFrame received_frame = DeepCopy(captured_frames.at(frame_id));
+    analyzer.OnFramePreDecode(kReceiverPeerName, received_frame.id(),
+                              FakeEncode(received_frame));
+    analyzer.OnFrameDecoded(kReceiverPeerName, received_frame,
+                            VideoQualityAnalyzerInterface::DecoderStats());
+    analyzer.OnFrameRendered(kReceiverPeerName, received_frame);
+  }
+
+  analyzer.Stop();
+
+  AnalyzerStats stats = analyzer.GetAnalyzerStats();
+  EXPECT_EQ(stats.memory_overloaded_comparisons_done, 5);
+  EXPECT_EQ(stats.comparisons_done, 10);
+  FrameCounters frame_counters = analyzer.GetGlobalCounters();
+  EXPECT_EQ(frame_counters.captured, 10);
+  EXPECT_EQ(frame_counters.rendered, 10);
+  EXPECT_EQ(frame_counters.dropped, 0);
+}
+
 }  // namespace
 }  // namespace webrtc
diff --git a/test/pc/e2e/analyzer/video/dvqa/BUILD.gn b/test/pc/e2e/analyzer/video/dvqa/BUILD.gn
index 3aa25ee..5f525ae 100644
--- a/test/pc/e2e/analyzer/video/dvqa/BUILD.gn
+++ b/test/pc/e2e/analyzer/video/dvqa/BUILD.gn
@@ -12,14 +12,20 @@
   group("dvqa") {
     testonly = true
 
-    deps = [ ":pausable_state" ]
+    deps = [
+      ":frames_storage",
+      ":pausable_state",
+    ]
   }
 
   if (rtc_include_tests) {
     group("dvqa_unittests") {
       testonly = true
 
-      deps = [ ":pausable_state_test" ]
+      deps = [
+        ":frames_storage_test",
+        ":pausable_state_test",
+      ]
     }
   }
 }
@@ -48,6 +54,28 @@
   ]
 }
 
+rtc_library("frames_storage") {
+  visibility = [
+    ":dvqa",
+    ":frames_storage_test",
+    "..:default_video_quality_analyzer",
+  ]
+
+  testonly = true
+  sources = [
+    "frames_storage.cc",
+    "frames_storage.h",
+  ]
+
+  deps = [
+    "../../../../../../api/units:time_delta",
+    "../../../../../../api/units:timestamp",
+    "../../../../../../api/video:video_frame",
+    "../../../../../../system_wrappers",
+  ]
+  absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+}
+
 if (rtc_include_tests) {
   rtc_library("pausable_state_test") {
     testonly = true
@@ -62,4 +90,20 @@
       "../../../../../time_controller",
     ]
   }
+
+  rtc_library("frames_storage_test") {
+    testonly = true
+    sources = [ "frames_storage_test.cc" ]
+    deps = [
+      ":frames_storage",
+      "../../../../..:test_support",
+      "../../../../../../api:scoped_refptr",
+      "../../../../../../api:time_controller",
+      "../../../../../../api/units:time_delta",
+      "../../../../../../api/units:timestamp",
+      "../../../../../../api/video:video_frame",
+      "../../../../../../system_wrappers",
+      "../../../../../time_controller",
+    ]
+    absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+  }
 }
diff --git a/test/pc/e2e/analyzer/video/dvqa/frames_storage.cc b/test/pc/e2e/analyzer/video/dvqa/frames_storage.cc
new file mode 100644
index 0000000..2a70468
--- /dev/null
+++ b/test/pc/e2e/analyzer/video/dvqa/frames_storage.cc
@@ -0,0 +1,129 @@
+/*
+ *  Copyright (c) 2023 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/pc/e2e/analyzer/video/dvqa/frames_storage.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "absl/types/optional.h"
+#include "api/units/timestamp.h"
+#include "api/video/video_frame.h"
+
+namespace webrtc {
+
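+// Adds the frame to the min-heap keyed by capture time, records its heap
+// position in `frame_id_index_` and then evicts any frames whose capture time
+// is already more than `max_storage_duration_` in the past.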
+void FramesStorage::Add(const VideoFrame& frame, Timestamp captured_time) {
+  heap_.push_back(HeapNode{.frame = frame, .captured_time = captured_time});
+  frame_id_index_[frame.id()] = heap_.size() - 1;
+  Heapify(heap_.size() - 1);
+  RemoveTooOldFrames();
+}
+
+absl::optional<VideoFrame> FramesStorage::Get(uint16_t frame_id) {
+  auto it = frame_id_index_.find(frame_id);
+  if (it == frame_id_index_.end()) {
+    return absl::nullopt;
+  }
+
+  return heap_[it->second].frame;
+}
+
+void FramesStorage::Remove(uint16_t frame_id) {
+  RemoveInternal(frame_id);
+  RemoveTooOldFrames();
+}
+
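+// Removes the frame from the heap without running the expiry pass: the removed
+// slot is filled with the last heap element, which is then re-heapified.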
+void FramesStorage::RemoveInternal(uint16_t frame_id) {
+  auto it = frame_id_index_.find(frame_id);
+  if (it == frame_id_index_.end()) {
+    return;
+  }
+
+  size_t index = it->second;
+  frame_id_index_.erase(it);
+
+  // If it isn't the last element in the heap, move the last heap element into
+  // the removed element's slot and update that element's index.
+  if (index != heap_.size() - 1) {
+    heap_[index] = std::move(heap_[heap_.size() - 1]);
+    frame_id_index_[heap_[index].frame.id()] = index;
+  }
+
+  // Remove the last element.
+  heap_.pop_back();
+
+  if (index < heap_.size()) {
+    Heapify(index);
+  }
+}
+
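+// Restores the min-heap property for the node at `index` by sifting it up if
+// it is smaller than its parent and down if it is larger than one of its
+// children.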
+void FramesStorage::Heapify(size_t index) {
+  HeapifyUp(index);
+  HeapifyDown(index);
+}
+
+void FramesStorage::HeapifyUp(size_t index) {
+  if (index == 0) {
+    return;
+  }
+  RTC_CHECK_LT(index, heap_.size());
+  size_t parent = index / 2;
+  if (heap_[parent].captured_time <= heap_[index].captured_time) {
+    return;
+  }
+  HeapNode tmp = std::move(heap_[index]);
+  heap_[index] = std::move(heap_[parent]);
+  heap_[parent] = std::move(tmp);
+
+  frame_id_index_[heap_[index].frame.id()] = index;
+  frame_id_index_[heap_[parent].frame.id()] = parent;
+
+  HeapifyUp(parent);
+}
+
+void FramesStorage::HeapifyDown(size_t index) {
+  RTC_CHECK_GE(index, 0);
+  RTC_CHECK_LT(index, heap_.size());
+
+  size_t left_child = 2 * index;
+  size_t right_child = 2 * index + 1;
+
+  if (left_child >= heap_.size()) {
+    return;
+  }
+  size_t smallest_child = left_child;
+  if (right_child < heap_.size() &&
+      heap_[right_child].captured_time < heap_[left_child].captured_time) {
+    smallest_child = right_child;
+  }
+
+  if (heap_[index].captured_time <= heap_[smallest_child].captured_time) {
+    return;
+  }
+
+  HeapNode tmp = std::move(heap_[index]);
+  heap_[index] = std::move(heap_[smallest_child]);
+  heap_[smallest_child] = std::move(tmp);
+
+  frame_id_index_[heap_[index].frame.id()] = index;
+  frame_id_index_[heap_[smallest_child].frame.id()] = smallest_child;
+
+  HeapifyDown(smallest_child);
+}
+
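+// The heap root always holds the earliest captured frame, so expired frames
+// can be popped from the root until every remaining frame is within
+// `max_storage_duration_` of the current time.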
+void FramesStorage::RemoveTooOldFrames() {
+  Timestamp now = clock_->CurrentTime();
+  while (!heap_.empty() &&
+         (heap_[0].captured_time + max_storage_duration_) < now) {
+    RemoveInternal(heap_[0].frame.id());
+  }
+}
+
+}  // namespace webrtc
diff --git a/test/pc/e2e/analyzer/video/dvqa/frames_storage.h b/test/pc/e2e/analyzer/video/dvqa/frames_storage.h
new file mode 100644
index 0000000..d3c6bd4
--- /dev/null
+++ b/test/pc/e2e/analyzer/video/dvqa/frames_storage.h
@@ -0,0 +1,77 @@
+/*
+ *  Copyright (c) 2023 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef TEST_PC_E2E_ANALYZER_VIDEO_DVQA_FRAMES_STORAGE_H_
+#define TEST_PC_E2E_ANALYZER_VIDEO_DVQA_FRAMES_STORAGE_H_
+
+#include <cstdint>
+#include <unordered_map>
+#include <vector>
+
+#include "absl/types/optional.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
+#include "api/video/video_frame.h"
+#include "system_wrappers/include/clock.h"
+
+namespace webrtc {
+
+// Stores video frames for DefaultVideoQualityAnalyzer. Frames are cleaned up
+// when the time elapsed from their capture time exceeds `max_storage_duration`.
+class FramesStorage {
+ public:
+  FramesStorage(TimeDelta max_storage_duration, Clock* clock)
+      : max_storage_duration_(max_storage_duration), clock_(clock) {}
+  FramesStorage(const FramesStorage&) = delete;
+  FramesStorage& operator=(const FramesStorage&) = delete;
+  FramesStorage(FramesStorage&&) = default;
+  FramesStorage& operator=(FramesStorage&&) = default;
+
+  // Adds a frame to the storage. It is guaranteed to be stored for at least
+  // `max_storage_duration` after `captured_time`.
+  //
+  // Complexity: O(log(n))
+  void Add(const VideoFrame& frame, Timestamp captured_time);
+
+  // Complexity: O(1)
+  absl::optional<VideoFrame> Get(uint16_t frame_id);
+
+  // Removes the frame identified by `frame_id` from the storage. Removing a
+  // frame id that isn't stored is not an error.
+  //
+  // Complexity: O(log(n))
+  void Remove(uint16_t frame_id);
+
+ private:
+  struct HeapNode {
+    VideoFrame frame;
+    Timestamp captured_time;
+  };
+
+  void RemoveInternal(uint16_t frame_id);
+
+  void Heapify(size_t index);
+  void HeapifyUp(size_t index);
+  void HeapifyDown(size_t index);
+
+  // Complexity: O(k * log(n)), where k is the number of expired frames.
+  void RemoveTooOldFrames();
+
+  TimeDelta max_storage_duration_;
+  Clock* clock_;
+
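+  // Maps frame id to the frame's current index in `heap_`.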
+  std::unordered_map<uint16_t, size_t> frame_id_index_;
+  // Min-heap based on HeapNode::captured_time.
+  std::vector<HeapNode> heap_;
+};
+
+}  // namespace webrtc
+
+#endif  // TEST_PC_E2E_ANALYZER_VIDEO_DVQA_FRAMES_STORAGE_H_
diff --git a/test/pc/e2e/analyzer/video/dvqa/frames_storage_test.cc b/test/pc/e2e/analyzer/video/dvqa/frames_storage_test.cc
new file mode 100644
index 0000000..88a172e
--- /dev/null
+++ b/test/pc/e2e/analyzer/video/dvqa/frames_storage_test.cc
@@ -0,0 +1,158 @@
+/*
+ *  Copyright (c) 2023 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/pc/e2e/analyzer/video/dvqa/frames_storage.h"
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+
+#include "absl/types/optional.h"
+#include "api/scoped_refptr.h"
+#include "api/test/time_controller.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
+#include "api/video/i420_buffer.h"
+#include "api/video/video_frame.h"
+#include "system_wrappers/include/clock.h"
+#include "test/gtest.h"
+#include "test/time_controller/simulated_time_controller.h"
+
+namespace webrtc {
+namespace {
+
+VideoFrame Create2x2Frame(uint16_t frame_id) {
+  rtc::scoped_refptr<I420Buffer> buffer =
+      I420Buffer::Create(/*width=*/2, /*height=*/2);
+  memset(buffer->MutableDataY(), static_cast<uint8_t>(frame_id), 4);
+  memset(buffer->MutableDataU(), static_cast<uint8_t>(frame_id + 1), 1);
+  memset(buffer->MutableDataV(), static_cast<uint8_t>(frame_id + 2), 1);
+  return VideoFrame::Builder()
+      .set_id(frame_id)
+      .set_video_frame_buffer(buffer)
+      .set_timestamp_us(1)
+      .build();
+}
+
+void AssertHasFrame(FramesStorage& storage, uint16_t frame_id) {
+  absl::optional<VideoFrame> frame = storage.Get(frame_id);
+  ASSERT_TRUE(frame.has_value()) << "Frame " << frame_id << " wasn't found";
+  EXPECT_EQ(frame->id(), frame_id);
+}
+
+class FramesStorageTest : public testing::Test {
+ protected:
+  FramesStorageTest()
+      : time_controller_(std::make_unique<GlobalSimulatedTimeController>(
+            Timestamp::Seconds(1000))) {}
+
+  Timestamp NowPlusSeconds(int seconds) {
+    return time_controller_->GetClock()->CurrentTime() +
+           TimeDelta::Seconds(seconds);
+  }
+
+  Clock* GetClock() { return time_controller_->GetClock(); }
+
+  void AdvanceTime(TimeDelta time) { time_controller_->AdvanceTime(time); }
+
+ private:
+  std::unique_ptr<TimeController> time_controller_;
+};
+
+TEST_F(FramesStorageTest, CanGetAllAddedFrames) {
+  VideoFrame frame1 = Create2x2Frame(/*frame_id=*/1);
+  VideoFrame frame2 = Create2x2Frame(/*frame_id=*/2);
+  VideoFrame frame3 = Create2x2Frame(/*frame_id=*/3);
+  VideoFrame frame4 = Create2x2Frame(/*frame_id=*/4);
+  VideoFrame frame5 = Create2x2Frame(/*frame_id=*/5);
+
+  FramesStorage storage(TimeDelta::Seconds(1), GetClock());
+
+  storage.Add(frame1, /*captured_time=*/NowPlusSeconds(1));
+  storage.Add(frame2, /*captured_time=*/NowPlusSeconds(2));
+  storage.Add(frame3, /*captured_time=*/NowPlusSeconds(3));
+  storage.Add(frame4, /*captured_time=*/NowPlusSeconds(2));
+  storage.Add(frame5, /*captured_time=*/NowPlusSeconds(1));
+
+  AssertHasFrame(storage, /*frame_id=*/1);
+  AssertHasFrame(storage, /*frame_id=*/2);
+  AssertHasFrame(storage, /*frame_id=*/3);
+  AssertHasFrame(storage, /*frame_id=*/4);
+  AssertHasFrame(storage, /*frame_id=*/5);
+}
+
+TEST_F(FramesStorageTest, CanGetRemainingAddedFramesAfterRemove) {
+  VideoFrame frame1 = Create2x2Frame(/*frame_id=*/1);
+  VideoFrame frame2 = Create2x2Frame(/*frame_id=*/2);
+  VideoFrame frame3 = Create2x2Frame(/*frame_id=*/3);
+  VideoFrame frame4 = Create2x2Frame(/*frame_id=*/4);
+  VideoFrame frame5 = Create2x2Frame(/*frame_id=*/5);
+
+  FramesStorage storage(TimeDelta::Seconds(1), GetClock());
+
+  storage.Add(frame1, /*captured_time=*/NowPlusSeconds(1));
+  storage.Add(frame2, /*captured_time=*/NowPlusSeconds(2));
+  storage.Add(frame3, /*captured_time=*/NowPlusSeconds(3));
+  storage.Add(frame4, /*captured_time=*/NowPlusSeconds(2));
+  storage.Add(frame5, /*captured_time=*/NowPlusSeconds(1));
+
+  storage.Remove(frame1.id());
+  storage.Remove(frame2.id());
+  storage.Remove(frame3.id());
+
+  AssertHasFrame(storage, /*frame_id=*/4);
+  AssertHasFrame(storage, /*frame_id=*/5);
+}
+
+TEST_F(FramesStorageTest, AutoCleanupRemovesOnlyOldFrames) {
+  VideoFrame frame1 = Create2x2Frame(/*frame_id=*/1);
+  VideoFrame frame2 = Create2x2Frame(/*frame_id=*/2);
+  VideoFrame frame3 = Create2x2Frame(/*frame_id=*/3);
+  VideoFrame frame4 = Create2x2Frame(/*frame_id=*/4);
+  VideoFrame frame5 = Create2x2Frame(/*frame_id=*/5);
+  VideoFrame frame6 = Create2x2Frame(/*frame_id=*/6);
+
+  FramesStorage storage(TimeDelta::Seconds(1), GetClock());
+
+  storage.Add(frame1, /*captured_time=*/NowPlusSeconds(0));
+  storage.Add(frame2, /*captured_time=*/NowPlusSeconds(1));
+  storage.Add(frame3, /*captured_time=*/NowPlusSeconds(2));
+  storage.Add(frame4, /*captured_time=*/NowPlusSeconds(1));
+  storage.Add(frame5, /*captured_time=*/NowPlusSeconds(0));
+
+  AdvanceTime(TimeDelta::Millis(1001));
+  storage.Add(frame6, /*captured_time=*/NowPlusSeconds(3));
+
+  AssertHasFrame(storage, /*frame_id=*/2);
+  AssertHasFrame(storage, /*frame_id=*/3);
+  AssertHasFrame(storage, /*frame_id=*/4);
+  EXPECT_FALSE(storage.Get(/*frame_id=*/1).has_value());
+  EXPECT_FALSE(storage.Get(/*frame_id=*/5).has_value());
+}
+
+TEST_F(FramesStorageTest, AllFramesAutoCleaned) {
+  VideoFrame frame1 = Create2x2Frame(/*frame_id=*/1);
+  VideoFrame frame2 = Create2x2Frame(/*frame_id=*/2);
+  VideoFrame frame3 = Create2x2Frame(/*frame_id=*/3);
+
+  FramesStorage storage(TimeDelta::Seconds(1), GetClock());
+
+  storage.Add(frame1, /*captured_time=*/NowPlusSeconds(0));
+  storage.Add(frame2, /*captured_time=*/NowPlusSeconds(0));
+  storage.Add(frame3, /*captured_time=*/NowPlusSeconds(0));
+
+  AdvanceTime(TimeDelta::Millis(1001));
+  storage.Remove(/*frame_id=*/3);
+
+  EXPECT_FALSE(storage.Get(/*frame_id=*/1).has_value());
+  EXPECT_FALSE(storage.Get(/*frame_id=*/2).has_value());
+  EXPECT_FALSE(storage.Get(/*frame_id=*/3).has_value());
+}
+
+}  // namespace
+}  // namespace webrtc