[DVQA] Rewrite MultiHeadQueue and add ability to remove head
Rewrite MultiHeadQueue reducing space complexity from
O(readers count * queue size) to O(queue size + readers count).
Bug: b/231397778
Change-Id: Ifbd9c686915368773916ed86467f4de3f8e06af1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265621
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37197}
diff --git a/test/pc/e2e/BUILD.gn b/test/pc/e2e/BUILD.gn
index 43bd800..12db7fb 100644
--- a/test/pc/e2e/BUILD.gn
+++ b/test/pc/e2e/BUILD.gn
@@ -36,7 +36,7 @@
deps = [
":default_video_quality_analyzer_frames_comparator_test",
":default_video_quality_analyzer_test",
- ":multi_head_queue_test",
+ ":multi_reader_queue_test",
":names_collection_test",
":peer_connection_e2e_smoke_test",
":single_process_encoded_image_data_injector_unittest",
@@ -585,11 +585,11 @@
]
}
- rtc_library("multi_head_queue_test") {
+ rtc_library("multi_reader_queue_test") {
testonly = true
- sources = [ "analyzer/video/multi_head_queue_test.cc" ]
+ sources = [ "analyzer/video/multi_reader_queue_test.cc" ]
deps = [
- ":multi_head_queue",
+ ":multi_reader_queue",
"../../../test:test_support",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
@@ -692,7 +692,6 @@
deps = [
":default_video_quality_analyzer_internal",
":default_video_quality_analyzer_shared",
- ":multi_head_queue",
"../..:perf_test",
"../../../api:array_view",
"../../../api:video_quality_analyzer_api",
@@ -748,7 +747,7 @@
deps = [
":default_video_quality_analyzer_shared",
- ":multi_head_queue",
+ ":multi_reader_queue",
"../../../api:array_view",
"../../../api:scoped_refptr",
"../../../api/numerics:numerics",
@@ -895,10 +894,10 @@
]
}
- rtc_library("multi_head_queue") {
+ rtc_library("multi_reader_queue") {
visibility = [ "*" ]
testonly = true
- sources = [ "analyzer/video/multi_head_queue.h" ]
+ sources = [ "analyzer/video/multi_reader_queue.h" ]
deps = [ "../../../rtc_base:checks" ]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
index 6d4818f..188588b 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
@@ -39,7 +39,6 @@
#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_internal_shared_objects.h"
#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_shared_objects.h"
#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h"
-#include "test/pc/e2e/analyzer/video/multi_head_queue.h"
#include "test/pc/e2e/analyzer/video/names_collection.h"
#include "test/testsupport/perf_test.h"
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
index c127dd1..8cf41a3 100644
--- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
+++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
@@ -15,7 +15,7 @@
#include "absl/types/optional.h"
#include "api/units/timestamp.h"
-#include "test/pc/e2e/analyzer/video/multi_head_queue.h"
+#include "test/pc/e2e/analyzer/video/multi_reader_queue.h"
namespace webrtc {
@@ -50,7 +50,7 @@
// When new peer is added - all current alive frames will be sent to it as
// well. So we need to register them as expected by copying owner_ head to
// the new head.
- void AddPeer() { frame_ids_.AddHead(GetAliveFramesQueueIndex()); }
+ void AddPeer() { frame_ids_.AddReader(GetAliveFramesQueueIndex()); }
size_t GetAliveFramesCount() const {
return frame_ids_.size(GetAliveFramesQueueIndex());
@@ -86,7 +86,7 @@
// If we received frame with id frame_id3, then we will pop frame_id1 and
// frame_id2 and consider those frames as dropped and then compare received
// frame with the one from `FrameInFlight` with id frame_id3.
- MultiHeadQueue<uint16_t> frame_ids_;
+ MultiReaderQueue<uint16_t> frame_ids_;
std::map<size_t, Timestamp> last_rendered_frame_time_;
};
diff --git a/test/pc/e2e/analyzer/video/multi_head_queue.h b/test/pc/e2e/analyzer/video/multi_head_queue.h
deleted file mode 100644
index eef862b..0000000
--- a/test/pc/e2e/analyzer/video/multi_head_queue.h
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_
-#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_
-
-#include <deque>
-#include <memory>
-#include <vector>
-
-#include "absl/types/optional.h"
-#include "rtc_base/checks.h"
-
-namespace webrtc {
-
-// A queue that allows more than one reader. Readers are independent, and all
-// readers will see all elements; an inserted element stays in the queue until
-// all readers have extracted it. Elements are copied and copying is assumed to
-// be cheap.
-template <typename T>
-class MultiHeadQueue {
- public:
- // Creates queue with exactly `readers_count` readers.
- explicit MultiHeadQueue(size_t readers_count) {
- for (size_t i = 0; i < readers_count; ++i) {
- queues_.push_back(std::deque<T>());
- }
- }
-
- // Creates a copy of an existing head. Complexity O(MultiHeadQueue::size()).
- // `copy_index` - index of the queue that will be used as a source for
- // copying.
- void AddHead(size_t copy_index) { queues_.push_back(queues_[copy_index]); }
-
- // Add value to the end of the queue. Complexity O(readers_count).
- void PushBack(T value) {
- for (auto& queue : queues_) {
- queue.push_back(value);
- }
- }
-
- // Extract element from specified head. Complexity O(1).
- absl::optional<T> PopFront(size_t index) {
- RTC_CHECK_LT(index, queues_.size());
- if (queues_[index].empty()) {
- return absl::nullopt;
- }
- T out = queues_[index].front();
- queues_[index].pop_front();
- return out;
- }
-
- // Returns element at specified head. Complexity O(1).
- absl::optional<T> Front(size_t index) const {
- RTC_CHECK_LT(index, queues_.size());
- if (queues_[index].empty()) {
- return absl::nullopt;
- }
- return queues_[index].front();
- }
-
- // Returns true if for specified head there are no more elements in the queue
- // or false otherwise. Complexity O(1).
- bool IsEmpty(size_t index) const {
- RTC_CHECK_LT(index, queues_.size());
- return queues_[index].empty();
- }
-
- // Returns size of the longest queue between all readers.
- // Complexity O(readers_count).
- size_t size() const {
- size_t size = 0;
- for (auto& queue : queues_) {
- if (queue.size() > size) {
- size = queue.size();
- }
- }
- return size;
- }
-
- // Returns size of the specified queue. Complexity O(1).
- size_t size(size_t index) const {
- RTC_CHECK_LT(index, queues_.size());
- return queues_[index].size();
- }
-
- size_t readers_count() const { return queues_.size(); }
-
- private:
- std::vector<std::deque<T>> queues_;
-};
-
-} // namespace webrtc
-
-#endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_
diff --git a/test/pc/e2e/analyzer/video/multi_head_queue_test.cc b/test/pc/e2e/analyzer/video/multi_head_queue_test.cc
deleted file mode 100644
index 2aa6fd8..0000000
--- a/test/pc/e2e/analyzer/video/multi_head_queue_test.cc
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "test/pc/e2e/analyzer/video/multi_head_queue.h"
-#include "absl/types/optional.h"
-#include "test/gtest.h"
-
-namespace webrtc {
-namespace {
-
-TEST(MultiHeadQueueTest, GetOnEmpty) {
- MultiHeadQueue<int> queue = MultiHeadQueue<int>(10);
- EXPECT_TRUE(queue.IsEmpty(0));
- for (int i = 0; i < 10; ++i) {
- EXPECT_FALSE(queue.PopFront(i).has_value());
- EXPECT_FALSE(queue.Front(i).has_value());
- }
-}
-
-TEST(MultiHeadQueueTest, SingleHeadOneAddOneRemove) {
- MultiHeadQueue<int> queue = MultiHeadQueue<int>(1);
- queue.PushBack(1);
- EXPECT_EQ(queue.size(), 1lu);
- EXPECT_TRUE(queue.Front(0).has_value());
- EXPECT_EQ(queue.Front(0).value(), 1);
- absl::optional<int> value = queue.PopFront(0);
- EXPECT_TRUE(value.has_value());
- EXPECT_EQ(value.value(), 1);
- EXPECT_EQ(queue.size(), 0lu);
- EXPECT_TRUE(queue.IsEmpty(0));
-}
-
-TEST(MultiHeadQueueTest, SingleHead) {
- MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(1);
- for (size_t i = 0; i < 10; ++i) {
- queue.PushBack(i);
- EXPECT_EQ(queue.size(), i + 1);
- }
- for (size_t i = 0; i < 10; ++i) {
- absl::optional<size_t> value = queue.PopFront(0);
- EXPECT_EQ(queue.size(), 10 - i - 1);
- ASSERT_TRUE(value.has_value());
- EXPECT_EQ(value.value(), i);
- }
-}
-
-TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
- MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(3);
- for (size_t i = 0; i < 10; ++i) {
- queue.PushBack(i);
- EXPECT_EQ(queue.size(), i + 1);
- }
- for (size_t i = 0; i < 10; ++i) {
- absl::optional<size_t> value = queue.PopFront(0);
- EXPECT_EQ(queue.size(), 10lu);
- ASSERT_TRUE(value.has_value());
- EXPECT_EQ(value.value(), i);
- }
- for (size_t i = 0; i < 10; ++i) {
- absl::optional<size_t> value = queue.PopFront(1);
- EXPECT_EQ(queue.size(), 10lu);
- ASSERT_TRUE(value.has_value());
- EXPECT_EQ(value.value(), i);
- }
- for (size_t i = 0; i < 10; ++i) {
- absl::optional<size_t> value = queue.PopFront(2);
- EXPECT_EQ(queue.size(), 10 - i - 1);
- ASSERT_TRUE(value.has_value());
- EXPECT_EQ(value.value(), i);
- }
-}
-
-TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAll) {
- MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(3);
- for (size_t i = 0; i < 10; ++i) {
- queue.PushBack(i);
- EXPECT_EQ(queue.size(), i + 1);
- }
- for (size_t i = 0; i < 10; ++i) {
- absl::optional<size_t> value1 = queue.PopFront(0);
- absl::optional<size_t> value2 = queue.PopFront(1);
- absl::optional<size_t> value3 = queue.PopFront(2);
- EXPECT_EQ(queue.size(), 10 - i - 1);
- ASSERT_TRUE(value1.has_value());
- ASSERT_TRUE(value2.has_value());
- ASSERT_TRUE(value3.has_value());
- EXPECT_EQ(value1.value(), i);
- EXPECT_EQ(value2.value(), i);
- EXPECT_EQ(value3.value(), i);
- }
-}
-
-TEST(MultiHeadQueueTest, HeadCopy) {
- MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(1);
- for (size_t i = 0; i < 10; ++i) {
- queue.PushBack(i);
- EXPECT_EQ(queue.size(), i + 1);
- }
- queue.AddHead(0);
- EXPECT_EQ(queue.readers_count(), 2u);
- for (size_t i = 0; i < 10; ++i) {
- absl::optional<size_t> value1 = queue.PopFront(0);
- absl::optional<size_t> value2 = queue.PopFront(1);
- EXPECT_EQ(queue.size(), 10 - i - 1);
- ASSERT_TRUE(value1.has_value());
- ASSERT_TRUE(value2.has_value());
- EXPECT_EQ(value1.value(), i);
- EXPECT_EQ(value2.value(), i);
- }
-}
-
-} // namespace
-} // namespace webrtc
diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue.h b/test/pc/e2e/analyzer/video/multi_reader_queue.h
new file mode 100644
index 0000000..c8a7db0
--- /dev/null
+++ b/test/pc/e2e/analyzer/video/multi_reader_queue.h
@@ -0,0 +1,163 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
+#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
+
+#include <deque>
+#include <memory>
+#include <set>
+#include <unordered_map>
+
+#include "absl/types/optional.h"
+#include "rtc_base/checks.h"
+
+namespace webrtc {
+
+// Represents the queue which can be read by multiple readers. Each reader reads
+// from its own queue head. When an element is added it will become visible for
+// all readers. When an element will be removed by all the readers, the element
+// will be removed from the queue.
+template <typename T>
+class MultiReaderQueue {
+ public:
+ // Creates queue with exactly `readers_count` readers named from 0 to
+ // `readers_count - 1`.
+ explicit MultiReaderQueue(size_t readers_count) {
+ for (size_t i = 0; i < readers_count; ++i) {
+ heads_[i] = 0;
+ }
+ }
+ // Creates queue with specified readers.
+ explicit MultiReaderQueue(std::set<size_t> readers) {
+ for (size_t reader : readers) {
+ heads_[reader] = 0;
+ }
+ }
+
+ // Adds a new reader, initializing its reading position (the reader's head)
+ // equal to the one of `reader_to_copy`. New reader will have name index
+ // equal to the current readers count.
+ // Complexity O(MultiReaderQueue::size(reader_to_copy)).
+ void AddReader(size_t reader_to_copy) {
+ AddReader(heads_.size(), reader_to_copy);
+ }
+
+ // Adds a new `reader`, initializing its reading position (the reader's head)
+ // equal to the one of `reader_to_copy`.
+ // Complexity O(MultiReaderQueue::size(reader_to_copy)).
+ void AddReader(size_t reader, size_t reader_to_copy) {
+ size_t pos = GetHeadPositionOrDie(reader_to_copy);
+
+ auto it = heads_.find(reader);
+ RTC_CHECK(it == heads_.end())
+ << "Reader " << reader << " is already in the queue";
+ heads_[reader] = heads_[reader_to_copy];
+ for (size_t i = pos; i < queue_.size(); ++i) {
+ in_queues_[i]++;
+ }
+ }
+
+ // Removes specified `reader` from the queue.
+ // Complexity O(MultiReaderQueue::size(reader)).
+ void RemoveReader(size_t reader) {
+ size_t pos = GetHeadPositionOrDie(reader);
+ for (size_t i = pos; i < queue_.size(); ++i) {
+ in_queues_[i]--;
+ }
+ while (!in_queues_.empty() && in_queues_[0] == 0) {
+ PopFront();
+ }
+ heads_.erase(reader);
+ }
+
+ // Add value to the end of the queue. Complexity O(1).
+ void PushBack(T value) {
+ queue_.push_back(value);
+ in_queues_.push_back(heads_.size());
+ }
+
+ // Extract element from specified head. Complexity O(1).
+ absl::optional<T> PopFront(size_t reader) {
+ size_t pos = GetHeadPositionOrDie(reader);
+ if (pos >= queue_.size()) {
+ return absl::nullopt;
+ }
+
+ T out = queue_[pos];
+
+ in_queues_[pos]--;
+ heads_[reader]++;
+
+ if (in_queues_[pos] == 0) {
+ RTC_DCHECK_EQ(pos, 0);
+ PopFront();
+ }
+ return out;
+ }
+
+ // Returns element at specified head. Complexity O(1).
+ absl::optional<T> Front(size_t reader) const {
+ size_t pos = GetHeadPositionOrDie(reader);
+ if (pos >= queue_.size()) {
+ return absl::nullopt;
+ }
+ return queue_[pos];
+ }
+
+ // Returns true if for specified head there are no more elements in the queue
+ // or false otherwise. Complexity O(1).
+ bool IsEmpty(size_t reader) const {
+ size_t pos = GetHeadPositionOrDie(reader);
+ return pos >= queue_.size();
+ }
+
+ // Returns size of the longest queue between all readers.
+ // Complexity O(1).
+ size_t size() const { return queue_.size(); }
+
+ // Returns size of the specified queue. Complexity O(1).
+ size_t size(size_t reader) const {
+ size_t pos = GetHeadPositionOrDie(reader);
+ return queue_.size() - pos;
+ }
+
+ // Complexity O(1).
+ size_t readers_count() const { return heads_.size(); }
+
+ private:
+ size_t GetHeadPositionOrDie(size_t reader) const {
+ auto it = heads_.find(reader);
+ RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader;
+ return it->second - removed_elements_count_;
+ }
+
+ void PopFront() {
+ RTC_DCHECK(!queue_.empty());
+ RTC_DCHECK_EQ(in_queues_[0], 0);
+ queue_.pop_front();
+ in_queues_.pop_front();
+ removed_elements_count_++;
+ }
+
+ // Number of the elements that were removed from the queue. It is used to
+ // subtract from each head to compute the right index inside `queue_`;
+ size_t removed_elements_count_ = 0;
+ std::deque<T> queue_;
+ // In how may queues the element at index `i` is. An element can be removed
+ // from the front if and only if it is in 0 queues.
+ std::deque<size_t> in_queues_;
+ // Map from the reader to the head position in the queue.
+ std::unordered_map<size_t, size_t> heads_;
+};
+
+} // namespace webrtc
+
+#endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc
new file mode 100644
index 0000000..e41c4a8
--- /dev/null
+++ b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc
@@ -0,0 +1,178 @@
+/*
+ * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/pc/e2e/analyzer/video/multi_reader_queue.h"
+
+#include "absl/types/optional.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+namespace {
+
+TEST(MultiReaderQueueTest, EmptyQueueEmptyForAllHeads) {
+ MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
+ EXPECT_EQ(queue.size(), 0lu);
+ for (int i = 0; i < 10; ++i) {
+ EXPECT_TRUE(queue.IsEmpty(i));
+ EXPECT_EQ(queue.size(i), 0lu);
+ EXPECT_FALSE(queue.PopFront(i).has_value());
+ EXPECT_FALSE(queue.Front(i).has_value());
+ }
+}
+
+TEST(MultiReaderQueueTest, SizeIsEqualForAllHeadsAfterAddOnly) {
+ MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
+ queue.PushBack(1);
+ queue.PushBack(2);
+ queue.PushBack(3);
+ EXPECT_EQ(queue.size(), 3lu);
+ for (int i = 0; i < 10; ++i) {
+ EXPECT_FALSE(queue.IsEmpty(i));
+ EXPECT_EQ(queue.size(i), 3lu);
+ }
+}
+
+TEST(MultiReaderQueueTest, SizeIsCorrectAfterRemoveFromOnlyOneHead) {
+ MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
+ for (int i = 0; i < 5; ++i) {
+ queue.PushBack(i);
+ }
+ EXPECT_EQ(queue.size(), 5lu);
+ // Removing elements from queue #0
+ for (int i = 0; i < 5; ++i) {
+ EXPECT_EQ(queue.size(0), static_cast<size_t>(5 - i));
+ EXPECT_EQ(queue.PopFront(0), absl::optional<int>(i));
+ for (int j = 1; j < 10; ++j) {
+ EXPECT_EQ(queue.size(j), 5lu);
+ }
+ }
+ EXPECT_EQ(queue.size(0), 0lu);
+ EXPECT_TRUE(queue.IsEmpty(0));
+}
+
+TEST(MultiReaderQueueTest, SingleHeadOneAddOneRemove) {
+ MultiReaderQueue<int> queue = MultiReaderQueue<int>(1);
+ queue.PushBack(1);
+ EXPECT_EQ(queue.size(), 1lu);
+ EXPECT_TRUE(queue.Front(0).has_value());
+ EXPECT_EQ(queue.Front(0).value(), 1);
+ absl::optional<int> value = queue.PopFront(0);
+ EXPECT_TRUE(value.has_value());
+ EXPECT_EQ(value.value(), 1);
+ EXPECT_EQ(queue.size(), 0lu);
+ EXPECT_TRUE(queue.IsEmpty(0));
+}
+
+TEST(MultiReaderQueueTest, SingleHead) {
+ MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
+ for (size_t i = 0; i < 10; ++i) {
+ queue.PushBack(i);
+ EXPECT_EQ(queue.size(), i + 1);
+ }
+ for (size_t i = 0; i < 10; ++i) {
+ EXPECT_EQ(queue.Front(0), absl::optional<size_t>(i));
+ EXPECT_EQ(queue.PopFront(0), absl::optional<size_t>(i));
+ EXPECT_EQ(queue.size(), 10 - i - 1);
+ }
+}
+
+TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
+ MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3);
+ for (size_t i = 0; i < 10; ++i) {
+ queue.PushBack(i);
+ EXPECT_EQ(queue.size(), i + 1);
+ }
+ for (size_t i = 0; i < 10; ++i) {
+ absl::optional<size_t> value = queue.PopFront(0);
+ EXPECT_EQ(queue.size(), 10lu);
+ ASSERT_TRUE(value.has_value());
+ EXPECT_EQ(value.value(), i);
+ }
+ for (size_t i = 0; i < 10; ++i) {
+ absl::optional<size_t> value = queue.PopFront(1);
+ EXPECT_EQ(queue.size(), 10lu);
+ ASSERT_TRUE(value.has_value());
+ EXPECT_EQ(value.value(), i);
+ }
+ for (size_t i = 0; i < 10; ++i) {
+ absl::optional<size_t> value = queue.PopFront(2);
+ EXPECT_EQ(queue.size(), 10 - i - 1);
+ ASSERT_TRUE(value.has_value());
+ EXPECT_EQ(value.value(), i);
+ }
+}
+
+TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) {
+ MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3);
+ for (size_t i = 0; i < 10; ++i) {
+ queue.PushBack(i);
+ EXPECT_EQ(queue.size(), i + 1);
+ }
+ for (size_t i = 0; i < 10; ++i) {
+ absl::optional<size_t> value1 = queue.PopFront(0);
+ absl::optional<size_t> value2 = queue.PopFront(1);
+ absl::optional<size_t> value3 = queue.PopFront(2);
+ EXPECT_EQ(queue.size(), 10 - i - 1);
+ ASSERT_TRUE(value1.has_value());
+ ASSERT_TRUE(value2.has_value());
+ ASSERT_TRUE(value3.has_value());
+ EXPECT_EQ(value1.value(), i);
+ EXPECT_EQ(value2.value(), i);
+ EXPECT_EQ(value3.value(), i);
+ }
+}
+
+TEST(MultiReaderQueueTest, AddReader) {
+ MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
+ for (size_t i = 0; i < 10; ++i) {
+ queue.PushBack(i);
+ EXPECT_EQ(queue.size(), i + 1);
+ }
+ queue.AddReader(0);
+ EXPECT_EQ(queue.readers_count(), 2lu);
+ for (size_t i = 0; i < 10; ++i) {
+ absl::optional<size_t> value1 = queue.PopFront(0);
+ absl::optional<size_t> value2 = queue.PopFront(1);
+ EXPECT_EQ(queue.size(), 10 - i - 1);
+ ASSERT_TRUE(value1.has_value());
+ ASSERT_TRUE(value2.has_value());
+ EXPECT_EQ(value1.value(), i);
+ EXPECT_EQ(value2.value(), i);
+ }
+}
+
+TEST(MultiReaderQueueTest, RemoveReaderWontChangeOthers) {
+ MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(2);
+ for (size_t i = 0; i < 10; ++i) {
+ queue.PushBack(i);
+ }
+ EXPECT_EQ(queue.size(1), 10lu);
+
+ queue.RemoveReader(0);
+
+ EXPECT_EQ(queue.readers_count(), 1lu);
+ EXPECT_EQ(queue.size(1), 10lu);
+}
+
+TEST(MultiReaderQueueTest, RemoveLastReaderMakesQueueEmpty) {
+ MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
+ for (size_t i = 0; i < 10; ++i) {
+ queue.PushBack(i);
+ }
+ EXPECT_EQ(queue.size(), 10lu);
+
+ queue.RemoveReader(0);
+
+ EXPECT_EQ(queue.size(), 0lu);
+ EXPECT_EQ(queue.readers_count(), 0lu);
+}
+
+} // namespace
+} // namespace webrtc