Add thread checks to FifoBuffer (test-only class)
These checks replace the need for a mutex as the usage of the
StreamInterface methods is consistently on the same thread as
the callbacks.
Bug: none
Change-Id: I0c5aaddcbdaa4a6a84c3bc73306563a9f8a8821d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/347902
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#42148}
diff --git a/rtc_base/memory/BUILD.gn b/rtc_base/memory/BUILD.gn
index ee3baa4..b8f3bff 100644
--- a/rtc_base/memory/BUILD.gn
+++ b/rtc_base/memory/BUILD.gn
@@ -36,7 +36,6 @@
"..:stream",
"..:threading",
"../../api/task_queue:pending_task_safety_flag",
- "../synchronization:mutex",
]
}
diff --git a/rtc_base/memory/fifo_buffer.cc b/rtc_base/memory/fifo_buffer.cc
index c159bc9..9723656 100644
--- a/rtc_base/memory/fifo_buffer.cc
+++ b/rtc_base/memory/fifo_buffer.cc
@@ -39,20 +39,20 @@
FifoBuffer::~FifoBuffer() {}
bool FifoBuffer::GetBuffered(size_t* size) const {
- webrtc::MutexLock lock(&mutex_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
*size = data_length_;
return true;
}
StreamState FifoBuffer::GetState() const {
- webrtc::MutexLock lock(&mutex_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
return state_;
}
StreamResult FifoBuffer::Read(rtc::ArrayView<uint8_t> buffer,
size_t& bytes_read,
int& error) {
- webrtc::MutexLock lock(&mutex_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
const bool was_writable = data_length_ < buffer_length_;
size_t copy = 0;
StreamResult result = ReadLocked(buffer.data(), buffer.size(), ©);
@@ -75,7 +75,7 @@
StreamResult FifoBuffer::Write(rtc::ArrayView<const uint8_t> buffer,
size_t& bytes_written,
int& error) {
- webrtc::MutexLock lock(&mutex_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
const bool was_readable = (data_length_ > 0);
size_t copy = 0;
@@ -94,12 +94,12 @@
}
void FifoBuffer::Close() {
- webrtc::MutexLock lock(&mutex_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
state_ = SS_CLOSED;
}
const void* FifoBuffer::GetReadData(size_t* size) {
- webrtc::MutexLock lock(&mutex_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
*size = (read_position_ + data_length_ <= buffer_length_)
? data_length_
: buffer_length_ - read_position_;
@@ -107,8 +107,8 @@
}
void FifoBuffer::ConsumeReadData(size_t size) {
- webrtc::MutexLock lock(&mutex_);
- RTC_DCHECK(size <= data_length_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
+ RTC_DCHECK_LE(size, data_length_);
const bool was_writable = data_length_ < buffer_length_;
read_position_ = (read_position_ + size) % buffer_length_;
data_length_ -= size;
@@ -118,7 +118,8 @@
}
void* FifoBuffer::GetWriteBuffer(size_t* size) {
- webrtc::MutexLock lock(&mutex_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
+
if (state_ == SS_CLOSED) {
return nullptr;
}
@@ -138,8 +139,8 @@
}
void FifoBuffer::ConsumeWriteBuffer(size_t size) {
- webrtc::MutexLock lock(&mutex_);
- RTC_DCHECK(size <= buffer_length_ - data_length_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
+ RTC_DCHECK_LE(size, buffer_length_ - data_length_);
const bool was_readable = (data_length_ > 0);
data_length_ += size;
if (!was_readable && size > 0) {
diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h
index 658a3c7..8b31c5f 100644
--- a/rtc_base/memory/fifo_buffer.h
+++ b/rtc_base/memory/fifo_buffer.h
@@ -15,7 +15,6 @@
#include "api/task_queue/pending_task_safety_flag.h"
#include "rtc_base/stream.h"
-#include "rtc_base/synchronization/mutex.h"
namespace rtc {
@@ -78,6 +77,7 @@
private:
void PostEvent(int events, int err) {
+ RTC_DCHECK_RUN_ON(owner_);
owner_->PostTask(
webrtc::SafeTask(task_safety_.flag(), [this, events, err]() {
RTC_DCHECK_RUN_ON(&callback_sequence_);
@@ -88,31 +88,29 @@
// Helper method that implements Read. Caller must acquire a lock
// when calling this method.
StreamResult ReadLocked(void* buffer, size_t bytes, size_t* bytes_read)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(callback_sequence_);
// Helper method that implements Write. Caller must acquire a lock
// when calling this method.
StreamResult WriteLocked(const void* buffer,
size_t bytes,
size_t* bytes_written)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(callback_sequence_);
webrtc::ScopedTaskSafety task_safety_;
// keeps the opened/closed state of the stream
- StreamState state_ RTC_GUARDED_BY(mutex_);
+ StreamState state_ RTC_GUARDED_BY(callback_sequence_);
// the allocated buffer
- std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(mutex_);
+ std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(callback_sequence_);
// size of the allocated buffer
const size_t buffer_length_;
// amount of readable data in the buffer
- size_t data_length_ RTC_GUARDED_BY(mutex_);
+ size_t data_length_ RTC_GUARDED_BY(callback_sequence_);
// offset to the readable data
- size_t read_position_ RTC_GUARDED_BY(mutex_);
+ size_t read_position_ RTC_GUARDED_BY(callback_sequence_);
// stream callbacks are dispatched on this thread
Thread* const owner_;
- // object lock
- mutable webrtc::Mutex mutex_;
};
} // namespace rtc