[DVQA] Add support for removing peer from the StreamState

Bug: b/231397778
Change-Id: I8ce1486f91f6c84e246e043f2a4e2dd94fc29d06
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265809
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37220}
diff --git a/test/pc/e2e/BUILD.gn b/test/pc/e2e/BUILD.gn
index 12db7fb..2e4410d 100644
--- a/test/pc/e2e/BUILD.gn
+++ b/test/pc/e2e/BUILD.gn
@@ -35,6 +35,7 @@
 
       deps = [
         ":default_video_quality_analyzer_frames_comparator_test",
+        ":default_video_quality_analyzer_stream_state_test",
         ":default_video_quality_analyzer_test",
         ":multi_reader_queue_test",
         ":names_collection_test",
@@ -594,6 +595,19 @@
       ]
       absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
     }
+
+    rtc_library("default_video_quality_analyzer_stream_state_test") {
+      testonly = true
+      sources = [
+        "analyzer/video/default_video_quality_analyzer_stream_state_test.cc",
+      ]
+      deps = [
+        ":default_video_quality_analyzer_internal",
+        "../../../api/units:timestamp",
+        "../../../test:test_support",
+      ]
+      absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+    }
   }
 
   rtc_library("analyzer_helper") {
@@ -726,6 +740,7 @@
     visibility = [
       ":default_video_quality_analyzer",
       ":default_video_quality_analyzer_frames_comparator_test",
+      ":default_video_quality_analyzer_stream_state_test",
       ":names_collection_test",
     ]
 
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
index 5922c44..4426db4 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc
@@ -193,12 +193,16 @@
       }
     }
 
+    std::set<size_t> frame_receivers_indexes = peers_->GetPresentIndexes();
+    if (!options_.enable_receive_own_stream) {
+      frame_receivers_indexes.erase(peer_index);
+    }
+
     auto state_it = stream_states_.find(stream_index);
     if (state_it == stream_states_.end()) {
       stream_states_.emplace(
           stream_index,
-          StreamState(peer_index, peers_->size(),
-                      options_.enable_receive_own_stream, captured_time));
+          StreamState(peer_index, frame_receivers_indexes, captured_time));
     }
     StreamState* state = &stream_states_.at(stream_index);
     state->PushBack(frame_id);
@@ -230,10 +234,6 @@
 
       captured_frames_in_flight_.erase(it);
     }
-    std::set<size_t> frame_receivers_indexes = peers_->GetPresentIndexes();
-    if (!options_.enable_receive_own_stream) {
-      frame_receivers_indexes.erase(peer_index);
-    }
     captured_frames_in_flight_.emplace(
         frame_id, FrameInFlight(stream_index, frame, captured_time,
                                 std::move(frame_receivers_indexes)));
@@ -470,7 +470,7 @@
 
   const size_t stream_index = frame_in_flight->stream();
   StreamState* state = &stream_states_.at(stream_index);
-  const InternalStatsKey stats_key(stream_index, state->owner(), peer_index);
+  const InternalStatsKey stats_key(stream_index, state->sender(), peer_index);
 
   // Update frames counters.
   frame_counters_.rendered++;
@@ -498,7 +498,6 @@
 
     auto dropped_frame_it = captured_frames_in_flight_.find(dropped_frame_id);
     RTC_DCHECK(dropped_frame_it != captured_frames_in_flight_.end());
-    absl::optional<VideoFrame> dropped_frame = dropped_frame_it->second.frame();
     dropped_frame_it->second.MarkDropped(peer_index);
 
     analyzer_stats_.frames_in_flight_left_count.AddSample(
@@ -591,7 +590,7 @@
   // Ensure, that frames states are handled correctly
   // (e.g. dropped frames tracking).
   for (auto& key_val : stream_states_) {
-    key_val.second.AddPeer();
+    key_val.second.AddPeer(new_peer_index);
   }
   // Register new peer for every frame in flight.
   // It is guaranteed, that no garbage FrameInFlight objects will stay in
@@ -624,11 +623,11 @@
       const size_t stream_index = state_entry.first;
       StreamState& stream_state = state_entry.second;
       for (size_t i = 0; i < peers_->size(); ++i) {
-        if (i == stream_state.owner() && !options_.enable_receive_own_stream) {
+        if (i == stream_state.sender() && !options_.enable_receive_own_stream) {
           continue;
         }
 
-        InternalStatsKey stats_key(stream_index, stream_state.owner(), i);
+        InternalStatsKey stats_key(stream_index, stream_state.sender(), i);
 
         // If there are no freezes in the call we have to report
         // time_between_freezes_ms as call duration and in such case
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc
index f1d98b8..d59ef12 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc
@@ -11,6 +11,7 @@
 #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h"
 
 #include <map>
+#include <set>
 
 #include "absl/types/optional.h"
 #include "api/units/timestamp.h"
@@ -30,26 +31,33 @@
 
 }  // namespace
 
+StreamState::StreamState(size_t sender,
+                         std::set<size_t> receivers,
+                         Timestamp stream_started_time)
+    : sender_(sender),
+      stream_started_time_(stream_started_time),
+      receivers_(receivers),
+      frame_ids_(std::move(receivers)) {
+  frame_ids_.AddReader(kAliveFramesQueueIndex);
+  RTC_CHECK_NE(sender_, kAliveFramesQueueIndex);
+  for (size_t receiver : receivers_) {
+    RTC_CHECK_NE(receiver, kAliveFramesQueueIndex);
+  }
+}
+
 uint16_t StreamState::PopFront(size_t peer) {
-  size_t peer_queue = GetPeerQueueIndex(peer);
-  size_t alive_frames_queue = GetAliveFramesQueueIndex();
-  absl::optional<uint16_t> frame_id = frame_ids_.PopFront(peer_queue);
+  RTC_CHECK_NE(peer, kAliveFramesQueueIndex);
+  absl::optional<uint16_t> frame_id = frame_ids_.PopFront(peer);
   RTC_DCHECK(frame_id.has_value());
 
   // If alive's frame queue is longer than all others, than also pop frame from
   // it, because that frame is received by all receivers.
-  size_t alive_size = frame_ids_.size(alive_frames_queue);
-  size_t other_size = 0;
-  for (size_t i = 0; i < frame_ids_.readers_count(); ++i) {
-    size_t cur_size = frame_ids_.size(i);
-    if (i != alive_frames_queue && cur_size > other_size) {
-      other_size = cur_size;
-    }
-  }
+  size_t alive_size = frame_ids_.size(kAliveFramesQueueIndex);
+  size_t other_size = GetLongestReceiverQueue();
   // Pops frame from alive queue if alive's queue is the longest one.
   if (alive_size > other_size) {
     absl::optional<uint16_t> alive_frame_id =
-        frame_ids_.PopFront(alive_frames_queue);
+        frame_ids_.PopFront(kAliveFramesQueueIndex);
     RTC_DCHECK(alive_frame_id.has_value());
     RTC_DCHECK_EQ(frame_id.value(), alive_frame_id.value());
   }
@@ -57,9 +65,30 @@
   return frame_id.value();
 }
 
+void StreamState::AddPeer(size_t peer) {
+  RTC_CHECK_NE(peer, kAliveFramesQueueIndex);
+  frame_ids_.AddReader(peer, kAliveFramesQueueIndex);
+  receivers_.insert(peer);
+}
+
+void StreamState::RemovePeer(size_t peer) {
+  RTC_CHECK_NE(peer, kAliveFramesQueueIndex);
+  frame_ids_.RemoveReader(peer);
+  receivers_.erase(peer);
+
+  // If we removed the last receiver for the alive frames, we need to pop them
+  // from the queue, because now they received by all receivers.
+  size_t alive_size = frame_ids_.size(kAliveFramesQueueIndex);
+  size_t other_size = GetLongestReceiverQueue();
+  while (alive_size > other_size) {
+    frame_ids_.PopFront(kAliveFramesQueueIndex);
+    alive_size--;
+  }
+}
+
 uint16_t StreamState::MarkNextAliveFrameAsDead() {
   absl::optional<uint16_t> frame_id =
-      frame_ids_.PopFront(GetAliveFramesQueueIndex());
+      frame_ids_.PopFront(kAliveFramesQueueIndex);
   RTC_DCHECK(frame_id.has_value());
   return frame_id.value();
 }
@@ -78,27 +107,15 @@
   return MaybeGetValue(last_rendered_frame_time_, peer);
 }
 
-size_t StreamState::GetPeerQueueIndex(size_t peer_index) const {
-  // When sender isn't expecting to receive its own stream we will use their
-  // queue for tracking alive frames. Otherwise we will use the queue #0 to
-  // track alive frames and will shift all other queues for peers on 1.
-  // It means when `enable_receive_own_stream_` is true peer's queue will have
-  // index equal to `peer_index` + 1 and when `enable_receive_own_stream_` is
-  // false peer's queue will have index equal to `peer_index`.
-  if (!enable_receive_own_stream_) {
-    return peer_index;
+size_t StreamState::GetLongestReceiverQueue() const {
+  size_t max = 0;
+  for (size_t receiver : receivers_) {
+    size_t cur_size = frame_ids_.size(receiver);
+    if (cur_size > max) {
+      max = cur_size;
+    }
   }
-  return peer_index + 1;
-}
-
-size_t StreamState::GetAliveFramesQueueIndex() const {
-  // When sender isn't expecting to receive its own stream we will use their
-  // queue for tracking alive frames. Otherwise we will use the queue #0 to
-  // track alive frames and will shift all other queues for peers on 1.
-  if (!enable_receive_own_stream_) {
-    return owner_;
-  }
-  return 0;
+  return max;
 }
 
 }  // namespace webrtc
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
index 8cf41a3..829a79c 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
@@ -11,7 +11,9 @@
 #ifndef TEST_PC_E2E_ANALYZER_VIDEO_DEFAULT_VIDEO_QUALITY_ANALYZER_STREAM_STATE_H_
 #define TEST_PC_E2E_ANALYZER_VIDEO_DEFAULT_VIDEO_QUALITY_ANALYZER_STREAM_STATE_H_
 
+#include <limits>
 #include <map>
+#include <set>
 
 #include "absl/types/optional.h"
 #include "api/units/timestamp.h"
@@ -21,39 +23,44 @@
 
 // Represents a current state of video stream inside
 // DefaultVideoQualityAnalyzer.
+//
+// Maintains the sequence of frames for each video stream and keeps track about
+// which frames were seen by each of the possible stream receiver.
+//
+// Keeps information about which frames are alive and which are dead. Frame is
+// alive if it contains VideoFrame payload for corresponding FrameInFlight
+// object inside DefaultVideoQualityAnalyzer, otherwise frame is considered
+// dead.
+//
+// Supports peer indexes from 0 to max(size_t) - 1.
 class StreamState {
  public:
-  StreamState(size_t owner,
-              size_t peers_count,
-              bool enable_receive_own_stream,
-              Timestamp stream_started_time)
-      : owner_(owner),
-        enable_receive_own_stream_(enable_receive_own_stream),
-        stream_started_time_(stream_started_time),
-        frame_ids_(enable_receive_own_stream ? peers_count + 1 : peers_count) {}
+  StreamState(size_t sender,
+              std::set<size_t> receivers,
+              Timestamp stream_started_time);
 
-  size_t owner() const { return owner_; }
+  size_t sender() const { return sender_; }
   Timestamp stream_started_time() const { return stream_started_time_; }
 
   void PushBack(uint16_t frame_id) { frame_ids_.PushBack(frame_id); }
-  // Crash if state is empty. Guarantees that there can be no alive frames
-  // that are not in the owner queue
-  uint16_t PopFront(size_t peer);
-  bool IsEmpty(size_t peer) const {
-    return frame_ids_.IsEmpty(GetPeerQueueIndex(peer));
-  }
   // Crash if state is empty.
-  uint16_t Front(size_t peer) const {
-    return frame_ids_.Front(GetPeerQueueIndex(peer)).value();
-  }
+  uint16_t PopFront(size_t peer);
+  bool IsEmpty(size_t peer) const { return frame_ids_.IsEmpty(peer); }
+  // Crash if state is empty.
+  uint16_t Front(size_t peer) const { return frame_ids_.Front(peer).value(); }
 
-  // When new peer is added - all current alive frames will be sent to it as
-  // well. So we need to register them as expected by copying owner_ head to
-  // the new head.
-  void AddPeer() { frame_ids_.AddReader(GetAliveFramesQueueIndex()); }
+  // Adds a new peer to the state. All currently alive frames will be expected
+  // to be received by the newly added peer.
+  void AddPeer(size_t peer);
+
+  // Removes peer from the state. Frames that were expected to be received by
+  // this peer will be removed from it. On the other hand last rendered frame
+  // time for the removed peer will be preserved, because
+  // DefaultVideoQualityAnalyzer still may request it for stats processing.
+  void RemovePeer(size_t peer);
 
   size_t GetAliveFramesCount() const {
-    return frame_ids_.size(GetAliveFramesQueueIndex());
+    return frame_ids_.size(kAliveFramesQueueIndex);
   }
   uint16_t MarkNextAliveFrameAsDead();
 
@@ -61,19 +68,17 @@
   absl::optional<Timestamp> last_rendered_frame_time(size_t peer) const;
 
  private:
-  // Returns index of the `frame_ids_` queue which is used for specified
-  // `peer_index`.
-  size_t GetPeerQueueIndex(size_t peer_index) const;
+  // Index of the `frame_ids_` queue which is used to track alive frames for
+  // this stream.
+  static constexpr size_t kAliveFramesQueueIndex =
+      std::numeric_limits<size_t>::max();
 
-  // Returns index of the `frame_ids_` queue which is used to track alive
-  // frames for this stream. The frame is alive if it contains VideoFrame
-  // payload in `captured_frames_in_flight_`.
-  size_t GetAliveFramesQueueIndex() const;
+  size_t GetLongestReceiverQueue() const;
 
   // Index of the owner. Owner's queue in `frame_ids_` will keep alive frames.
-  const size_t owner_;
-  const bool enable_receive_own_stream_;
+  const size_t sender_;
   const Timestamp stream_started_time_;
+  std::set<size_t> receivers_;
   // To correctly determine dropped frames we have to know sequence of frames
   // in each stream so we will keep a list of frame ids inside the stream.
   // This list is represented by multi head queue of frame ids with separate
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc
new file mode 100644
index 0000000..01a6aab
--- /dev/null
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc
@@ -0,0 +1,126 @@
+/*
+ *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h"
+
+#include <set>
+
+#include "api/units/timestamp.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+namespace {
+
+TEST(StreamStateTest, PopFrontAndFrontIndependentForEachPeer) {
+  StreamState state(/*sender=*/0,
+                    /*receivers=*/std::set<size_t>{1, 2},
+                    Timestamp::Seconds(1));
+  state.PushBack(/*frame_id=*/1);
+  state.PushBack(/*frame_id=*/2);
+
+  EXPECT_EQ(state.Front(/*peer=*/1), 1);
+  EXPECT_EQ(state.PopFront(/*peer=*/1), 1);
+  EXPECT_EQ(state.Front(/*peer=*/1), 2);
+  EXPECT_EQ(state.PopFront(/*peer=*/1), 2);
+  EXPECT_EQ(state.Front(/*peer=*/2), 1);
+  EXPECT_EQ(state.PopFront(/*peer=*/2), 1);
+  EXPECT_EQ(state.Front(/*peer=*/2), 2);
+  EXPECT_EQ(state.PopFront(/*peer=*/2), 2);
+}
+
+TEST(StreamStateTest, IsEmpty) {
+  StreamState state(/*sender=*/0,
+                    /*receivers=*/std::set<size_t>{1, 2},
+                    Timestamp::Seconds(1));
+  state.PushBack(/*frame_id=*/1);
+
+  EXPECT_FALSE(state.IsEmpty(/*peer=*/1));
+
+  state.PopFront(/*peer=*/1);
+
+  EXPECT_TRUE(state.IsEmpty(/*peer=*/1));
+}
+
+TEST(StreamStateTest, PopFrontForOnlyOnePeerDontChangeAliveFramesCount) {
+  StreamState state(/*sender=*/0,
+                    /*receivers=*/std::set<size_t>{1, 2},
+                    Timestamp::Seconds(1));
+  state.PushBack(/*frame_id=*/1);
+  state.PushBack(/*frame_id=*/2);
+
+  EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
+
+  state.PopFront(/*peer=*/1);
+  state.PopFront(/*peer=*/1);
+
+  EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
+}
+
+TEST(StreamStateTest, PopFrontForAllPeersReducesAliveFramesCount) {
+  StreamState state(/*sender=*/0,
+                    /*receivers=*/std::set<size_t>{1, 2},
+                    Timestamp::Seconds(1));
+  state.PushBack(/*frame_id=*/1);
+  state.PushBack(/*frame_id=*/2);
+
+  EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
+
+  state.PopFront(/*peer=*/1);
+  state.PopFront(/*peer=*/2);
+
+  EXPECT_EQ(state.GetAliveFramesCount(), 1lu);
+}
+
+TEST(StreamStateTest, RemovePeerForLastExpectedReceiverUpdatesAliveFrames) {
+  StreamState state(/*sender=*/0,
+                    /*receivers=*/std::set<size_t>{1, 2},
+                    Timestamp::Seconds(1));
+  state.PushBack(/*frame_id=*/1);
+  state.PushBack(/*frame_id=*/2);
+
+  state.PopFront(/*peer=*/1);
+
+  EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
+
+  state.RemovePeer(/*peer=*/2);
+
+  EXPECT_EQ(state.GetAliveFramesCount(), 1lu);
+}
+
+TEST(StreamStateTest, MarkNextAliveFrameAsDeadDecreseAliveFramesCount) {
+  StreamState state(/*sender=*/0,
+                    /*receivers=*/std::set<size_t>{1, 2},
+                    Timestamp::Seconds(1));
+  state.PushBack(/*frame_id=*/1);
+  state.PushBack(/*frame_id=*/2);
+
+  EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
+
+  state.MarkNextAliveFrameAsDead();
+
+  EXPECT_EQ(state.GetAliveFramesCount(), 1lu);
+}
+
+TEST(StreamStateTest, MarkNextAliveFrameAsDeadDoesntAffectFrontFrameForPeer) {
+  StreamState state(/*sender=*/0,
+                    /*receivers=*/std::set<size_t>{1, 2},
+                    Timestamp::Seconds(1));
+  state.PushBack(/*frame_id=*/1);
+  state.PushBack(/*frame_id=*/2);
+
+  EXPECT_EQ(state.Front(/*peer=*/1), 1);
+
+  state.MarkNextAliveFrameAsDead();
+
+  EXPECT_EQ(state.Front(/*peer=*/1), 1);
+}
+
+}  // namespace
+}  // namespace webrtc
diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue.h b/test/pc/e2e/analyzer/video/multi_reader_queue.h
index c8a7db0..39d26b4 100644
--- a/test/pc/e2e/analyzer/video/multi_reader_queue.h
+++ b/test/pc/e2e/analyzer/video/multi_reader_queue.h
@@ -42,14 +42,6 @@
     }
   }
 
-  // Adds a new reader, initializing its reading position (the reader's head)
-  // equal to the one of `reader_to_copy`. New reader will have name index
-  // equal to the current readers count.
-  // Complexity O(MultiReaderQueue::size(reader_to_copy)).
-  void AddReader(size_t reader_to_copy) {
-    AddReader(heads_.size(), reader_to_copy);
-  }
-
   // Adds a new `reader`, initializing its reading position (the reader's head)
   // equal to the one of `reader_to_copy`.
   // Complexity O(MultiReaderQueue::size(reader_to_copy)).
@@ -65,6 +57,19 @@
     }
   }
 
+  // Adds a new `reader`, initializing its reading position equal to first
+  // element in the queue.
+  // Complexity O(MultiReaderQueue::size()).
+  void AddReader(size_t reader) {
+    auto it = heads_.find(reader);
+    RTC_CHECK(it == heads_.end())
+        << "Reader " << reader << " is already in the queue";
+    heads_[reader] = removed_elements_count_;
+    for (size_t i = 0; i < queue_.size(); ++i) {
+      in_queues_[i]++;
+    }
+  }
+
   // Removes specified `reader` from the queue.
   // Complexity O(MultiReaderQueue::size(reader)).
   void RemoveReader(size_t reader) {
diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc
index e41c4a8..ea6aa0a 100644
--- a/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc
+++ b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc
@@ -17,92 +17,94 @@
 namespace {
 
 TEST(MultiReaderQueueTest, EmptyQueueEmptyForAllHeads) {
-  MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
+  MultiReaderQueue<int> queue = MultiReaderQueue<int>(/*readers_count=*/10);
   EXPECT_EQ(queue.size(), 0lu);
   for (int i = 0; i < 10; ++i) {
-    EXPECT_TRUE(queue.IsEmpty(i));
-    EXPECT_EQ(queue.size(i), 0lu);
-    EXPECT_FALSE(queue.PopFront(i).has_value());
-    EXPECT_FALSE(queue.Front(i).has_value());
+    EXPECT_TRUE(queue.IsEmpty(/*reader=*/i));
+    EXPECT_EQ(queue.size(/*reader=*/i), 0lu);
+    EXPECT_FALSE(queue.PopFront(/*reader=*/i).has_value());
+    EXPECT_FALSE(queue.Front(/*reader=*/i).has_value());
   }
 }
 
 TEST(MultiReaderQueueTest, SizeIsEqualForAllHeadsAfterAddOnly) {
-  MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
+  MultiReaderQueue<int> queue = MultiReaderQueue<int>(/*readers_count=*/10);
   queue.PushBack(1);
   queue.PushBack(2);
   queue.PushBack(3);
   EXPECT_EQ(queue.size(), 3lu);
   for (int i = 0; i < 10; ++i) {
-    EXPECT_FALSE(queue.IsEmpty(i));
-    EXPECT_EQ(queue.size(i), 3lu);
+    EXPECT_FALSE(queue.IsEmpty(/*reader=*/i));
+    EXPECT_EQ(queue.size(/*reader=*/i), 3lu);
   }
 }
 
 TEST(MultiReaderQueueTest, SizeIsCorrectAfterRemoveFromOnlyOneHead) {
-  MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
+  MultiReaderQueue<int> queue = MultiReaderQueue<int>(/*readers_count=*/10);
   for (int i = 0; i < 5; ++i) {
     queue.PushBack(i);
   }
   EXPECT_EQ(queue.size(), 5lu);
   // Removing elements from queue #0
   for (int i = 0; i < 5; ++i) {
-    EXPECT_EQ(queue.size(0), static_cast<size_t>(5 - i));
-    EXPECT_EQ(queue.PopFront(0), absl::optional<int>(i));
+    EXPECT_EQ(queue.size(/*reader=*/0), static_cast<size_t>(5 - i));
+    EXPECT_EQ(queue.PopFront(/*reader=*/0), absl::optional<int>(i));
     for (int j = 1; j < 10; ++j) {
-      EXPECT_EQ(queue.size(j), 5lu);
+      EXPECT_EQ(queue.size(/*reader=*/j), 5lu);
     }
   }
-  EXPECT_EQ(queue.size(0), 0lu);
-  EXPECT_TRUE(queue.IsEmpty(0));
+  EXPECT_EQ(queue.size(/*reader=*/0), 0lu);
+  EXPECT_TRUE(queue.IsEmpty(/*reader=*/0));
 }
 
 TEST(MultiReaderQueueTest, SingleHeadOneAddOneRemove) {
-  MultiReaderQueue<int> queue = MultiReaderQueue<int>(1);
+  MultiReaderQueue<int> queue = MultiReaderQueue<int>(/*readers_count=*/1);
   queue.PushBack(1);
   EXPECT_EQ(queue.size(), 1lu);
-  EXPECT_TRUE(queue.Front(0).has_value());
-  EXPECT_EQ(queue.Front(0).value(), 1);
-  absl::optional<int> value = queue.PopFront(0);
+  EXPECT_TRUE(queue.Front(/*reader=*/0).has_value());
+  EXPECT_EQ(queue.Front(/*reader=*/0).value(), 1);
+  absl::optional<int> value = queue.PopFront(/*reader=*/0);
   EXPECT_TRUE(value.has_value());
   EXPECT_EQ(value.value(), 1);
   EXPECT_EQ(queue.size(), 0lu);
-  EXPECT_TRUE(queue.IsEmpty(0));
+  EXPECT_TRUE(queue.IsEmpty(/*reader=*/0));
 }
 
 TEST(MultiReaderQueueTest, SingleHead) {
-  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
+  MultiReaderQueue<size_t> queue =
+      MultiReaderQueue<size_t>(/*readers_count=*/1);
   for (size_t i = 0; i < 10; ++i) {
     queue.PushBack(i);
     EXPECT_EQ(queue.size(), i + 1);
   }
   for (size_t i = 0; i < 10; ++i) {
-    EXPECT_EQ(queue.Front(0), absl::optional<size_t>(i));
-    EXPECT_EQ(queue.PopFront(0), absl::optional<size_t>(i));
+    EXPECT_EQ(queue.Front(/*reader=*/0), absl::optional<size_t>(i));
+    EXPECT_EQ(queue.PopFront(/*reader=*/0), absl::optional<size_t>(i));
     EXPECT_EQ(queue.size(), 10 - i - 1);
   }
 }
 
 TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
-  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3);
+  MultiReaderQueue<size_t> queue =
+      MultiReaderQueue<size_t>(/*readers_count=*/3);
   for (size_t i = 0; i < 10; ++i) {
     queue.PushBack(i);
     EXPECT_EQ(queue.size(), i + 1);
   }
   for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value = queue.PopFront(0);
+    absl::optional<size_t> value = queue.PopFront(/*reader=*/0);
     EXPECT_EQ(queue.size(), 10lu);
     ASSERT_TRUE(value.has_value());
     EXPECT_EQ(value.value(), i);
   }
   for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value = queue.PopFront(1);
+    absl::optional<size_t> value = queue.PopFront(/*reader=*/1);
     EXPECT_EQ(queue.size(), 10lu);
     ASSERT_TRUE(value.has_value());
     EXPECT_EQ(value.value(), i);
   }
   for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value = queue.PopFront(2);
+    absl::optional<size_t> value = queue.PopFront(/*reader=*/2);
     EXPECT_EQ(queue.size(), 10 - i - 1);
     ASSERT_TRUE(value.has_value());
     EXPECT_EQ(value.value(), i);
@@ -110,15 +112,16 @@
 }
 
 TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) {
-  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3);
+  MultiReaderQueue<size_t> queue =
+      MultiReaderQueue<size_t>(/*readers_count=*/3);
   for (size_t i = 0; i < 10; ++i) {
     queue.PushBack(i);
     EXPECT_EQ(queue.size(), i + 1);
   }
   for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value1 = queue.PopFront(0);
-    absl::optional<size_t> value2 = queue.PopFront(1);
-    absl::optional<size_t> value3 = queue.PopFront(2);
+    absl::optional<size_t> value1 = queue.PopFront(/*reader=*/0);
+    absl::optional<size_t> value2 = queue.PopFront(/*reader=*/1);
+    absl::optional<size_t> value3 = queue.PopFront(/*reader=*/2);
     EXPECT_EQ(queue.size(), 10 - i - 1);
     ASSERT_TRUE(value1.has_value());
     ASSERT_TRUE(value2.has_value());
@@ -129,40 +132,65 @@
   }
 }
 
-TEST(MultiReaderQueueTest, AddReader) {
-  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
+TEST(MultiReaderQueueTest, AddReaderSeeElementsOnlyFromReaderToCopy) {
+  MultiReaderQueue<size_t> queue =
+      MultiReaderQueue<size_t>(/*readers_count=*/2);
   for (size_t i = 0; i < 10; ++i) {
     queue.PushBack(i);
-    EXPECT_EQ(queue.size(), i + 1);
   }
-  queue.AddReader(0);
-  EXPECT_EQ(queue.readers_count(), 2lu);
+  for (size_t i = 0; i < 5; ++i) {
+    queue.PopFront(0);
+  }
+
+  queue.AddReader(/*reader=*/2, /*reader_to_copy=*/0);
+
+  EXPECT_EQ(queue.readers_count(), 3lu);
+  for (size_t i = 5; i < 10; ++i) {
+    absl::optional<size_t> value = queue.PopFront(/*reader=*/2);
+    EXPECT_EQ(queue.size(/*reader=*/2), 10 - i - 1);
+    ASSERT_TRUE(value.has_value());
+    EXPECT_EQ(value.value(), i);
+  }
+}
+
+TEST(MultiReaderQueueTest, AddReaderWithoutReaderToCopySeeFullQueue) {
+  MultiReaderQueue<size_t> queue =
+      MultiReaderQueue<size_t>(/*readers_count=*/2);
   for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value1 = queue.PopFront(0);
-    absl::optional<size_t> value2 = queue.PopFront(1);
-    EXPECT_EQ(queue.size(), 10 - i - 1);
-    ASSERT_TRUE(value1.has_value());
-    ASSERT_TRUE(value2.has_value());
-    EXPECT_EQ(value1.value(), i);
-    EXPECT_EQ(value2.value(), i);
+    queue.PushBack(i);
+  }
+  for (size_t i = 0; i < 5; ++i) {
+    queue.PopFront(/*reader=*/0);
+  }
+
+  queue.AddReader(/*reader=*/2);
+
+  EXPECT_EQ(queue.readers_count(), 3lu);
+  for (size_t i = 0; i < 10; ++i) {
+    absl::optional<size_t> value = queue.PopFront(/*reader=*/2);
+    EXPECT_EQ(queue.size(/*reader=*/2), 10 - i - 1);
+    ASSERT_TRUE(value.has_value());
+    EXPECT_EQ(value.value(), i);
   }
 }
 
 TEST(MultiReaderQueueTest, RemoveReaderWontChangeOthers) {
-  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(2);
+  MultiReaderQueue<size_t> queue =
+      MultiReaderQueue<size_t>(/*readers_count=*/2);
   for (size_t i = 0; i < 10; ++i) {
     queue.PushBack(i);
   }
-  EXPECT_EQ(queue.size(1), 10lu);
+  EXPECT_EQ(queue.size(/*reader=*/1), 10lu);
 
   queue.RemoveReader(0);
 
   EXPECT_EQ(queue.readers_count(), 1lu);
-  EXPECT_EQ(queue.size(1), 10lu);
+  EXPECT_EQ(queue.size(/*reader=*/1), 10lu);
 }
 
 TEST(MultiReaderQueueTest, RemoveLastReaderMakesQueueEmpty) {
-  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
+  MultiReaderQueue<size_t> queue =
+      MultiReaderQueue<size_t>(/*readers_count=*/1);
   for (size_t i = 0; i < 10; ++i) {
     queue.PushBack(i);
   }