[DVQA] Rewrite MultiHeadQueue and add ability to remove head

Rewrite MultiHeadQueue reducing space complexity from
O(readers count * queue size) to O(queue size + readers count).

Bug: b/231397778
Change-Id: Ifbd9c686915368773916ed86467f4de3f8e06af1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265621
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37197}
diff --git a/test/pc/e2e/BUILD.gn b/test/pc/e2e/BUILD.gn
index 43bd800..12db7fb 100644
--- a/test/pc/e2e/BUILD.gn
+++ b/test/pc/e2e/BUILD.gn
@@ -36,7 +36,7 @@
       deps = [
         ":default_video_quality_analyzer_frames_comparator_test",
         ":default_video_quality_analyzer_test",
-        ":multi_head_queue_test",
+        ":multi_reader_queue_test",
         ":names_collection_test",
         ":peer_connection_e2e_smoke_test",
         ":single_process_encoded_image_data_injector_unittest",
@@ -585,11 +585,11 @@
       ]
     }
 
-    rtc_library("multi_head_queue_test") {
+    rtc_library("multi_reader_queue_test") {
       testonly = true
-      sources = [ "analyzer/video/multi_head_queue_test.cc" ]
+      sources = [ "analyzer/video/multi_reader_queue_test.cc" ]
       deps = [
-        ":multi_head_queue",
+        ":multi_reader_queue",
         "../../../test:test_support",
       ]
       absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
@@ -692,7 +692,6 @@
     deps = [
       ":default_video_quality_analyzer_internal",
       ":default_video_quality_analyzer_shared",
-      ":multi_head_queue",
       "../..:perf_test",
       "../../../api:array_view",
       "../../../api:video_quality_analyzer_api",
@@ -748,7 +747,7 @@
 
     deps = [
       ":default_video_quality_analyzer_shared",
-      ":multi_head_queue",
+      ":multi_reader_queue",
       "../../../api:array_view",
       "../../../api:scoped_refptr",
       "../../../api/numerics:numerics",
@@ -895,10 +894,10 @@
     ]
   }
 
-  rtc_library("multi_head_queue") {
+  rtc_library("multi_reader_queue") {
     visibility = [ "*" ]
     testonly = true
-    sources = [ "analyzer/video/multi_head_queue.h" ]
+    sources = [ "analyzer/video/multi_reader_queue.h" ]
     deps = [ "../../../rtc_base:checks" ]
     absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
   }
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
index 6d4818f..188588b 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
@@ -39,7 +39,6 @@
 #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_internal_shared_objects.h"
 #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_shared_objects.h"
 #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h"
-#include "test/pc/e2e/analyzer/video/multi_head_queue.h"
 #include "test/pc/e2e/analyzer/video/names_collection.h"
 #include "test/testsupport/perf_test.h"
 
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
index c127dd1..8cf41a3 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
@@ -15,7 +15,7 @@
 
 #include "absl/types/optional.h"
 #include "api/units/timestamp.h"
-#include "test/pc/e2e/analyzer/video/multi_head_queue.h"
+#include "test/pc/e2e/analyzer/video/multi_reader_queue.h"
 
 namespace webrtc {
 
@@ -50,7 +50,7 @@
   // When new peer is added - all current alive frames will be sent to it as
   // well. So we need to register them as expected by copying owner_ head to
   // the new head.
-  void AddPeer() { frame_ids_.AddHead(GetAliveFramesQueueIndex()); }
+  void AddPeer() { frame_ids_.AddReader(GetAliveFramesQueueIndex()); }
 
   size_t GetAliveFramesCount() const {
     return frame_ids_.size(GetAliveFramesQueueIndex());
@@ -86,7 +86,7 @@
   // If we received frame with id frame_id3, then we will pop frame_id1 and
   // frame_id2 and consider those frames as dropped and then compare received
   // frame with the one from `FrameInFlight` with id frame_id3.
-  MultiHeadQueue<uint16_t> frame_ids_;
+  MultiReaderQueue<uint16_t> frame_ids_;
   std::map<size_t, Timestamp> last_rendered_frame_time_;
 };
 
diff --git a/test/pc/e2e/analyzer/video/multi_head_queue.h b/test/pc/e2e/analyzer/video/multi_head_queue.h
deleted file mode 100644
index eef862b..0000000
--- a/test/pc/e2e/analyzer/video/multi_head_queue.h
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
- *
- *  Use of this source code is governed by a BSD-style license
- *  that can be found in the LICENSE file in the root of the source
- *  tree. An additional intellectual property rights grant can be found
- *  in the file PATENTS.  All contributing project authors may
- *  be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_
-#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_
-
-#include <deque>
-#include <memory>
-#include <vector>
-
-#include "absl/types/optional.h"
-#include "rtc_base/checks.h"
-
-namespace webrtc {
-
-// A queue that allows more than one reader. Readers are independent, and all
-// readers will see all elements; an inserted element stays in the queue until
-// all readers have extracted it. Elements are copied and copying is assumed to
-// be cheap.
-template <typename T>
-class MultiHeadQueue {
- public:
-  // Creates queue with exactly `readers_count` readers.
-  explicit MultiHeadQueue(size_t readers_count) {
-    for (size_t i = 0; i < readers_count; ++i) {
-      queues_.push_back(std::deque<T>());
-    }
-  }
-
-  // Creates a copy of an existing head. Complexity O(MultiHeadQueue::size()).
-  // `copy_index` - index of the queue that will be used as a source for
-  //     copying.
-  void AddHead(size_t copy_index) { queues_.push_back(queues_[copy_index]); }
-
-  // Add value to the end of the queue. Complexity O(readers_count).
-  void PushBack(T value) {
-    for (auto& queue : queues_) {
-      queue.push_back(value);
-    }
-  }
-
-  // Extract element from specified head. Complexity O(1).
-  absl::optional<T> PopFront(size_t index) {
-    RTC_CHECK_LT(index, queues_.size());
-    if (queues_[index].empty()) {
-      return absl::nullopt;
-    }
-    T out = queues_[index].front();
-    queues_[index].pop_front();
-    return out;
-  }
-
-  // Returns element at specified head. Complexity O(1).
-  absl::optional<T> Front(size_t index) const {
-    RTC_CHECK_LT(index, queues_.size());
-    if (queues_[index].empty()) {
-      return absl::nullopt;
-    }
-    return queues_[index].front();
-  }
-
-  // Returns true if for specified head there are no more elements in the queue
-  // or false otherwise. Complexity O(1).
-  bool IsEmpty(size_t index) const {
-    RTC_CHECK_LT(index, queues_.size());
-    return queues_[index].empty();
-  }
-
-  // Returns size of the longest queue between all readers.
-  // Complexity O(readers_count).
-  size_t size() const {
-    size_t size = 0;
-    for (auto& queue : queues_) {
-      if (queue.size() > size) {
-        size = queue.size();
-      }
-    }
-    return size;
-  }
-
-  // Returns size of the specified queue. Complexity O(1).
-  size_t size(size_t index) const {
-    RTC_CHECK_LT(index, queues_.size());
-    return queues_[index].size();
-  }
-
-  size_t readers_count() const { return queues_.size(); }
-
- private:
-  std::vector<std::deque<T>> queues_;
-};
-
-}  // namespace webrtc
-
-#endif  // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_
diff --git a/test/pc/e2e/analyzer/video/multi_head_queue_test.cc b/test/pc/e2e/analyzer/video/multi_head_queue_test.cc
deleted file mode 100644
index 2aa6fd8..0000000
--- a/test/pc/e2e/analyzer/video/multi_head_queue_test.cc
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *  Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
- *
- *  Use of this source code is governed by a BSD-style license
- *  that can be found in the LICENSE file in the root of the source
- *  tree. An additional intellectual property rights grant can be found
- *  in the file PATENTS.  All contributing project authors may
- *  be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "test/pc/e2e/analyzer/video/multi_head_queue.h"
-#include "absl/types/optional.h"
-#include "test/gtest.h"
-
-namespace webrtc {
-namespace {
-
-TEST(MultiHeadQueueTest, GetOnEmpty) {
-  MultiHeadQueue<int> queue = MultiHeadQueue<int>(10);
-  EXPECT_TRUE(queue.IsEmpty(0));
-  for (int i = 0; i < 10; ++i) {
-    EXPECT_FALSE(queue.PopFront(i).has_value());
-    EXPECT_FALSE(queue.Front(i).has_value());
-  }
-}
-
-TEST(MultiHeadQueueTest, SingleHeadOneAddOneRemove) {
-  MultiHeadQueue<int> queue = MultiHeadQueue<int>(1);
-  queue.PushBack(1);
-  EXPECT_EQ(queue.size(), 1lu);
-  EXPECT_TRUE(queue.Front(0).has_value());
-  EXPECT_EQ(queue.Front(0).value(), 1);
-  absl::optional<int> value = queue.PopFront(0);
-  EXPECT_TRUE(value.has_value());
-  EXPECT_EQ(value.value(), 1);
-  EXPECT_EQ(queue.size(), 0lu);
-  EXPECT_TRUE(queue.IsEmpty(0));
-}
-
-TEST(MultiHeadQueueTest, SingleHead) {
-  MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(1);
-  for (size_t i = 0; i < 10; ++i) {
-    queue.PushBack(i);
-    EXPECT_EQ(queue.size(), i + 1);
-  }
-  for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value = queue.PopFront(0);
-    EXPECT_EQ(queue.size(), 10 - i - 1);
-    ASSERT_TRUE(value.has_value());
-    EXPECT_EQ(value.value(), i);
-  }
-}
-
-TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
-  MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(3);
-  for (size_t i = 0; i < 10; ++i) {
-    queue.PushBack(i);
-    EXPECT_EQ(queue.size(), i + 1);
-  }
-  for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value = queue.PopFront(0);
-    EXPECT_EQ(queue.size(), 10lu);
-    ASSERT_TRUE(value.has_value());
-    EXPECT_EQ(value.value(), i);
-  }
-  for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value = queue.PopFront(1);
-    EXPECT_EQ(queue.size(), 10lu);
-    ASSERT_TRUE(value.has_value());
-    EXPECT_EQ(value.value(), i);
-  }
-  for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value = queue.PopFront(2);
-    EXPECT_EQ(queue.size(), 10 - i - 1);
-    ASSERT_TRUE(value.has_value());
-    EXPECT_EQ(value.value(), i);
-  }
-}
-
-TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAll) {
-  MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(3);
-  for (size_t i = 0; i < 10; ++i) {
-    queue.PushBack(i);
-    EXPECT_EQ(queue.size(), i + 1);
-  }
-  for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value1 = queue.PopFront(0);
-    absl::optional<size_t> value2 = queue.PopFront(1);
-    absl::optional<size_t> value3 = queue.PopFront(2);
-    EXPECT_EQ(queue.size(), 10 - i - 1);
-    ASSERT_TRUE(value1.has_value());
-    ASSERT_TRUE(value2.has_value());
-    ASSERT_TRUE(value3.has_value());
-    EXPECT_EQ(value1.value(), i);
-    EXPECT_EQ(value2.value(), i);
-    EXPECT_EQ(value3.value(), i);
-  }
-}
-
-TEST(MultiHeadQueueTest, HeadCopy) {
-  MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(1);
-  for (size_t i = 0; i < 10; ++i) {
-    queue.PushBack(i);
-    EXPECT_EQ(queue.size(), i + 1);
-  }
-  queue.AddHead(0);
-  EXPECT_EQ(queue.readers_count(), 2u);
-  for (size_t i = 0; i < 10; ++i) {
-    absl::optional<size_t> value1 = queue.PopFront(0);
-    absl::optional<size_t> value2 = queue.PopFront(1);
-    EXPECT_EQ(queue.size(), 10 - i - 1);
-    ASSERT_TRUE(value1.has_value());
-    ASSERT_TRUE(value2.has_value());
-    EXPECT_EQ(value1.value(), i);
-    EXPECT_EQ(value2.value(), i);
-  }
-}
-
-}  // namespace
-}  // namespace webrtc
diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue.h b/test/pc/e2e/analyzer/video/multi_reader_queue.h
new file mode 100644
index 0000000..c8a7db0
--- /dev/null
+++ b/test/pc/e2e/analyzer/video/multi_reader_queue.h
@@ -0,0 +1,163 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
+#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
+
+#include <deque>
+#include <memory>
+#include <set>
+#include <unordered_map>
+
+#include "absl/types/optional.h"
+#include "rtc_base/checks.h"
+
+namespace webrtc {
+
+// Represents the queue which can be read by multiple readers. Each reader reads
+// from its own queue head. When an element is added it will become visible for
+// all readers. When an element will be removed by all the readers, the element
+// will be removed from the queue.
+template <typename T>
+class MultiReaderQueue {
+ public:
+  // Creates queue with exactly `readers_count` readers named from 0 to
+  // `readers_count - 1`.
+  explicit MultiReaderQueue(size_t readers_count) {
+    for (size_t i = 0; i < readers_count; ++i) {
+      heads_[i] = 0;
+    }
+  }
+  // Creates queue with specified readers.
+  explicit MultiReaderQueue(std::set<size_t> readers) {
+    for (size_t reader : readers) {
+      heads_[reader] = 0;
+    }
+  }
+
+  // Adds a new reader, initializing its reading position (the reader's head)
+  // equal to the one of `reader_to_copy`. New reader will have name index
+  // equal to the current readers count.
+  // Complexity O(MultiReaderQueue::size(reader_to_copy)).
+  void AddReader(size_t reader_to_copy) {
+    AddReader(heads_.size(), reader_to_copy);
+  }
+
+  // Adds a new `reader`, initializing its reading position (the reader's head)
+  // equal to the one of `reader_to_copy`.
+  // Complexity O(MultiReaderQueue::size(reader_to_copy)).
+  void AddReader(size_t reader, size_t reader_to_copy) {
+    size_t pos = GetHeadPositionOrDie(reader_to_copy);
+
+    auto it = heads_.find(reader);
+    RTC_CHECK(it == heads_.end())
+        << "Reader " << reader << " is already in the queue";
+    heads_[reader] = heads_[reader_to_copy];
+    for (size_t i = pos; i < queue_.size(); ++i) {
+      in_queues_[i]++;
+    }
+  }
+
+  // Removes specified `reader` from the queue.
+  // Complexity O(MultiReaderQueue::size(reader)).
+  void RemoveReader(size_t reader) {
+    size_t pos = GetHeadPositionOrDie(reader);
+    for (size_t i = pos; i < queue_.size(); ++i) {
+      in_queues_[i]--;
+    }
+    while (!in_queues_.empty() && in_queues_[0] == 0) {
+      PopFront();
+    }
+    heads_.erase(reader);
+  }
+
+  // Add value to the end of the queue. Complexity O(1).
+  void PushBack(T value) {
+    queue_.push_back(value);
+    in_queues_.push_back(heads_.size());
+  }
+
+  // Extract element from specified head. Complexity O(1).
+  absl::optional<T> PopFront(size_t reader) {
+    size_t pos = GetHeadPositionOrDie(reader);
+    if (pos >= queue_.size()) {
+      return absl::nullopt;
+    }
+
+    T out = queue_[pos];
+
+    in_queues_[pos]--;
+    heads_[reader]++;
+
+    if (in_queues_[pos] == 0) {
+      RTC_DCHECK_EQ(pos, 0);
+      PopFront();
+    }
+    return out;
+  }
+
+  // Returns element at specified head. Complexity O(1).
+  absl::optional<T> Front(size_t reader) const {
+    size_t pos = GetHeadPositionOrDie(reader);
+    if (pos >= queue_.size()) {
+      return absl::nullopt;
+    }
+    return queue_[pos];
+  }
+
+  // Returns true if for specified head there are no more elements in the queue
+  // or false otherwise. Complexity O(1).
+  bool IsEmpty(size_t reader) const {
+    size_t pos = GetHeadPositionOrDie(reader);
+    return pos >= queue_.size();
+  }
+
+  // Returns size of the longest queue between all readers.
+  // Complexity O(1).
+  size_t size() const { return queue_.size(); }
+
+  // Returns size of the specified queue. Complexity O(1).
+  size_t size(size_t reader) const {
+    size_t pos = GetHeadPositionOrDie(reader);
+    return queue_.size() - pos;
+  }
+
+  // Complexity O(1).
+  size_t readers_count() const { return heads_.size(); }
+
+ private:
+  size_t GetHeadPositionOrDie(size_t reader) const {
+    auto it = heads_.find(reader);
+    RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader;
+    return it->second - removed_elements_count_;
+  }
+
+  void PopFront() {
+    RTC_DCHECK(!queue_.empty());
+    RTC_DCHECK_EQ(in_queues_[0], 0);
+    queue_.pop_front();
+    in_queues_.pop_front();
+    removed_elements_count_++;
+  }
+
+  // Number of the elements that were removed from the queue. It is used to
+  // subtract from each head to compute the right index inside `queue_`;
+  size_t removed_elements_count_ = 0;
+  std::deque<T> queue_;
+  // In how may queues the element at index `i` is. An element can be removed
+  // from the front if and only if it is in 0 queues.
+  std::deque<size_t> in_queues_;
+  // Map from the reader to the head position in the queue.
+  std::unordered_map<size_t, size_t> heads_;
+};
+
+}  // namespace webrtc
+
+#endif  // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc
new file mode 100644
index 0000000..e41c4a8
--- /dev/null
+++ b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc
@@ -0,0 +1,178 @@
+/*
+ *  Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/pc/e2e/analyzer/video/multi_reader_queue.h"
+
+#include "absl/types/optional.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+namespace {
+
+TEST(MultiReaderQueueTest, EmptyQueueEmptyForAllHeads) {
+  MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
+  EXPECT_EQ(queue.size(), 0lu);
+  for (int i = 0; i < 10; ++i) {
+    EXPECT_TRUE(queue.IsEmpty(i));
+    EXPECT_EQ(queue.size(i), 0lu);
+    EXPECT_FALSE(queue.PopFront(i).has_value());
+    EXPECT_FALSE(queue.Front(i).has_value());
+  }
+}
+
+TEST(MultiReaderQueueTest, SizeIsEqualForAllHeadsAfterAddOnly) {
+  MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
+  queue.PushBack(1);
+  queue.PushBack(2);
+  queue.PushBack(3);
+  EXPECT_EQ(queue.size(), 3lu);
+  for (int i = 0; i < 10; ++i) {
+    EXPECT_FALSE(queue.IsEmpty(i));
+    EXPECT_EQ(queue.size(i), 3lu);
+  }
+}
+
+TEST(MultiReaderQueueTest, SizeIsCorrectAfterRemoveFromOnlyOneHead) {
+  MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
+  for (int i = 0; i < 5; ++i) {
+    queue.PushBack(i);
+  }
+  EXPECT_EQ(queue.size(), 5lu);
+  // Removing elements from queue #0
+  for (int i = 0; i < 5; ++i) {
+    EXPECT_EQ(queue.size(0), static_cast<size_t>(5 - i));
+    EXPECT_EQ(queue.PopFront(0), absl::optional<int>(i));
+    for (int j = 1; j < 10; ++j) {
+      EXPECT_EQ(queue.size(j), 5lu);
+    }
+  }
+  EXPECT_EQ(queue.size(0), 0lu);
+  EXPECT_TRUE(queue.IsEmpty(0));
+}
+
+TEST(MultiReaderQueueTest, SingleHeadOneAddOneRemove) {
+  MultiReaderQueue<int> queue = MultiReaderQueue<int>(1);
+  queue.PushBack(1);
+  EXPECT_EQ(queue.size(), 1lu);
+  EXPECT_TRUE(queue.Front(0).has_value());
+  EXPECT_EQ(queue.Front(0).value(), 1);
+  absl::optional<int> value = queue.PopFront(0);
+  EXPECT_TRUE(value.has_value());
+  EXPECT_EQ(value.value(), 1);
+  EXPECT_EQ(queue.size(), 0lu);
+  EXPECT_TRUE(queue.IsEmpty(0));
+}
+
+TEST(MultiReaderQueueTest, SingleHead) {
+  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
+  for (size_t i = 0; i < 10; ++i) {
+    queue.PushBack(i);
+    EXPECT_EQ(queue.size(), i + 1);
+  }
+  for (size_t i = 0; i < 10; ++i) {
+    EXPECT_EQ(queue.Front(0), absl::optional<size_t>(i));
+    EXPECT_EQ(queue.PopFront(0), absl::optional<size_t>(i));
+    EXPECT_EQ(queue.size(), 10 - i - 1);
+  }
+}
+
+TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
+  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3);
+  for (size_t i = 0; i < 10; ++i) {
+    queue.PushBack(i);
+    EXPECT_EQ(queue.size(), i + 1);
+  }
+  for (size_t i = 0; i < 10; ++i) {
+    absl::optional<size_t> value = queue.PopFront(0);
+    EXPECT_EQ(queue.size(), 10lu);
+    ASSERT_TRUE(value.has_value());
+    EXPECT_EQ(value.value(), i);
+  }
+  for (size_t i = 0; i < 10; ++i) {
+    absl::optional<size_t> value = queue.PopFront(1);
+    EXPECT_EQ(queue.size(), 10lu);
+    ASSERT_TRUE(value.has_value());
+    EXPECT_EQ(value.value(), i);
+  }
+  for (size_t i = 0; i < 10; ++i) {
+    absl::optional<size_t> value = queue.PopFront(2);
+    EXPECT_EQ(queue.size(), 10 - i - 1);
+    ASSERT_TRUE(value.has_value());
+    EXPECT_EQ(value.value(), i);
+  }
+}
+
+TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) {
+  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3);
+  for (size_t i = 0; i < 10; ++i) {
+    queue.PushBack(i);
+    EXPECT_EQ(queue.size(), i + 1);
+  }
+  for (size_t i = 0; i < 10; ++i) {
+    absl::optional<size_t> value1 = queue.PopFront(0);
+    absl::optional<size_t> value2 = queue.PopFront(1);
+    absl::optional<size_t> value3 = queue.PopFront(2);
+    EXPECT_EQ(queue.size(), 10 - i - 1);
+    ASSERT_TRUE(value1.has_value());
+    ASSERT_TRUE(value2.has_value());
+    ASSERT_TRUE(value3.has_value());
+    EXPECT_EQ(value1.value(), i);
+    EXPECT_EQ(value2.value(), i);
+    EXPECT_EQ(value3.value(), i);
+  }
+}
+
+TEST(MultiReaderQueueTest, AddReader) {
+  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
+  for (size_t i = 0; i < 10; ++i) {
+    queue.PushBack(i);
+    EXPECT_EQ(queue.size(), i + 1);
+  }
+  queue.AddReader(0);
+  EXPECT_EQ(queue.readers_count(), 2lu);
+  for (size_t i = 0; i < 10; ++i) {
+    absl::optional<size_t> value1 = queue.PopFront(0);
+    absl::optional<size_t> value2 = queue.PopFront(1);
+    EXPECT_EQ(queue.size(), 10 - i - 1);
+    ASSERT_TRUE(value1.has_value());
+    ASSERT_TRUE(value2.has_value());
+    EXPECT_EQ(value1.value(), i);
+    EXPECT_EQ(value2.value(), i);
+  }
+}
+
+TEST(MultiReaderQueueTest, RemoveReaderWontChangeOthers) {
+  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(2);
+  for (size_t i = 0; i < 10; ++i) {
+    queue.PushBack(i);
+  }
+  EXPECT_EQ(queue.size(1), 10lu);
+
+  queue.RemoveReader(0);
+
+  EXPECT_EQ(queue.readers_count(), 1lu);
+  EXPECT_EQ(queue.size(1), 10lu);
+}
+
+TEST(MultiReaderQueueTest, RemoveLastReaderMakesQueueEmpty) {
+  MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
+  for (size_t i = 0; i < 10; ++i) {
+    queue.PushBack(i);
+  }
+  EXPECT_EQ(queue.size(), 10lu);
+
+  queue.RemoveReader(0);
+
+  EXPECT_EQ(queue.size(), 0lu);
+  EXPECT_EQ(queue.readers_count(), 0lu);
+}
+
+}  // namespace
+}  // namespace webrtc