Reland "Fix data race for config_ in AudioSendStream"
This is a reland of 51e5c4b0f47926e2586d809e47dc60fe4812b782
It may happen that user will pass config with min bitrate > max bitrate.
In such case we can't generate cached_constraints and will crash before.
The reland will handle this situation gracefully.
Original change's description:
> Fix data race for config_ in AudioSendStream
>
> config_ was written and read on different threads without sync. This CL
> moves config access on worker_thread_ with all other required fields.
> It keeps only bitrate allocator accessed from worker_queue_, because
> it is used from it in other classes and supposed to be single threaded.
>
> Bug: None
> Change-Id: I23ece4dc8b09b41a8c589412bedd36d63b76cbc5
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/203267
> Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
> Reviewed-by: Niels Moller <nisse@webrtc.org>
> Reviewed-by: Per Åhgren <peah@webrtc.org>
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Commit-Queue: Artem Titov <titovartem@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#33125}
Bug: None
Change-Id: I274ff15208d69c25fb25a0f1dd0a0e37b72480b0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205523
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Reviewed-by: Per Åhgren <peah@webrtc.org>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33162}
diff --git a/audio/audio_send_stream.cc b/audio/audio_send_stream.cc
index 4e21b1f..b769569 100644
--- a/audio/audio_send_stream.cc
+++ b/audio/audio_send_stream.cc
@@ -168,13 +168,14 @@
RTC_DCHECK(rtp_rtcp_module_);
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
ConfigureStream(config, true);
-
+ UpdateCachedTargetAudioBitrateConstraints();
pacer_thread_checker_.Detach();
}
AudioSendStream::~AudioSendStream() {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_LOG(LS_INFO) << "~AudioSendStream: " << config_.rtp.ssrc;
RTC_DCHECK(!sending_);
channel_send_->ResetSenderCongestionControlObjects();
@@ -186,13 +187,13 @@
}
const webrtc::AudioSendStream::Config& AudioSendStream::GetConfig() const {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
return config_;
}
void AudioSendStream::Reconfigure(
const webrtc::AudioSendStream::Config& new_config) {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
ConfigureStream(new_config, false);
}
@@ -351,20 +352,22 @@
}
channel_send_->CallEncoder([this](AudioEncoder* encoder) {
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
if (!encoder) {
return;
}
- worker_queue_->PostTask(
- [this, length_range = encoder->GetFrameLengthRange()] {
- RTC_DCHECK_RUN_ON(worker_queue_);
- frame_length_range_ = length_range;
- });
+ frame_length_range_ = encoder->GetFrameLengthRange();
+ UpdateCachedTargetAudioBitrateConstraints();
});
if (sending_) {
ReconfigureBitrateObserver(new_config);
}
+
config_ = new_config;
+ if (!first_time) {
+ UpdateCachedTargetAudioBitrateConstraints();
+ }
}
void AudioSendStream::Start() {
@@ -379,13 +382,7 @@
if (send_side_bwe_with_overhead_)
rtp_transport_->IncludeOverheadInPacedSender();
rtp_rtcp_module_->SetAsPartOfAllocation(true);
- rtc::Event thread_sync_event;
- worker_queue_->PostTask([&] {
- RTC_DCHECK_RUN_ON(worker_queue_);
- ConfigureBitrateObserver();
- thread_sync_event.Set();
- });
- thread_sync_event.Wait(rtc::Event::kForever);
+ ConfigureBitrateObserver();
} else {
rtp_rtcp_module_->SetAsPartOfAllocation(false);
}
@@ -396,7 +393,7 @@
}
void AudioSendStream::Stop() {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
if (!sending_) {
return;
}
@@ -431,14 +428,14 @@
int payload_frequency,
int event,
int duration_ms) {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_send_->SetSendTelephoneEventPayloadType(payload_type,
payload_frequency);
return channel_send_->SendTelephoneEventOutband(event, duration_ms);
}
void AudioSendStream::SetMuted(bool muted) {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_send_->SetInputMute(muted);
}
@@ -448,7 +445,7 @@
webrtc::AudioSendStream::Stats AudioSendStream::GetStats(
bool has_remote_tracks) const {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
webrtc::AudioSendStream::Stats stats;
stats.local_ssrc = config_.rtp.ssrc;
stats.target_bitrate_bps = channel_send_->GetBitrate();
@@ -509,12 +506,14 @@
void AudioSendStream::DeliverRtcp(const uint8_t* packet, size_t length) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_send_->ReceivedRTCPPacket(packet, length);
- worker_queue_->PostTask([&]() {
+
+ {
// Poll if overhead has changed, which it can do if ack triggers us to stop
// sending mid/rid.
MutexLock lock(&overhead_per_packet_lock_);
UpdateOverheadForEncoder();
- });
+ }
+ UpdateCachedTargetAudioBitrateConstraints();
}
uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) {
@@ -523,9 +522,11 @@
// Pick a target bitrate between the constraints. Overrules the allocator if
// it 1) allocated a bitrate of zero to disable the stream or 2) allocated a
// higher than max to allow for e.g. extra FEC.
- auto constraints = GetMinMaxBitrateConstraints();
- update.target_bitrate.Clamp(constraints.min, constraints.max);
- update.stable_target_bitrate.Clamp(constraints.min, constraints.max);
+ RTC_DCHECK(cached_constraints_.has_value());
+ update.target_bitrate.Clamp(cached_constraints_->min,
+ cached_constraints_->max);
+ update.stable_target_bitrate.Clamp(cached_constraints_->min,
+ cached_constraints_->max);
channel_send_->OnBitrateAllocation(update);
@@ -536,13 +537,17 @@
void AudioSendStream::SetTransportOverhead(
int transport_overhead_per_packet_bytes) {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
- MutexLock lock(&overhead_per_packet_lock_);
- transport_overhead_per_packet_bytes_ = transport_overhead_per_packet_bytes;
- UpdateOverheadForEncoder();
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
+ {
+ MutexLock lock(&overhead_per_packet_lock_);
+ transport_overhead_per_packet_bytes_ = transport_overhead_per_packet_bytes;
+ UpdateOverheadForEncoder();
+ }
+ UpdateCachedTargetAudioBitrateConstraints();
}
void AudioSendStream::UpdateOverheadForEncoder() {
+ RTC_DCHECK_RUN_ON(&worker_thread_checker_);
size_t overhead_per_packet_bytes = GetPerPacketOverheadBytes();
if (overhead_per_packet_ == overhead_per_packet_bytes) {
return;
@@ -552,19 +557,11 @@
channel_send_->CallEncoder([&](AudioEncoder* encoder) {
encoder->OnReceivedOverhead(overhead_per_packet_bytes);
});
- auto update_task = [this, overhead_per_packet_bytes] {
- RTC_DCHECK_RUN_ON(worker_queue_);
- if (total_packet_overhead_bytes_ != overhead_per_packet_bytes) {
- total_packet_overhead_bytes_ = overhead_per_packet_bytes;
- if (registered_with_allocator_) {
- ConfigureBitrateObserver();
- }
+ if (total_packet_overhead_bytes_ != overhead_per_packet_bytes) {
+ total_packet_overhead_bytes_ = overhead_per_packet_bytes;
+ if (registered_with_allocator_) {
+ ConfigureBitrateObserver();
}
- };
- if (worker_queue_->IsCurrent()) {
- update_task();
- } else {
- worker_queue_->PostTask(update_task);
}
}
@@ -602,7 +599,6 @@
void AudioSendStream::StoreEncoderProperties(int sample_rate_hz,
size_t num_channels) {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
encoder_sample_rate_hz_ = sample_rate_hz;
encoder_num_channels_ = num_channels;
if (sending_) {
@@ -800,7 +796,6 @@
void AudioSendStream::ReconfigureBitrateObserver(
const webrtc::AudioSendStream::Config& new_config) {
- RTC_DCHECK_RUN_ON(&worker_thread_checker_);
// Since the Config's default is for both of these to be -1, this test will
// allow us to configure the bitrate observer if the new config has bitrate
// limits set, but would only have us call RemoveBitrateObserver if we were
@@ -819,20 +814,13 @@
rtp_transport_->AccountForAudioPacketsInPacedSender(true);
if (send_side_bwe_with_overhead_)
rtp_transport_->IncludeOverheadInPacedSender();
- rtc::Event thread_sync_event;
- worker_queue_->PostTask([&] {
- RTC_DCHECK_RUN_ON(worker_queue_);
- // We may get a callback immediately as the observer is registered, so
- // make
- // sure the bitrate limits in config_ are up-to-date.
- config_.min_bitrate_bps = new_config.min_bitrate_bps;
- config_.max_bitrate_bps = new_config.max_bitrate_bps;
+ // We may get a callback immediately as the observer is registered, so
+ // make sure the bitrate limits in config_ are up-to-date.
+ config_.min_bitrate_bps = new_config.min_bitrate_bps;
+ config_.max_bitrate_bps = new_config.max_bitrate_bps;
- config_.bitrate_priority = new_config.bitrate_priority;
- ConfigureBitrateObserver();
- thread_sync_event.Set();
- });
- thread_sync_event.Wait(rtc::Event::kForever);
+ config_.bitrate_priority = new_config.bitrate_priority;
+ ConfigureBitrateObserver();
rtp_rtcp_module_->SetAsPartOfAllocation(true);
} else {
rtp_transport_->AccountForAudioPacketsInPacedSender(false);
@@ -845,6 +833,7 @@
// This either updates the current observer or adds a new observer.
// TODO(srte): Add overhead compensation here.
auto constraints = GetMinMaxBitrateConstraints();
+ RTC_DCHECK(constraints.has_value());
DataRate priority_bitrate = allocation_settings_.priority_bitrate;
if (send_side_bwe_with_overhead_) {
@@ -866,30 +855,40 @@
if (allocation_settings_.priority_bitrate_raw)
priority_bitrate = *allocation_settings_.priority_bitrate_raw;
- bitrate_allocator_->AddObserver(
- this,
- MediaStreamAllocationConfig{
- constraints.min.bps<uint32_t>(), constraints.max.bps<uint32_t>(), 0,
- priority_bitrate.bps(), true,
- allocation_settings_.bitrate_priority.value_or(
- config_.bitrate_priority)});
+ worker_queue_->PostTask([this, constraints, priority_bitrate,
+ config_bitrate_priority = config_.bitrate_priority] {
+ RTC_DCHECK_RUN_ON(worker_queue_);
+ bitrate_allocator_->AddObserver(
+ this,
+ MediaStreamAllocationConfig{
+ constraints->min.bps<uint32_t>(), constraints->max.bps<uint32_t>(),
+ 0, priority_bitrate.bps(), true,
+ allocation_settings_.bitrate_priority.value_or(
+ config_bitrate_priority)});
+ });
registered_with_allocator_ = true;
}
void AudioSendStream::RemoveBitrateObserver() {
- RTC_DCHECK(worker_thread_checker_.IsCurrent());
+ registered_with_allocator_ = false;
rtc::Event thread_sync_event;
worker_queue_->PostTask([this, &thread_sync_event] {
RTC_DCHECK_RUN_ON(worker_queue_);
- registered_with_allocator_ = false;
bitrate_allocator_->RemoveObserver(this);
thread_sync_event.Set();
});
thread_sync_event.Wait(rtc::Event::kForever);
}
-AudioSendStream::TargetAudioBitrateConstraints
+absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
AudioSendStream::GetMinMaxBitrateConstraints() const {
+ if (config_.min_bitrate_bps < 0 || config_.max_bitrate_bps < 0) {
+ RTC_LOG(LS_WARNING) << "Config is invalid: min_bitrate_bps="
+ << config_.min_bitrate_bps
+ << "; max_bitrate_bps=" << config_.max_bitrate_bps
+ << "; both expected greater or equal to 0";
+ return absl::nullopt;
+ }
TargetAudioBitrateConstraints constraints{
DataRate::BitsPerSec(config_.min_bitrate_bps),
DataRate::BitsPerSec(config_.max_bitrate_bps)};
@@ -902,7 +901,11 @@
RTC_DCHECK_GE(constraints.min, DataRate::Zero());
RTC_DCHECK_GE(constraints.max, DataRate::Zero());
- RTC_DCHECK_GE(constraints.max, constraints.min);
+ if (constraints.max < constraints.min) {
+ RTC_LOG(LS_WARNING) << "TargetAudioBitrateConstraints::max is less than "
+ << "TargetAudioBitrateConstraints::min";
+ return absl::nullopt;
+ }
if (send_side_bwe_with_overhead_) {
if (use_legacy_overhead_calculation_) {
// OverheadPerPacket = Ipv4(20B) + UDP(8B) + SRTP(10B) + RTP(12)
@@ -913,7 +916,10 @@
constraints.min += kMinOverhead;
constraints.max += kMinOverhead;
} else {
- RTC_DCHECK(frame_length_range_);
+ if (!frame_length_range_.has_value()) {
+ RTC_LOG(LS_WARNING) << "frame_length_range_ is not set";
+ return absl::nullopt;
+ }
const DataSize kOverheadPerPacket =
DataSize::Bytes(total_packet_overhead_bytes_);
constraints.min += kOverheadPerPacket / frame_length_range_->second;
@@ -927,5 +933,18 @@
int clockrate_hz) {
channel_send_->RegisterCngPayloadType(payload_type, clockrate_hz);
}
+
+void AudioSendStream::UpdateCachedTargetAudioBitrateConstraints() {
+ absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
+ new_constraints = GetMinMaxBitrateConstraints();
+ if (!new_constraints.has_value()) {
+ return;
+ }
+ worker_queue_->PostTask([this, new_constraints]() {
+ RTC_DCHECK_RUN_ON(worker_queue_);
+ cached_constraints_ = new_constraints;
+ });
+}
+
} // namespace internal
} // namespace webrtc