| /* |
| * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. |
| * |
| * Use of this source code is governed by a BSD-style license |
| * that can be found in the LICENSE file in the root of the source |
| * tree. An additional intellectual property rights grant can be found |
| * in the file PATENTS. All contributing project authors may |
| * be found in the AUTHORS file in the root of the source tree. |
| */ |
| |
| #ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ |
| #define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ |
| |
| #include <deque> |
| #include <memory> |
| #include <set> |
| #include <unordered_map> |
| |
| #include "absl/types/optional.h" |
| #include "rtc_base/checks.h" |
| |
| namespace webrtc { |
| |
| // Represents the queue which can be read by multiple readers. Each reader reads |
| // from its own queue head. When an element is added it will become visible for |
| // all readers. When an element will be removed by all the readers, the element |
| // will be removed from the queue. |
| template <typename T> |
| class MultiReaderQueue { |
| public: |
| // Creates queue with exactly `readers_count` readers named from 0 to |
| // `readers_count - 1`. |
| explicit MultiReaderQueue(size_t readers_count) { |
| for (size_t i = 0; i < readers_count; ++i) { |
| heads_[i] = 0; |
| } |
| } |
| // Creates queue with specified readers. |
| explicit MultiReaderQueue(std::set<size_t> readers) { |
| for (size_t reader : readers) { |
| heads_[reader] = 0; |
| } |
| } |
| |
| // Adds a new `reader`, initializing its reading position (the reader's head) |
| // equal to the one of `reader_to_copy`. |
| // Complexity O(MultiReaderQueue::size(reader_to_copy)). |
| void AddReader(size_t reader, size_t reader_to_copy) { |
| size_t pos = GetHeadPositionOrDie(reader_to_copy); |
| |
| auto it = heads_.find(reader); |
| RTC_CHECK(it == heads_.end()) |
| << "Reader " << reader << " is already in the queue"; |
| heads_[reader] = heads_[reader_to_copy]; |
| for (size_t i = pos; i < queue_.size(); ++i) { |
| in_queues_[i]++; |
| } |
| } |
| |
| // Adds a new `reader`, initializing its reading position equal to first |
| // element in the queue. |
| // Complexity O(MultiReaderQueue::size()). |
| void AddReader(size_t reader) { |
| auto it = heads_.find(reader); |
| RTC_CHECK(it == heads_.end()) |
| << "Reader " << reader << " is already in the queue"; |
| heads_[reader] = removed_elements_count_; |
| for (size_t i = 0; i < queue_.size(); ++i) { |
| in_queues_[i]++; |
| } |
| } |
| |
| // Removes specified `reader` from the queue. |
| // Complexity O(MultiReaderQueue::size(reader)). |
| void RemoveReader(size_t reader) { |
| size_t pos = GetHeadPositionOrDie(reader); |
| for (size_t i = pos; i < queue_.size(); ++i) { |
| in_queues_[i]--; |
| } |
| while (!in_queues_.empty() && in_queues_[0] == 0) { |
| PopFront(); |
| } |
| heads_.erase(reader); |
| } |
| |
| // Add value to the end of the queue. Complexity O(1). |
| void PushBack(T value) { |
| queue_.push_back(value); |
| in_queues_.push_back(heads_.size()); |
| } |
| |
| // Extract element from specified head. Complexity O(1). |
| absl::optional<T> PopFront(size_t reader) { |
| size_t pos = GetHeadPositionOrDie(reader); |
| if (pos >= queue_.size()) { |
| return absl::nullopt; |
| } |
| |
| T out = queue_[pos]; |
| |
| in_queues_[pos]--; |
| heads_[reader]++; |
| |
| if (in_queues_[pos] == 0) { |
| RTC_DCHECK_EQ(pos, 0); |
| PopFront(); |
| } |
| return out; |
| } |
| |
| // Returns element at specified head. Complexity O(1). |
| absl::optional<T> Front(size_t reader) const { |
| size_t pos = GetHeadPositionOrDie(reader); |
| if (pos >= queue_.size()) { |
| return absl::nullopt; |
| } |
| return queue_[pos]; |
| } |
| |
| // Returns true if for specified head there are no more elements in the queue |
| // or false otherwise. Complexity O(1). |
| bool IsEmpty(size_t reader) const { |
| size_t pos = GetHeadPositionOrDie(reader); |
| return pos >= queue_.size(); |
| } |
| |
| // Returns size of the longest queue between all readers. |
| // Complexity O(1). |
| size_t size() const { return queue_.size(); } |
| |
| // Returns size of the specified queue. Complexity O(1). |
| size_t size(size_t reader) const { |
| size_t pos = GetHeadPositionOrDie(reader); |
| return queue_.size() - pos; |
| } |
| |
| // Complexity O(1). |
| size_t readers_count() const { return heads_.size(); } |
| |
| private: |
| size_t GetHeadPositionOrDie(size_t reader) const { |
| auto it = heads_.find(reader); |
| RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader; |
| return it->second - removed_elements_count_; |
| } |
| |
| void PopFront() { |
| RTC_DCHECK(!queue_.empty()); |
| RTC_DCHECK_EQ(in_queues_[0], 0); |
| queue_.pop_front(); |
| in_queues_.pop_front(); |
| removed_elements_count_++; |
| } |
| |
| // Number of the elements that were removed from the queue. It is used to |
| // subtract from each head to compute the right index inside `queue_`; |
| size_t removed_elements_count_ = 0; |
| std::deque<T> queue_; |
| // In how may queues the element at index `i` is. An element can be removed |
| // from the front if and only if it is in 0 queues. |
| std::deque<size_t> in_queues_; |
| // Map from the reader to the head position in the queue. |
| std::unordered_map<size_t, size_t> heads_; |
| }; |
| |
| } // namespace webrtc |
| |
| #endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ |