[DVQA] Rewrite MultiHeadQueue and add ability to remove head Rewrite MultiHeadQueue reducing space complexity from O(readers count * queue size) to O(queue size + readers count). Bug: b/231397778 Change-Id: Ifbd9c686915368773916ed86467f4de3f8e06af1 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265621 Commit-Queue: Artem Titov <titovartem@webrtc.org> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37197}
diff --git a/test/pc/e2e/BUILD.gn b/test/pc/e2e/BUILD.gn index 43bd800..12db7fb 100644 --- a/test/pc/e2e/BUILD.gn +++ b/test/pc/e2e/BUILD.gn
@@ -36,7 +36,7 @@ deps = [ ":default_video_quality_analyzer_frames_comparator_test", ":default_video_quality_analyzer_test", - ":multi_head_queue_test", + ":multi_reader_queue_test", ":names_collection_test", ":peer_connection_e2e_smoke_test", ":single_process_encoded_image_data_injector_unittest", @@ -585,11 +585,11 @@ ] } - rtc_library("multi_head_queue_test") { + rtc_library("multi_reader_queue_test") { testonly = true - sources = [ "analyzer/video/multi_head_queue_test.cc" ] + sources = [ "analyzer/video/multi_reader_queue_test.cc" ] deps = [ - ":multi_head_queue", + ":multi_reader_queue", "../../../test:test_support", ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] @@ -692,7 +692,6 @@ deps = [ ":default_video_quality_analyzer_internal", ":default_video_quality_analyzer_shared", - ":multi_head_queue", "../..:perf_test", "../../../api:array_view", "../../../api:video_quality_analyzer_api", @@ -748,7 +747,7 @@ deps = [ ":default_video_quality_analyzer_shared", - ":multi_head_queue", + ":multi_reader_queue", "../../../api:array_view", "../../../api:scoped_refptr", "../../../api/numerics:numerics", @@ -895,10 +894,10 @@ ] } - rtc_library("multi_head_queue") { + rtc_library("multi_reader_queue") { visibility = [ "*" ] testonly = true - sources = [ "analyzer/video/multi_head_queue.h" ] + sources = [ "analyzer/video/multi_reader_queue.h" ] deps = [ "../../../rtc_base:checks" ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] }
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h index 6d4818f..188588b 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h
@@ -39,7 +39,6 @@ #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_internal_shared_objects.h" #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_shared_objects.h" #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h" -#include "test/pc/e2e/analyzer/video/multi_head_queue.h" #include "test/pc/e2e/analyzer/video/names_collection.h" #include "test/testsupport/perf_test.h"
diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h index c127dd1..8cf41a3 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h
@@ -15,7 +15,7 @@ #include "absl/types/optional.h" #include "api/units/timestamp.h" -#include "test/pc/e2e/analyzer/video/multi_head_queue.h" +#include "test/pc/e2e/analyzer/video/multi_reader_queue.h" namespace webrtc { @@ -50,7 +50,7 @@ // When new peer is added - all current alive frames will be sent to it as // well. So we need to register them as expected by copying owner_ head to // the new head. - void AddPeer() { frame_ids_.AddHead(GetAliveFramesQueueIndex()); } + void AddPeer() { frame_ids_.AddReader(GetAliveFramesQueueIndex()); } size_t GetAliveFramesCount() const { return frame_ids_.size(GetAliveFramesQueueIndex()); @@ -86,7 +86,7 @@ // If we received frame with id frame_id3, then we will pop frame_id1 and // frame_id2 and consider those frames as dropped and then compare received // frame with the one from `FrameInFlight` with id frame_id3. - MultiHeadQueue<uint16_t> frame_ids_; + MultiReaderQueue<uint16_t> frame_ids_; std::map<size_t, Timestamp> last_rendered_frame_time_; };
diff --git a/test/pc/e2e/analyzer/video/multi_head_queue.h b/test/pc/e2e/analyzer/video/multi_head_queue.h deleted file mode 100644 index eef862b..0000000 --- a/test/pc/e2e/analyzer/video/multi_head_queue.h +++ /dev/null
@@ -1,102 +0,0 @@ -/* - * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_ -#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_ - -#include <deque> -#include <memory> -#include <vector> - -#include "absl/types/optional.h" -#include "rtc_base/checks.h" - -namespace webrtc { - -// A queue that allows more than one reader. Readers are independent, and all -// readers will see all elements; an inserted element stays in the queue until -// all readers have extracted it. Elements are copied and copying is assumed to -// be cheap. -template <typename T> -class MultiHeadQueue { - public: - // Creates queue with exactly `readers_count` readers. - explicit MultiHeadQueue(size_t readers_count) { - for (size_t i = 0; i < readers_count; ++i) { - queues_.push_back(std::deque<T>()); - } - } - - // Creates a copy of an existing head. Complexity O(MultiHeadQueue::size()). - // `copy_index` - index of the queue that will be used as a source for - // copying. - void AddHead(size_t copy_index) { queues_.push_back(queues_[copy_index]); } - - // Add value to the end of the queue. Complexity O(readers_count). - void PushBack(T value) { - for (auto& queue : queues_) { - queue.push_back(value); - } - } - - // Extract element from specified head. Complexity O(1). - absl::optional<T> PopFront(size_t index) { - RTC_CHECK_LT(index, queues_.size()); - if (queues_[index].empty()) { - return absl::nullopt; - } - T out = queues_[index].front(); - queues_[index].pop_front(); - return out; - } - - // Returns element at specified head. Complexity O(1). - absl::optional<T> Front(size_t index) const { - RTC_CHECK_LT(index, queues_.size()); - if (queues_[index].empty()) { - return absl::nullopt; - } - return queues_[index].front(); - } - - // Returns true if for specified head there are no more elements in the queue - // or false otherwise. Complexity O(1). - bool IsEmpty(size_t index) const { - RTC_CHECK_LT(index, queues_.size()); - return queues_[index].empty(); - } - - // Returns size of the longest queue between all readers. - // Complexity O(readers_count). - size_t size() const { - size_t size = 0; - for (auto& queue : queues_) { - if (queue.size() > size) { - size = queue.size(); - } - } - return size; - } - - // Returns size of the specified queue. Complexity O(1). - size_t size(size_t index) const { - RTC_CHECK_LT(index, queues_.size()); - return queues_[index].size(); - } - - size_t readers_count() const { return queues_.size(); } - - private: - std::vector<std::deque<T>> queues_; -}; - -} // namespace webrtc - -#endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_
diff --git a/test/pc/e2e/analyzer/video/multi_head_queue_test.cc b/test/pc/e2e/analyzer/video/multi_head_queue_test.cc deleted file mode 100644 index 2aa6fd8..0000000 --- a/test/pc/e2e/analyzer/video/multi_head_queue_test.cc +++ /dev/null
@@ -1,120 +0,0 @@ -/* - * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "test/pc/e2e/analyzer/video/multi_head_queue.h" -#include "absl/types/optional.h" -#include "test/gtest.h" - -namespace webrtc { -namespace { - -TEST(MultiHeadQueueTest, GetOnEmpty) { - MultiHeadQueue<int> queue = MultiHeadQueue<int>(10); - EXPECT_TRUE(queue.IsEmpty(0)); - for (int i = 0; i < 10; ++i) { - EXPECT_FALSE(queue.PopFront(i).has_value()); - EXPECT_FALSE(queue.Front(i).has_value()); - } -} - -TEST(MultiHeadQueueTest, SingleHeadOneAddOneRemove) { - MultiHeadQueue<int> queue = MultiHeadQueue<int>(1); - queue.PushBack(1); - EXPECT_EQ(queue.size(), 1lu); - EXPECT_TRUE(queue.Front(0).has_value()); - EXPECT_EQ(queue.Front(0).value(), 1); - absl::optional<int> value = queue.PopFront(0); - EXPECT_TRUE(value.has_value()); - EXPECT_EQ(value.value(), 1); - EXPECT_EQ(queue.size(), 0lu); - EXPECT_TRUE(queue.IsEmpty(0)); -} - -TEST(MultiHeadQueueTest, SingleHead) { - MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(1); - for (size_t i = 0; i < 10; ++i) { - queue.PushBack(i); - EXPECT_EQ(queue.size(), i + 1); - } - for (size_t i = 0; i < 10; ++i) { - absl::optional<size_t> value = queue.PopFront(0); - EXPECT_EQ(queue.size(), 10 - i - 1); - ASSERT_TRUE(value.has_value()); - EXPECT_EQ(value.value(), i); - } -} - -TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAllPerHead) { - MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(3); - for (size_t i = 0; i < 10; ++i) { - queue.PushBack(i); - EXPECT_EQ(queue.size(), i + 1); - } - for (size_t i = 0; i < 10; ++i) { - absl::optional<size_t> value = queue.PopFront(0); - EXPECT_EQ(queue.size(), 10lu); - ASSERT_TRUE(value.has_value()); - EXPECT_EQ(value.value(), i); - } - for (size_t i = 0; i < 10; ++i) { - absl::optional<size_t> value = queue.PopFront(1); - EXPECT_EQ(queue.size(), 10lu); - ASSERT_TRUE(value.has_value()); - EXPECT_EQ(value.value(), i); - } - for (size_t i = 0; i < 10; ++i) { - absl::optional<size_t> value = queue.PopFront(2); - EXPECT_EQ(queue.size(), 10 - i - 1); - ASSERT_TRUE(value.has_value()); - EXPECT_EQ(value.value(), i); - } -} - -TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAll) { - MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(3); - for (size_t i = 0; i < 10; ++i) { - queue.PushBack(i); - EXPECT_EQ(queue.size(), i + 1); - } - for (size_t i = 0; i < 10; ++i) { - absl::optional<size_t> value1 = queue.PopFront(0); - absl::optional<size_t> value2 = queue.PopFront(1); - absl::optional<size_t> value3 = queue.PopFront(2); - EXPECT_EQ(queue.size(), 10 - i - 1); - ASSERT_TRUE(value1.has_value()); - ASSERT_TRUE(value2.has_value()); - ASSERT_TRUE(value3.has_value()); - EXPECT_EQ(value1.value(), i); - EXPECT_EQ(value2.value(), i); - EXPECT_EQ(value3.value(), i); - } -} - -TEST(MultiHeadQueueTest, HeadCopy) { - MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(1); - for (size_t i = 0; i < 10; ++i) { - queue.PushBack(i); - EXPECT_EQ(queue.size(), i + 1); - } - queue.AddHead(0); - EXPECT_EQ(queue.readers_count(), 2u); - for (size_t i = 0; i < 10; ++i) { - absl::optional<size_t> value1 = queue.PopFront(0); - absl::optional<size_t> value2 = queue.PopFront(1); - EXPECT_EQ(queue.size(), 10 - i - 1); - ASSERT_TRUE(value1.has_value()); - ASSERT_TRUE(value2.has_value()); - EXPECT_EQ(value1.value(), i); - EXPECT_EQ(value2.value(), i); - } -} - -} // namespace -} // namespace webrtc
diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue.h b/test/pc/e2e/analyzer/video/multi_reader_queue.h new file mode 100644 index 0000000..c8a7db0 --- /dev/null +++ b/test/pc/e2e/analyzer/video/multi_reader_queue.h
@@ -0,0 +1,163 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ +#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ + +#include <deque> +#include <memory> +#include <set> +#include <unordered_map> + +#include "absl/types/optional.h" +#include "rtc_base/checks.h" + +namespace webrtc { + +// Represents the queue which can be read by multiple readers. Each reader reads +// from its own queue head. When an element is added it will become visible for +// all readers. When an element will be removed by all the readers, the element +// will be removed from the queue. +template <typename T> +class MultiReaderQueue { + public: + // Creates queue with exactly `readers_count` readers named from 0 to + // `readers_count - 1`. + explicit MultiReaderQueue(size_t readers_count) { + for (size_t i = 0; i < readers_count; ++i) { + heads_[i] = 0; + } + } + // Creates queue with specified readers. + explicit MultiReaderQueue(std::set<size_t> readers) { + for (size_t reader : readers) { + heads_[reader] = 0; + } + } + + // Adds a new reader, initializing its reading position (the reader's head) + // equal to the one of `reader_to_copy`. New reader will have name index + // equal to the current readers count. + // Complexity O(MultiReaderQueue::size(reader_to_copy)). + void AddReader(size_t reader_to_copy) { + AddReader(heads_.size(), reader_to_copy); + } + + // Adds a new `reader`, initializing its reading position (the reader's head) + // equal to the one of `reader_to_copy`. + // Complexity O(MultiReaderQueue::size(reader_to_copy)). + void AddReader(size_t reader, size_t reader_to_copy) { + size_t pos = GetHeadPositionOrDie(reader_to_copy); + + auto it = heads_.find(reader); + RTC_CHECK(it == heads_.end()) + << "Reader " << reader << " is already in the queue"; + heads_[reader] = heads_[reader_to_copy]; + for (size_t i = pos; i < queue_.size(); ++i) { + in_queues_[i]++; + } + } + + // Removes specified `reader` from the queue. + // Complexity O(MultiReaderQueue::size(reader)). + void RemoveReader(size_t reader) { + size_t pos = GetHeadPositionOrDie(reader); + for (size_t i = pos; i < queue_.size(); ++i) { + in_queues_[i]--; + } + while (!in_queues_.empty() && in_queues_[0] == 0) { + PopFront(); + } + heads_.erase(reader); + } + + // Add value to the end of the queue. Complexity O(1). + void PushBack(T value) { + queue_.push_back(value); + in_queues_.push_back(heads_.size()); + } + + // Extract element from specified head. Complexity O(1). + absl::optional<T> PopFront(size_t reader) { + size_t pos = GetHeadPositionOrDie(reader); + if (pos >= queue_.size()) { + return absl::nullopt; + } + + T out = queue_[pos]; + + in_queues_[pos]--; + heads_[reader]++; + + if (in_queues_[pos] == 0) { + RTC_DCHECK_EQ(pos, 0); + PopFront(); + } + return out; + } + + // Returns element at specified head. Complexity O(1). + absl::optional<T> Front(size_t reader) const { + size_t pos = GetHeadPositionOrDie(reader); + if (pos >= queue_.size()) { + return absl::nullopt; + } + return queue_[pos]; + } + + // Returns true if for specified head there are no more elements in the queue + // or false otherwise. Complexity O(1). + bool IsEmpty(size_t reader) const { + size_t pos = GetHeadPositionOrDie(reader); + return pos >= queue_.size(); + } + + // Returns size of the longest queue between all readers. + // Complexity O(1). + size_t size() const { return queue_.size(); } + + // Returns size of the specified queue. Complexity O(1). + size_t size(size_t reader) const { + size_t pos = GetHeadPositionOrDie(reader); + return queue_.size() - pos; + } + + // Complexity O(1). + size_t readers_count() const { return heads_.size(); } + + private: + size_t GetHeadPositionOrDie(size_t reader) const { + auto it = heads_.find(reader); + RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader; + return it->second - removed_elements_count_; + } + + void PopFront() { + RTC_DCHECK(!queue_.empty()); + RTC_DCHECK_EQ(in_queues_[0], 0); + queue_.pop_front(); + in_queues_.pop_front(); + removed_elements_count_++; + } + + // Number of the elements that were removed from the queue. It is used to + // subtract from each head to compute the right index inside `queue_`; + size_t removed_elements_count_ = 0; + std::deque<T> queue_; + // In how may queues the element at index `i` is. An element can be removed + // from the front if and only if it is in 0 queues. + std::deque<size_t> in_queues_; + // Map from the reader to the head position in the queue. + std::unordered_map<size_t, size_t> heads_; +}; + +} // namespace webrtc + +#endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc new file mode 100644 index 0000000..e41c4a8 --- /dev/null +++ b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc
@@ -0,0 +1,178 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "test/pc/e2e/analyzer/video/multi_reader_queue.h" + +#include "absl/types/optional.h" +#include "test/gtest.h" + +namespace webrtc { +namespace { + +TEST(MultiReaderQueueTest, EmptyQueueEmptyForAllHeads) { + MultiReaderQueue<int> queue = MultiReaderQueue<int>(10); + EXPECT_EQ(queue.size(), 0lu); + for (int i = 0; i < 10; ++i) { + EXPECT_TRUE(queue.IsEmpty(i)); + EXPECT_EQ(queue.size(i), 0lu); + EXPECT_FALSE(queue.PopFront(i).has_value()); + EXPECT_FALSE(queue.Front(i).has_value()); + } +} + +TEST(MultiReaderQueueTest, SizeIsEqualForAllHeadsAfterAddOnly) { + MultiReaderQueue<int> queue = MultiReaderQueue<int>(10); + queue.PushBack(1); + queue.PushBack(2); + queue.PushBack(3); + EXPECT_EQ(queue.size(), 3lu); + for (int i = 0; i < 10; ++i) { + EXPECT_FALSE(queue.IsEmpty(i)); + EXPECT_EQ(queue.size(i), 3lu); + } +} + +TEST(MultiReaderQueueTest, SizeIsCorrectAfterRemoveFromOnlyOneHead) { + MultiReaderQueue<int> queue = MultiReaderQueue<int>(10); + for (int i = 0; i < 5; ++i) { + queue.PushBack(i); + } + EXPECT_EQ(queue.size(), 5lu); + // Removing elements from queue #0 + for (int i = 0; i < 5; ++i) { + EXPECT_EQ(queue.size(0), static_cast<size_t>(5 - i)); + EXPECT_EQ(queue.PopFront(0), absl::optional<int>(i)); + for (int j = 1; j < 10; ++j) { + EXPECT_EQ(queue.size(j), 5lu); + } + } + EXPECT_EQ(queue.size(0), 0lu); + EXPECT_TRUE(queue.IsEmpty(0)); +} + +TEST(MultiReaderQueueTest, SingleHeadOneAddOneRemove) { + MultiReaderQueue<int> queue = MultiReaderQueue<int>(1); + queue.PushBack(1); + EXPECT_EQ(queue.size(), 1lu); + EXPECT_TRUE(queue.Front(0).has_value()); + EXPECT_EQ(queue.Front(0).value(), 1); + absl::optional<int> value = queue.PopFront(0); + EXPECT_TRUE(value.has_value()); + EXPECT_EQ(value.value(), 1); + EXPECT_EQ(queue.size(), 0lu); + EXPECT_TRUE(queue.IsEmpty(0)); +} + +TEST(MultiReaderQueueTest, SingleHead) { + MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1); + for (size_t i = 0; i < 10; ++i) { + queue.PushBack(i); + EXPECT_EQ(queue.size(), i + 1); + } + for (size_t i = 0; i < 10; ++i) { + EXPECT_EQ(queue.Front(0), absl::optional<size_t>(i)); + EXPECT_EQ(queue.PopFront(0), absl::optional<size_t>(i)); + EXPECT_EQ(queue.size(), 10 - i - 1); + } +} + +TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) { + MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3); + for (size_t i = 0; i < 10; ++i) { + queue.PushBack(i); + EXPECT_EQ(queue.size(), i + 1); + } + for (size_t i = 0; i < 10; ++i) { + absl::optional<size_t> value = queue.PopFront(0); + EXPECT_EQ(queue.size(), 10lu); + ASSERT_TRUE(value.has_value()); + EXPECT_EQ(value.value(), i); + } + for (size_t i = 0; i < 10; ++i) { + absl::optional<size_t> value = queue.PopFront(1); + EXPECT_EQ(queue.size(), 10lu); + ASSERT_TRUE(value.has_value()); + EXPECT_EQ(value.value(), i); + } + for (size_t i = 0; i < 10; ++i) { + absl::optional<size_t> value = queue.PopFront(2); + EXPECT_EQ(queue.size(), 10 - i - 1); + ASSERT_TRUE(value.has_value()); + EXPECT_EQ(value.value(), i); + } +} + +TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) { + MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3); + for (size_t i = 0; i < 10; ++i) { + queue.PushBack(i); + EXPECT_EQ(queue.size(), i + 1); + } + for (size_t i = 0; i < 10; ++i) { + absl::optional<size_t> value1 = queue.PopFront(0); + absl::optional<size_t> value2 = queue.PopFront(1); + absl::optional<size_t> value3 = queue.PopFront(2); + EXPECT_EQ(queue.size(), 10 - i - 1); + ASSERT_TRUE(value1.has_value()); + ASSERT_TRUE(value2.has_value()); + ASSERT_TRUE(value3.has_value()); + EXPECT_EQ(value1.value(), i); + EXPECT_EQ(value2.value(), i); + EXPECT_EQ(value3.value(), i); + } +} + +TEST(MultiReaderQueueTest, AddReader) { + MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1); + for (size_t i = 0; i < 10; ++i) { + queue.PushBack(i); + EXPECT_EQ(queue.size(), i + 1); + } + queue.AddReader(0); + EXPECT_EQ(queue.readers_count(), 2lu); + for (size_t i = 0; i < 10; ++i) { + absl::optional<size_t> value1 = queue.PopFront(0); + absl::optional<size_t> value2 = queue.PopFront(1); + EXPECT_EQ(queue.size(), 10 - i - 1); + ASSERT_TRUE(value1.has_value()); + ASSERT_TRUE(value2.has_value()); + EXPECT_EQ(value1.value(), i); + EXPECT_EQ(value2.value(), i); + } +} + +TEST(MultiReaderQueueTest, RemoveReaderWontChangeOthers) { + MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(2); + for (size_t i = 0; i < 10; ++i) { + queue.PushBack(i); + } + EXPECT_EQ(queue.size(1), 10lu); + + queue.RemoveReader(0); + + EXPECT_EQ(queue.readers_count(), 1lu); + EXPECT_EQ(queue.size(1), 10lu); +} + +TEST(MultiReaderQueueTest, RemoveLastReaderMakesQueueEmpty) { + MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1); + for (size_t i = 0; i < 10; ++i) { + queue.PushBack(i); + } + EXPECT_EQ(queue.size(), 10lu); + + queue.RemoveReader(0); + + EXPECT_EQ(queue.size(), 0lu); + EXPECT_EQ(queue.readers_count(), 0lu); +} + +} // namespace +} // namespace webrtc