AEC3: Lockless transfer of render data to the capture thread
This CL implements a lockless queue that replaces SwapQueue
in the RenderWriter. This avoid stalls when the render and
capture threads are accessing the queue at the same time.
Bug: webrtc:10205
Change-Id: Ie7d6fcf9c80fad957e2a90537658fb730ca2ed72
Reviewed-on: https://webrtc-review.googlesource.com/c/117643
Reviewed-by: Per Ã…hgren <peah@webrtc.org>
Commit-Queue: Gustaf Ullberg <gustaf@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26298}
diff --git a/modules/audio_processing/aec3/BUILD.gn b/modules/audio_processing/aec3/BUILD.gn
index 189bcfd..b34ed22 100644
--- a/modules/audio_processing/aec3/BUILD.gn
+++ b/modules/audio_processing/aec3/BUILD.gn
@@ -75,6 +75,7 @@
"matched_filter_lag_aggregator.h",
"matrix_buffer.cc",
"matrix_buffer.h",
+ "message_queue.h",
"moving_average.cc",
"moving_average.h",
"render_buffer.cc",
diff --git a/modules/audio_processing/aec3/echo_canceller3.cc b/modules/audio_processing/aec3/echo_canceller3.cc
index e384605..fc802b1 100644
--- a/modules/audio_processing/aec3/echo_canceller3.cc
+++ b/modules/audio_processing/aec3/echo_canceller3.cc
@@ -276,13 +276,13 @@
class EchoCanceller3::RenderWriter {
public:
- RenderWriter(ApmDataDumper* data_dumper,
- SwapQueue<std::vector<std::vector<float>>,
- Aec3RenderQueueItemVerifier>* render_transfer_queue,
- std::unique_ptr<CascadedBiQuadFilter> render_highpass_filter,
- int sample_rate_hz,
- int frame_length,
- int num_bands);
+ RenderWriter(
+ ApmDataDumper* data_dumper,
+ MessageQueue<std::vector<std::vector<float>>>* render_transfer_queue,
+ std::unique_ptr<CascadedBiQuadFilter> render_highpass_filter,
+ int sample_rate_hz,
+ int frame_length,
+ int num_bands);
~RenderWriter();
void Insert(AudioBuffer* input);
@@ -293,15 +293,13 @@
const int num_bands_;
std::unique_ptr<CascadedBiQuadFilter> render_highpass_filter_;
std::vector<std::vector<float>> render_queue_input_frame_;
- SwapQueue<std::vector<std::vector<float>>, Aec3RenderQueueItemVerifier>*
- render_transfer_queue_;
+ MessageQueue<std::vector<std::vector<float>>>* render_transfer_queue_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RenderWriter);
};
EchoCanceller3::RenderWriter::RenderWriter(
ApmDataDumper* data_dumper,
- SwapQueue<std::vector<std::vector<float>>, Aec3RenderQueueItemVerifier>*
- render_transfer_queue,
+ MessageQueue<std::vector<std::vector<float>>>* render_transfer_queue,
std::unique_ptr<CascadedBiQuadFilter> render_highpass_filter,
int sample_rate_hz,
int frame_length,
@@ -370,12 +368,10 @@
output_framer_(num_bands_),
capture_blocker_(num_bands_),
render_blocker_(num_bands_),
- render_transfer_queue_(
- kRenderTransferQueueSizeFrames,
- std::vector<std::vector<float>>(
- num_bands_,
- std::vector<float>(frame_length_, 0.f)),
- Aec3RenderQueueItemVerifier(num_bands_, frame_length_)),
+ render_transfer_queue_(kRenderTransferQueueSizeFrames,
+ std::vector<std::vector<float>>(
+ num_bands_,
+ std::vector<float>(frame_length_, 0.f))),
block_processor_(std::move(block_processor)),
render_queue_output_frame_(num_bands_,
std::vector<float>(frame_length_, 0.f)),
diff --git a/modules/audio_processing/aec3/echo_canceller3.h b/modules/audio_processing/aec3/echo_canceller3.h
index c1298d2..cb3b382 100644
--- a/modules/audio_processing/aec3/echo_canceller3.h
+++ b/modules/audio_processing/aec3/echo_canceller3.h
@@ -24,12 +24,12 @@
#include "modules/audio_processing/aec3/block_processor.h"
#include "modules/audio_processing/aec3/cascaded_biquad_filter.h"
#include "modules/audio_processing/aec3/frame_blocker.h"
+#include "modules/audio_processing/aec3/message_queue.h"
#include "modules/audio_processing/audio_buffer.h"
#include "modules/audio_processing/logging/apm_data_dumper.h"
#include "rtc_base/checks.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/race_checker.h"
-#include "rtc_base/swap_queue.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
@@ -107,7 +107,7 @@
private:
class RenderWriter;
- // Empties the render SwapQueue.
+ // Empties the render MessageQueue.
void EmptyRenderQueue();
rtc::RaceChecker capture_race_checker_;
@@ -127,8 +127,7 @@
BlockFramer output_framer_ RTC_GUARDED_BY(capture_race_checker_);
FrameBlocker capture_blocker_ RTC_GUARDED_BY(capture_race_checker_);
FrameBlocker render_blocker_ RTC_GUARDED_BY(capture_race_checker_);
- SwapQueue<std::vector<std::vector<float>>, Aec3RenderQueueItemVerifier>
- render_transfer_queue_;
+ MessageQueue<std::vector<std::vector<float>>> render_transfer_queue_;
std::unique_ptr<BlockProcessor> block_processor_
RTC_GUARDED_BY(capture_race_checker_);
std::vector<std::vector<float>> render_queue_output_frame_
diff --git a/modules/audio_processing/aec3/echo_canceller3_unittest.cc b/modules/audio_processing/aec3/echo_canceller3_unittest.cc
index 3f1e059..ab2ae04 100644
--- a/modules/audio_processing/aec3/echo_canceller3_unittest.cc
+++ b/modules/audio_processing/aec3/echo_canceller3_unittest.cc
@@ -461,7 +461,7 @@
// This test verifies that the swapqueue is able to handle jitter in the
// capture and render API calls.
- void RunRenderSwapQueueVerificationTest() {
+ void RunRenderMessageQueueVerificationTest() {
const EchoCanceller3Config config;
EchoCanceller3 aec3(
config, sample_rate_hz_, false,
@@ -502,7 +502,7 @@
// This test verifies that a buffer overrun in the render swapqueue is
// properly reported.
- void RunRenderPipelineSwapQueueOverrunReturnValueTest() {
+ void RunRenderPipelineMessageQueueOverrunReturnValueTest() {
EchoCanceller3 aec3(EchoCanceller3Config(), sample_rate_hz_, false);
constexpr size_t kRenderTransferQueueSize = 30;
@@ -631,18 +631,18 @@
}
}
-TEST(EchoCanceller3Buffering, RenderSwapQueue) {
+TEST(EchoCanceller3Buffering, RenderMessageQueue) {
for (auto rate : {8000, 16000}) {
SCOPED_TRACE(ProduceDebugText(rate));
- EchoCanceller3Tester(rate).RunRenderSwapQueueVerificationTest();
+ EchoCanceller3Tester(rate).RunRenderMessageQueueVerificationTest();
}
}
-TEST(EchoCanceller3Buffering, RenderSwapQueueOverrunReturnValue) {
+TEST(EchoCanceller3Buffering, RenderMessageQueueOverrunReturnValue) {
for (auto rate : {8000, 16000, 32000, 48000}) {
SCOPED_TRACE(ProduceDebugText(rate));
EchoCanceller3Tester(rate)
- .RunRenderPipelineSwapQueueOverrunReturnValueTest();
+ .RunRenderPipelineMessageQueueOverrunReturnValueTest();
}
}
diff --git a/modules/audio_processing/aec3/message_queue.h b/modules/audio_processing/aec3/message_queue.h
new file mode 100644
index 0000000..ad07d52
--- /dev/null
+++ b/modules/audio_processing/aec3/message_queue.h
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_
+#define MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_
+
+#include <atomic>
+#include <utility>
+#include <vector>
+
+#include "rtc_base/checks.h"
+#include "rtc_base/thread_checker.h"
+
+namespace webrtc {
+
+// Fixed-size circular queue similar to SwapQueue, but lock-free and no
+// QueueItemVerifierFunction.
+// The queue is designed for single-producer-single-consumer (accessed by one
+// producer thread, calling Insert(), and one consumer thread, calling Remove().
+template <typename T>
+class MessageQueue {
+ public:
+ explicit MessageQueue(size_t size) : num_elements_(0), queue_(size) {
+ producer_thread_checker_.DetachFromThread();
+ consumer_thread_checker_.DetachFromThread();
+ }
+ MessageQueue(size_t size, const T& prototype)
+ : num_elements_(0), queue_(size, prototype) {
+ producer_thread_checker_.DetachFromThread();
+ consumer_thread_checker_.DetachFromThread();
+ }
+ ~MessageQueue() = default;
+
+ // Inserts a T at the back of the queue by swapping *input with a T from the
+ // queue. This function should not be called concurrently. It can however be
+ // called concurrently with Remove(). Returns true if the item was inserted or
+ // false if not (the queue was full).
+ bool Insert(T* input) {
+ RTC_DCHECK_RUN_ON(&producer_thread_checker_);
+ RTC_DCHECK(input);
+
+ if (num_elements_ == queue_.size()) {
+ return false;
+ }
+
+ std::swap(*input, queue_[next_write_index_]);
+
+ ++next_write_index_;
+ if (next_write_index_ == queue_.size()) {
+ next_write_index_ = 0;
+ }
+
+ ++num_elements_;
+
+ RTC_DCHECK_LT(next_write_index_, queue_.size());
+
+ return true;
+ }
+
+ // Removes the frontmost T from the queue by swapping it with the T in
+ // *output. This function should not be called concurrently. It can however be
+ // called concurrently with Insert(). Returns true if an item could be removed
+ // or false if not (the queue was empty).
+ bool Remove(T* output) {
+ RTC_DCHECK_RUN_ON(&consumer_thread_checker_);
+ RTC_DCHECK(output);
+
+ if (num_elements_ == 0) {
+ return false;
+ }
+
+ std::swap(*output, queue_[next_read_index_]);
+
+ ++next_read_index_;
+ if (next_read_index_ == queue_.size()) {
+ next_read_index_ = 0;
+ }
+
+ --num_elements_;
+
+ RTC_DCHECK_LT(next_read_index_, queue_.size());
+
+ return true;
+ }
+
+ private:
+ uint32_t next_write_index_ = 0;
+ uint32_t next_read_index_ = 0;
+ rtc::ThreadChecker producer_thread_checker_;
+ rtc::ThreadChecker consumer_thread_checker_;
+ std::atomic<uint32_t> num_elements_;
+ std::vector<T> queue_;
+};
+} // namespace webrtc
+
+#endif // MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_