blob: ad07d52c81f2988b2d6329196af2d10f0b020556 [file] [log] [blame]
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_
#define MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_
#include <atomic>
#include <utility>
#include <vector>
#include "rtc_base/checks.h"
#include "rtc_base/thread_checker.h"
namespace webrtc {
// Fixed-size circular queue similar to SwapQueue, but lock-free and without a
// QueueItemVerifierFunction.
// The queue is designed for single-producer-single-consumer use: it is accessed
// by one producer thread, calling Insert(), and one consumer thread, calling
// Remove().
template <typename T>
class MessageQueue {
 public:
  // Creates a queue with `size` slots, each holding a default-constructed T.
  explicit MessageQueue(size_t size) : num_elements_(0), queue_(size) {
    // NOTE(review): detaching presumably lets each checker bind to the first
    // thread that calls Insert()/Remove() instead of the constructing thread;
    // confirm against rtc::ThreadChecker.
    producer_thread_checker_.DetachFromThread();
    consumer_thread_checker_.DetachFromThread();
  }

  // Creates a queue with `size` slots, each holding a copy of `prototype`.
  MessageQueue(size_t size, const T& prototype)
      : num_elements_(0), queue_(size, prototype) {
    producer_thread_checker_.DetachFromThread();
    consumer_thread_checker_.DetachFromThread();
  }

  // The queue holds a std::atomic counter and is therefore neither copyable
  // nor movable; spell that out instead of relying on the implicit deletion.
  MessageQueue(const MessageQueue&) = delete;
  MessageQueue& operator=(const MessageQueue&) = delete;

  ~MessageQueue() = default;

  // Inserts a T at the back of the queue by swapping *input with a T from the
  // queue. This function should not be called concurrently. It can however be
  // called concurrently with Remove(). Returns true if the item was inserted or
  // false if not (the queue was full). On failure *input is left untouched.
  bool Insert(T* input) {
    RTC_DCHECK_RUN_ON(&producer_thread_checker_);
    RTC_DCHECK(input);

    if (num_elements_.load() == queue_.size()) {
      return false;
    }

    // Unqualified call so that a swap() overload supplied by T's own namespace
    // is found via ADL; std::swap remains the fallback.
    using std::swap;
    swap(*input, queue_[next_write_index_]);

    ++next_write_index_;
    if (next_write_index_ == queue_.size()) {
      next_write_index_ = 0;
    }

    // The counter is bumped only after the slot has been written, so the
    // consumer never sees an element count that covers a half-written slot.
    num_elements_.fetch_add(1);

    RTC_DCHECK_LT(next_write_index_, queue_.size());
    return true;
  }

  // Removes the frontmost T from the queue by swapping it with the T in
  // *output. This function should not be called concurrently. It can however be
  // called concurrently with Insert(). Returns true if an item could be removed
  // or false if not (the queue was empty). On failure *output is left
  // untouched.
  bool Remove(T* output) {
    RTC_DCHECK_RUN_ON(&consumer_thread_checker_);
    RTC_DCHECK(output);

    if (num_elements_.load() == 0) {
      return false;
    }

    // See Insert() for why the call is unqualified.
    using std::swap;
    swap(*output, queue_[next_read_index_]);

    ++next_read_index_;
    if (next_read_index_ == queue_.size()) {
      next_read_index_ = 0;
    }

    // The slot is released to the producer only after its content has been
    // swapped out.
    num_elements_.fetch_sub(1);

    RTC_DCHECK_LT(next_read_index_, queue_.size());
    return true;
  }

 private:
  // Indices and the element counter use size_t, the same type as
  // queue_.size(), so they cannot truncate or wrap around for any capacity the
  // vector can hold.

  // Slot that the next Insert() writes to; only touched by Insert().
  size_t next_write_index_ = 0;
  // Slot that the next Remove() reads from; only touched by Remove().
  size_t next_read_index_ = 0;
  rtc::ThreadChecker producer_thread_checker_;
  rtc::ThreadChecker consumer_thread_checker_;
  // Number of occupied slots. This is the only state written by both
  // Insert() and Remove().
  std::atomic<size_t> num_elements_;
  std::vector<T> queue_;
};
} // namespace webrtc
#endif // MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_