Revert "[WebRTC-SendPacketsOnWorkerThread] Cleanup AudioSendStream"
This reverts commit dd557fdb1e300068c62c870d9dc5273b48c7b79d.
Reason for revert: Looks like the Chromium FYI builders are failing.
Original change's description:
> [WebRTC-SendPacketsOnWorkerThread] Cleanup AudioSendStream
>
> This remove use of MaybeWorkerThread* rtp_transport_queue_ from
> AudioSendStream. The worker queue is alwauys assumed ot be used where
> rtp_transport_queue_ was used.
>
> Bug: webrtc:14502
> Change-Id: Ia516ce7340d712671e0ecb301bba9d66e7216973
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300400
> Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
> Reviewed-by: Jakob Ivarsson‎ <jakobi@webrtc.org>
> Commit-Queue: Per Kjellander <perkj@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#39816}
Bug: webrtc:14502
Change-Id: I0547548032756fc579b76b6bb362f576aa06b8f7
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/301020
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Auto-Submit: Tomas Gunnarsson <tommi@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Cr-Commit-Position: refs/heads/main@{#39820}
diff --git a/audio/BUILD.gn b/audio/BUILD.gn
index d1a42b6..d3f2d87 100644
--- a/audio/BUILD.gn
+++ b/audio/BUILD.gn
@@ -86,6 +86,7 @@
"../modules/pacing",
"../modules/rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
+ "../modules/utility:utility",
"../rtc_base:audio_format_to_string",
"../rtc_base:buffer",
"../rtc_base:checks",
@@ -195,6 +196,7 @@
"../modules/pacing",
"../modules/rtp_rtcp:mock_rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
+ "../modules/utility:utility",
"../rtc_base:checks",
"../rtc_base:gunit_helpers",
"../rtc_base:macromagic",
diff --git a/audio/audio_send_stream.cc b/audio/audio_send_stream.cc
index 19d71c9..7d6ec79 100644
--- a/audio/audio_send_stream.cc
+++ b/audio/audio_send_stream.cc
@@ -147,6 +147,7 @@
const FieldTrialsView& field_trials)
: clock_(clock),
field_trials_(field_trials),
+ rtp_transport_queue_(rtp_transport->GetWorkerQueue()),
allocate_audio_without_feedback_(
field_trials_.IsEnabled("WebRTC-Audio-ABWENoTWCC")),
enable_audio_alr_probing_(
@@ -163,6 +164,7 @@
rtp_rtcp_module_(channel_send_->GetRtpRtcp()),
suspended_rtp_state_(suspended_rtp_state) {
RTC_LOG(LS_INFO) << "AudioSendStream: " << config.rtp.ssrc;
+ RTC_DCHECK(rtp_transport_queue_);
RTC_DCHECK(audio_state_);
RTC_DCHECK(channel_send_);
RTC_DCHECK(bitrate_allocator_);
@@ -180,6 +182,10 @@
RTC_LOG(LS_INFO) << "~AudioSendStream: " << config_.rtp.ssrc;
RTC_DCHECK(!sending_);
channel_send_->ResetSenderCongestionControlObjects();
+
+ // Blocking call to synchronize state with worker queue to ensure that there
+ // are no pending tasks left that keeps references to audio.
+ rtp_transport_queue_->RunSynchronous([] {});
}
const webrtc::AudioSendStream::Config& AudioSendStream::GetConfig() const {
@@ -504,7 +510,7 @@
}
uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) {
- RTC_DCHECK_RUN_ON(&worker_thread_checker_);
+ RTC_DCHECK_RUN_ON(rtp_transport_queue_);
// Pick a target bitrate between the constraints. Overrules the allocator if
// it 1) allocated a bitrate of zero to disable the stream or 2) allocated a
@@ -819,7 +825,6 @@
}
void AudioSendStream::ConfigureBitrateObserver() {
- RTC_DCHECK_RUN_ON(&worker_thread_checker_);
// This either updates the current observer or adds a new observer.
// TODO(srte): Add overhead compensation here.
auto constraints = GetMinMaxBitrateConstraints();
@@ -841,24 +846,30 @@
priority_bitrate += min_overhead;
}
- if (allocation_settings_.priority_bitrate_raw) {
+ if (allocation_settings_.priority_bitrate_raw)
priority_bitrate = *allocation_settings_.priority_bitrate_raw;
- }
- bitrate_allocator_->AddObserver(
- this,
- MediaStreamAllocationConfig{
- constraints->min.bps<uint32_t>(), constraints->max.bps<uint32_t>(), 0,
- priority_bitrate.bps(), true,
- allocation_settings_.bitrate_priority.value_or(
- config_.bitrate_priority)});
-
+ rtp_transport_queue_->RunOrPost([this, constraints, priority_bitrate,
+ config_bitrate_priority =
+ config_.bitrate_priority] {
+ RTC_DCHECK_RUN_ON(rtp_transport_queue_);
+ bitrate_allocator_->AddObserver(
+ this,
+ MediaStreamAllocationConfig{
+ constraints->min.bps<uint32_t>(), constraints->max.bps<uint32_t>(),
+ 0, priority_bitrate.bps(), true,
+ allocation_settings_.bitrate_priority.value_or(
+ config_bitrate_priority)});
+ });
registered_with_allocator_ = true;
}
void AudioSendStream::RemoveBitrateObserver() {
registered_with_allocator_ = false;
- bitrate_allocator_->RemoveObserver(this);
+ rtp_transport_queue_->RunSynchronous([this] {
+ RTC_DCHECK_RUN_ON(rtp_transport_queue_);
+ bitrate_allocator_->RemoveObserver(this);
+ });
}
absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
@@ -919,7 +930,10 @@
if (!new_constraints.has_value()) {
return;
}
- cached_constraints_ = new_constraints;
+ rtp_transport_queue_->RunOrPost([this, new_constraints]() {
+ RTC_DCHECK_RUN_ON(rtp_transport_queue_);
+ cached_constraints_ = new_constraints;
+ });
}
} // namespace internal
diff --git a/audio/audio_send_stream.h b/audio/audio_send_stream.h
index 6cda9c3..42be43a 100644
--- a/audio/audio_send_stream.h
+++ b/audio/audio_send_stream.h
@@ -25,6 +25,7 @@
#include "call/audio_state.h"
#include "call/bitrate_allocator.h"
#include "modules/rtp_rtcp/source/rtp_rtcp_interface.h"
+#include "modules/utility/maybe_worker_thread.h"
#include "rtc_base/experiments/struct_parameters_parser.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/synchronization/mutex.h"
@@ -172,6 +173,7 @@
SequenceChecker worker_thread_checker_;
rtc::RaceChecker audio_capture_race_checker_;
+ MaybeWorkerThread* rtp_transport_queue_;
const bool allocate_audio_without_feedback_;
const bool force_no_audio_feedback_ = allocate_audio_without_feedback_;
@@ -194,10 +196,10 @@
webrtc::voe::AudioLevel audio_level_ RTC_GUARDED_BY(audio_level_lock_);
BitrateAllocatorInterface* const bitrate_allocator_
- RTC_GUARDED_BY(worker_thread_checker_);
+ RTC_GUARDED_BY(rtp_transport_queue_);
+ // Constrains cached to be accessed from `rtp_transport_queue_`.
absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
- cached_constraints_ RTC_GUARDED_BY(worker_thread_checker_) =
- absl::nullopt;
+ cached_constraints_ RTC_GUARDED_BY(rtp_transport_queue_) = absl::nullopt;
RtpTransportControllerSendInterface* const rtp_transport_;
RtpRtcpInterface* const rtp_rtcp_module_;
diff --git a/audio/audio_send_stream_unittest.cc b/audio/audio_send_stream_unittest.cc
index a6450d3..a81b40c 100644
--- a/audio/audio_send_stream_unittest.cc
+++ b/audio/audio_send_stream_unittest.cc
@@ -30,6 +30,7 @@
#include "modules/audio_processing/include/mock_audio_processing.h"
#include "modules/rtp_rtcp/mocks/mock_rtcp_bandwidth_observer.h"
#include "modules/rtp_rtcp/mocks/mock_rtp_rtcp.h"
+#include "modules/utility/maybe_worker_thread.h"
#include "system_wrappers/include/clock.h"
#include "test/gtest.h"
#include "test/mock_audio_encoder.h"
@@ -154,6 +155,9 @@
? nullptr
: rtc::make_ref_counted<NiceMock<MockAudioProcessing>>()),
bitrate_allocator_(&limit_observer_),
+ worker_queue_(field_trials,
+ "ConfigHelper_worker_queue",
+ time_controller_.GetTaskQueueFactory()),
audio_encoder_(nullptr) {
using ::testing::Invoke;
@@ -184,6 +188,8 @@
}
std::unique_ptr<internal::AudioSendStream> CreateAudioSendStream() {
+ EXPECT_CALL(rtp_transport_, GetWorkerQueue())
+ .WillRepeatedly(Return(&worker_queue_));
return std::unique_ptr<internal::AudioSendStream>(
new internal::AudioSendStream(
time_controller_.GetClock(), stream_config_, audio_state_,
@@ -313,6 +319,8 @@
}
}
+ MaybeWorkerThread* worker() { return &worker_queue_; }
+
test::ScopedKeyValueConfig field_trials;
private:
@@ -328,6 +336,9 @@
::testing::NiceMock<MockRtpRtcpInterface> rtp_rtcp_;
::testing::NiceMock<MockLimitObserver> limit_observer_;
BitrateAllocator bitrate_allocator_;
+ // `worker_queue` is defined last to ensure all pending tasks are cancelled
+ // and deleted before any other members.
+ MaybeWorkerThread worker_queue_;
std::unique_ptr<AudioEncoder> audio_encoder_;
};
@@ -625,7 +636,8 @@
update.packet_loss_ratio = 0;
update.round_trip_time = TimeDelta::Millis(50);
update.bwe_period = TimeDelta::Millis(6000);
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
}
}
@@ -641,7 +653,8 @@
BitrateAllocationUpdate update;
update.target_bitrate =
DataRate::BitsPerSec(helper.config().max_bitrate_bps - 5000);
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
}
}
@@ -657,7 +670,8 @@
Eq(DataRate::KilobitsPerSec(6)))));
BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(1);
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
}
}
@@ -673,7 +687,8 @@
Eq(DataRate::KilobitsPerSec(64)))));
BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(128);
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
}
}
@@ -693,7 +708,8 @@
&BitrateAllocationUpdate::target_bitrate, Eq(bitrate))));
BitrateAllocationUpdate update;
update.target_bitrate = bitrate;
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
}
}
@@ -713,7 +729,8 @@
&BitrateAllocationUpdate::target_bitrate, Eq(bitrate))));
BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(1);
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
}
}
@@ -733,7 +750,8 @@
&BitrateAllocationUpdate::target_bitrate, Eq(bitrate))));
BitrateAllocationUpdate update;
update.target_bitrate = DataRate::KilobitsPerSec(128);
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
}
}
@@ -751,7 +769,8 @@
update.packet_loss_ratio = 0;
update.round_trip_time = TimeDelta::Millis(50);
update.bwe_period = TimeDelta::Millis(5000);
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
}
}
@@ -853,7 +872,8 @@
DataRate::BitsPerSec(helper.config().max_bitrate_bps) +
kMaxOverheadRate;
EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation);
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
EXPECT_EQ(audio_overhead_per_packet_bytes,
send_stream->TestOnlyGetPerPacketOverheadBytes());
@@ -861,7 +881,8 @@
EXPECT_CALL(*helper.rtp_rtcp(), ExpectedPerPacketOverhead)
.WillRepeatedly(Return(audio_overhead_per_packet_bytes + 20));
EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation);
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
EXPECT_EQ(audio_overhead_per_packet_bytes + 20,
send_stream->TestOnlyGetPerPacketOverheadBytes());
@@ -885,7 +906,8 @@
DataRate::BitsPerSec(helper.config().max_bitrate_bps) +
kMaxOverheadRate;
EXPECT_CALL(*helper.channel_send(), OnBitrateAllocation);
- send_stream->OnBitrateUpdated(update);
+ helper.worker()->RunSynchronous(
+ [&] { send_stream->OnBitrateUpdated(update); });
EXPECT_EQ(
transport_overhead_per_packet_bytes + audio_overhead_per_packet_bytes,