Reland "Fix data race for config_ in AudioSendStream"
This is a reland of 51e5c4b0f47926e2586d809e47dc60fe4812b782
It may happen that user will pass config with min bitrate > max bitrate.
In such case we can't generate cached_constraints and will crash before.
The reland will handle this situation gracefully.
Original change's description:
> Fix data race for config_ in AudioSendStream
>
> config_ was written and read on different threads without sync. This CL
> moves config access on worker_thread_ with all other required fields.
> It keeps only bitrate allocator accessed from worker_queue_, because
> it is used from it in other classes and supposed to be single threaded.
>
> Bug: None
> Change-Id: I23ece4dc8b09b41a8c589412bedd36d63b76cbc5
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/203267
> Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
> Reviewed-by: Niels Moller <nisse@webrtc.org>
> Reviewed-by: Per Åhgren <peah@webrtc.org>
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Commit-Queue: Artem Titov <titovartem@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#33125}
Bug: None
Change-Id: I274ff15208d69c25fb25a0f1dd0a0e37b72480b0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205523
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Reviewed-by: Per Åhgren <peah@webrtc.org>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33162}
diff --git a/audio/audio_send_stream.cc b/audio/audio_send_stream.cc
index 4e21b1f..b769569 100644
--- a/audio/audio_send_stream.cc
+++ b/audio/audio_send_stream.cc
@@ -168,13 +168,14 @@
RTC_DCHECK(rtp_rtcp_module_);
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
ConfigureStream(config, true);
-
+ UpdateCachedTargetAudioBitrateConstraints();
pacer_thread_checker_.Detach();
}
AudioSendStream::~AudioSendStream() {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_LOG(LS_INFO) << "~AudioSendStream: " << config_.rtp.ssrc;
RTC_DCHECK(!sending_);
channel_send_->ResetSenderCongestionControlObjects();
@@ -186,13 +187,13 @@
}
const webrtc::AudioSendStream::Config& AudioSendStream::GetConfig() const {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
return config_;
}
void AudioSendStream::Reconfigure(
const webrtc::AudioSendStream::Config& new_config) {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
ConfigureStream(new_config, false);
}
@@ -351,20 +352,22 @@
}
channel_send_->CallEncoder([this](AudioEncoder* encoder) {
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
if (!encoder) {
return;
}
- worker_queue_->PostTask(
- [this, length_range = encoder->GetFrameLengthRange()] {
- RTC_DCHECK_RUN_ON(worker_queue_);
- frame_length_range_ = length_range;
- });
+ frame_length_range_ = encoder->GetFrameLengthRange();
+ UpdateCachedTargetAudioBitrateConstraints();
});
if (sending_) {
ReconfigureBitrateObserver(new_config);
}
+
config_ = new_config;
+ if (!first_time) {
+ UpdateCachedTargetAudioBitrateConstraints();
+ }
}
void AudioSendStream::Start() {
@@ -379,13 +382,7 @@
if (send_side_bwe_with_overhead_)
rtp_transport_->IncludeOverheadInPacedSender();
rtp_rtcp_module_->SetAsPartOfAllocation(true);
- rtc::Event thread_sync_event;
- worker_queue_->PostTask([&] {
- RTC_DCHECK_RUN_ON(worker_queue_);
- ConfigureBitrateObserver();
- thread_sync_event.Set();
- });
- thread_sync_event.Wait(rtc::Event::kForever);
+ ConfigureBitrateObserver();
} else {
rtp_rtcp_module_->SetAsPartOfAllocation(false);
}
@@ -396,7 +393,7 @@
}
void AudioSendStream::Stop() {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
if (!sending_) {
return;
}
@@ -431,14 +428,14 @@
int payload_frequency,
int event,
int duration_ms) {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_send_->SetSendTelephoneEventPayloadType(payload_type,
payload_frequency);
return channel_send_->SendTelephoneEventOutband(event, duration_ms);
}
void AudioSendStream::SetMuted(bool muted) {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_send_->SetInputMute(muted);
}
@@ -448,7 +445,7 @@
webrtc::AudioSendStream::Stats AudioSendStream::GetStats(
bool has_remote_tracks) const {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
webrtc::AudioSendStream::Stats stats;
stats.local_ssrc = config_.rtp.ssrc;
stats.target_bitrate_bps = channel_send_->GetBitrate();
@@ -509,12 +506,14 @@
void AudioSendStream::DeliverRtcp(const uint8_t* packet, size_t length) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_send_->ReceivedRTCPPacket(packet, length);
- worker_queue_->PostTask([&]() {
+
+ {
// Poll if overhead has changed, which it can do if ack triggers us to stop
// sending mid/rid.
MutexLock lock(&overhead_per_packet_lock_);
UpdateOverheadForEncoder();
- });
+ }
+ UpdateCachedTargetAudioBitrateConstraints();
}
uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) {
@@ -523,9 +522,11 @@
// Pick a target bitrate between the constraints. Overrules the allocator if
// it 1) allocated a bitrate of zero to disable the stream or 2) allocated a
// higher than max to allow for e.g. extra FEC.
- auto constraints = GetMinMaxBitrateConstraints();
- update.target_bitrate.Clamp(constraints.min, constraints.max);
- update.stable_target_bitrate.Clamp(constraints.min, constraints.max);
+ RTC_DCHECK(cached_constraints_.has_value());
+ update.target_bitrate.Clamp(cached_constraints_->min,
+ cached_constraints_->max);
+ update.stable_target_bitrate.Clamp(cached_constraints_->min,
+ cached_constraints_->max);
channel_send_->OnBitrateAllocation(update);
@@ -536,13 +537,17 @@
void AudioSendStream::SetTransportOverhead(
int transport_overhead_per_packet_bytes) {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
- MutexLock lock(&overhead_per_packet_lock_);
- transport_overhead_per_packet_bytes_ = transport_overhead_per_packet_bytes;
- UpdateOverheadForEncoder();
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
+ {
+ MutexLock lock(&overhead_per_packet_lock_);
+ transport_overhead_per_packet_bytes_ = transport_overhead_per_packet_bytes;
+ UpdateOverheadForEncoder();
+ }
+ UpdateCachedTargetAudioBitrateConstraints();
}
void AudioSendStream::UpdateOverheadForEncoder() {
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
size_t overhead_per_packet_bytes = GetPerPacketOverheadBytes();
if (overhead_per_packet_ == overhead_per_packet_bytes) {
return;
@@ -552,19 +557,11 @@
channel_send_->CallEncoder([&](AudioEncoder* encoder) {
encoder->OnReceivedOverhead(overhead_per_packet_bytes);
});
- auto update_task = [this, overhead_per_packet_bytes] {
- RTC_DCHECK_RUN_ON(worker_queue_);
- if (total_packet_overhead_bytes_ != overhead_per_packet_bytes) {
- total_packet_overhead_bytes_ = overhead_per_packet_bytes;
- if (registered_with_allocator_) {
- ConfigureBitrateObserver();
- }
+ if (total_packet_overhead_bytes_ != overhead_per_packet_bytes) {
+ total_packet_overhead_bytes_ = overhead_per_packet_bytes;
+ if (registered_with_allocator_) {
+ ConfigureBitrateObserver();
}
- };
- if (worker_queue_->IsCurrent()) {
- update_task();
- } else {
- worker_queue_->PostTask(update_task);
}
}
@@ -602,7 +599,6 @@
void AudioSendStream::StoreEncoderProperties(int sample_rate_hz,
size_t num_channels) {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
encoder_sample_rate_hz_ = sample_rate_hz;
encoder_num_channels_ = num_channels;
if (sending_) {
@@ -800,7 +796,6 @@
void AudioSendStream::ReconfigureBitrateObserver(
const webrtc::AudioSendStream::Config& new_config) {
- RTC_DCHECK_RUN_ON(&worker_thread_checker_);
// Since the Config's default is for both of these to be -1, this test will
// allow us to configure the bitrate observer if the new config has bitrate
// limits set, but would only have us call RemoveBitrateObserver if we were
@@ -819,20 +814,13 @@
rtp_transport_->AccountForAudioPacketsInPacedSender(true);
if (send_side_bwe_with_overhead_)
rtp_transport_->IncludeOverheadInPacedSender();
- rtc::Event thread_sync_event;
- worker_queue_->PostTask([&] {
- RTC_DCHECK_RUN_ON(worker_queue_);
- // We may get a callback immediately as the observer is registered, so
- // make
- // sure the bitrate limits in config_ are up-to-date.
- config_.min_bitrate_bps = new_config.min_bitrate_bps;
- config_.max_bitrate_bps = new_config.max_bitrate_bps;
+ // We may get a callback immediately as the observer is registered, so
+ // make sure the bitrate limits in config_ are up-to-date.
+ config_.min_bitrate_bps = new_config.min_bitrate_bps;
+ config_.max_bitrate_bps = new_config.max_bitrate_bps;
- config_.bitrate_priority = new_config.bitrate_priority;
- ConfigureBitrateObserver();
- thread_sync_event.Set();
- });
- thread_sync_event.Wait(rtc::Event::kForever);
+ config_.bitrate_priority = new_config.bitrate_priority;
+ ConfigureBitrateObserver();
rtp_rtcp_module_->SetAsPartOfAllocation(true);
} else {
rtp_transport_->AccountForAudioPacketsInPacedSender(false);
@@ -845,6 +833,7 @@
// This either updates the current observer or adds a new observer.
// TODO(srte): Add overhead compensation here.
auto constraints = GetMinMaxBitrateConstraints();
+ RTC_DCHECK(constraints.has_value());
DataRate priority_bitrate = allocation_settings_.priority_bitrate;
if (send_side_bwe_with_overhead_) {
@@ -866,30 +855,40 @@
if (allocation_settings_.priority_bitrate_raw)
priority_bitrate = *allocation_settings_.priority_bitrate_raw;
- bitrate_allocator_->AddObserver(
- this,
- MediaStreamAllocationConfig{
- constraints.min.bps<uint32_t>(), constraints.max.bps<uint32_t>(), 0,
- priority_bitrate.bps(), true,
- allocation_settings_.bitrate_priority.value_or(
- config_.bitrate_priority)});
+ worker_queue_->PostTask([this, constraints, priority_bitrate,
+ config_bitrate_priority = config_.bitrate_priority] {
+ RTC_DCHECK_RUN_ON(worker_queue_);
+ bitrate_allocator_->AddObserver(
+ this,
+ MediaStreamAllocationConfig{
+ constraints->min.bps<uint32_t>(), constraints->max.bps<uint32_t>(),
+ 0, priority_bitrate.bps(), true,
+ allocation_settings_.bitrate_priority.value_or(
+ config_bitrate_priority)});
+ });
registered_with_allocator_ = true;
}
void AudioSendStream::RemoveBitrateObserver() {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ registered_with_allocator_ = false;
rtc::Event thread_sync_event;
worker_queue_->PostTask([this, &thread_sync_event] {
RTC_DCHECK_RUN_ON(worker_queue_);
- registered_with_allocator_ = false;
bitrate_allocator_->RemoveObserver(this);
thread_sync_event.Set();
});
thread_sync_event.Wait(rtc::Event::kForever);
}
-AudioSendStream::TargetAudioBitrateConstraints
+absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
AudioSendStream::GetMinMaxBitrateConstraints() const {
+ if (config_.min_bitrate_bps < 0 || config_.max_bitrate_bps < 0) {
+ RTC_LOG(LS_WARNING) << "Config is invalid: min_bitrate_bps="
+ << config_.min_bitrate_bps
+ << "; max_bitrate_bps=" << config_.max_bitrate_bps
+ << "; both expected greater or equal to 0";
+ return absl::nullopt;
+ }
TargetAudioBitrateConstraints constraints{
DataRate::BitsPerSec(config_.min_bitrate_bps),
DataRate::BitsPerSec(config_.max_bitrate_bps)};
@@ -902,7 +901,11 @@
RTC_DCHECK_GE(constraints.min, DataRate::Zero());
RTC_DCHECK_GE(constraints.max, DataRate::Zero());
- RTC_DCHECK_GE(constraints.max, constraints.min);
+ if (constraints.max < constraints.min) {
+ RTC_LOG(LS_WARNING) << "TargetAudioBitrateConstraints::max is less than "
+ << "TargetAudioBitrateConstraints::min";
+ return absl::nullopt;
+ }
if (send_side_bwe_with_overhead_) {
if (use_legacy_overhead_calculation_) {
// OverheadPerPacket = Ipv4(20B) + UDP(8B) + SRTP(10B) + RTP(12)
@@ -913,7 +916,10 @@
constraints.min += kMinOverhead;
constraints.max += kMinOverhead;
} else {
- RTC_DCHECK(frame_length_range_);
+ if (!frame_length_range_.has_value()) {
+ RTC_LOG(LS_WARNING) << "frame_length_range_ is not set";
+ return absl::nullopt;
+ }
const DataSize kOverheadPerPacket =
DataSize::Bytes(total_packet_overhead_bytes_);
constraints.min += kOverheadPerPacket / frame_length_range_->second;
@@ -927,5 +933,18 @@
int clockrate_hz) {
channel_send_->RegisterCngPayloadType(payload_type, clockrate_hz);
}
+
+void AudioSendStream::UpdateCachedTargetAudioBitrateConstraints() {
+ absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
+ new_constraints = GetMinMaxBitrateConstraints();
+ if (!new_constraints.has_value()) {
+ return;
+ }
+ worker_queue_->PostTask([this, new_constraints]() {
+ RTC_DCHECK_RUN_ON(worker_queue_);
+ cached_constraints_ = new_constraints;
+ });
+}
+
} // namespace internal
} // namespace webrtc
diff --git a/audio/audio_send_stream.h b/audio/audio_send_stream.h
index 1e6982e..113d259 100644
--- a/audio/audio_send_stream.h
+++ b/audio/audio_send_stream.h
@@ -24,8 +24,8 @@
#include "rtc_base/experiments/struct_parameters_parser.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_queue.h"
-#include "rtc_base/thread_checker.h"
namespace webrtc {
class RtcEventLog;
@@ -121,22 +121,29 @@
internal::AudioState* audio_state();
const internal::AudioState* audio_state() const;
- void StoreEncoderProperties(int sample_rate_hz, size_t num_channels);
+ void StoreEncoderProperties(int sample_rate_hz, size_t num_channels)
+ RTC_RUN_ON(worker_thread_checker_);
- void ConfigureStream(const Config& new_config, bool first_time);
- bool SetupSendCodec(const Config& new_config);
- bool ReconfigureSendCodec(const Config& new_config);
- void ReconfigureANA(const Config& new_config);
- void ReconfigureCNG(const Config& new_config);
- void ReconfigureBitrateObserver(const Config& new_config);
+ void ConfigureStream(const Config& new_config, bool first_time)
+ RTC_RUN_ON(worker_thread_checker_);
+ bool SetupSendCodec(const Config& new_config)
+ RTC_RUN_ON(worker_thread_checker_);
+ bool ReconfigureSendCodec(const Config& new_config)
+ RTC_RUN_ON(worker_thread_checker_);
+ void ReconfigureANA(const Config& new_config)
+ RTC_RUN_ON(worker_thread_checker_);
+ void ReconfigureCNG(const Config& new_config)
+ RTC_RUN_ON(worker_thread_checker_);
+ void ReconfigureBitrateObserver(const Config& new_config)
+ RTC_RUN_ON(worker_thread_checker_);
- void ConfigureBitrateObserver() RTC_RUN_ON(worker_queue_);
- void RemoveBitrateObserver();
+ void ConfigureBitrateObserver() RTC_RUN_ON(worker_thread_checker_);
+ void RemoveBitrateObserver() RTC_RUN_ON(worker_thread_checker_);
// Returns bitrate constraints, maybe including overhead when enabled by
// field trial.
- TargetAudioBitrateConstraints GetMinMaxBitrateConstraints() const
- RTC_RUN_ON(worker_queue_);
+ absl::optional<TargetAudioBitrateConstraints> GetMinMaxBitrateConstraints()
+ const RTC_RUN_ON(worker_thread_checker_);
// Sets per-packet overhead on encoded (for ANA) based on current known values
// of transport and packetization overheads.
@@ -147,11 +154,16 @@
size_t GetPerPacketOverheadBytes() const
RTC_EXCLUSIVE_LOCKS_REQUIRED(overhead_per_packet_lock_);
- void RegisterCngPayloadType(int payload_type, int clockrate_hz);
+ void RegisterCngPayloadType(int payload_type, int clockrate_hz)
+ RTC_RUN_ON(worker_thread_checker_);
+
+ void UpdateCachedTargetAudioBitrateConstraints()
+ RTC_RUN_ON(worker_thread_checker_);
+
Clock* clock_;
- rtc::ThreadChecker worker_thread_checker_;
- rtc::ThreadChecker pacer_thread_checker_;
+ SequenceChecker worker_thread_checker_;
+ SequenceChecker pacer_thread_checker_;
rtc::RaceChecker audio_capture_race_checker_;
rtc::TaskQueue* worker_queue_;
@@ -161,15 +173,16 @@
const bool send_side_bwe_with_overhead_;
const AudioAllocationConfig allocation_settings_;
- webrtc::AudioSendStream::Config config_;
+ webrtc::AudioSendStream::Config config_
+ RTC_GUARDED_BY(worker_thread_checker_);
rtc::scoped_refptr<webrtc::AudioState> audio_state_;
const std::unique_ptr<voe::ChannelSendInterface> channel_send_;
RtcEventLog* const event_log_;
const bool use_legacy_overhead_calculation_;
- int encoder_sample_rate_hz_ = 0;
- size_t encoder_num_channels_ = 0;
- bool sending_ = false;
+ int encoder_sample_rate_hz_ RTC_GUARDED_BY(worker_thread_checker_) = 0;
+ size_t encoder_num_channels_ RTC_GUARDED_BY(worker_thread_checker_) = 0;
+ bool sending_ RTC_GUARDED_BY(worker_thread_checker_) = false;
mutable Mutex audio_level_lock_;
// Keeps track of audio level, total audio energy and total samples duration.
// https://w3c.github.io/webrtc-stats/#dom-rtcaudiohandlerstats-totalaudioenergy
@@ -177,6 +190,9 @@
BitrateAllocatorInterface* const bitrate_allocator_
RTC_GUARDED_BY(worker_queue_);
+ // Constrains cached to be accessed from |worker_queue_|.
+ absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
+ cached_constraints_ RTC_GUARDED_BY(worker_queue_) = absl::nullopt;
RtpTransportControllerSendInterface* const rtp_transport_;
RtpRtcpInterface* const rtp_rtcp_module_;
@@ -205,10 +221,12 @@
size_t transport_overhead_per_packet_bytes_
RTC_GUARDED_BY(overhead_per_packet_lock_) = 0;
- bool registered_with_allocator_ RTC_GUARDED_BY(worker_queue_) = false;
- size_t total_packet_overhead_bytes_ RTC_GUARDED_BY(worker_queue_) = 0;
+ bool registered_with_allocator_ RTC_GUARDED_BY(worker_thread_checker_) =
+ false;
+ size_t total_packet_overhead_bytes_ RTC_GUARDED_BY(worker_thread_checker_) =
+ 0;
absl::optional<std::pair<TimeDelta, TimeDelta>> frame_length_range_
- RTC_GUARDED_BY(worker_queue_);
+ RTC_GUARDED_BY(worker_thread_checker_);
};
} // namespace internal
} // namespace webrtc