Remove send_crit_, bitrate_crit_ and last_bandwidth_bps_crit_ locks.
...from the Call class.
With minimal shifting of ownership of a few variables, we can
maintain their state fully on the worker thread, where they're used.
This change also removes the dependency on RWLockWrapper.
Bug: webrtc:11612
Change-Id: Ia14be5bd6b50bd0b32d04f078b1e283080c00a19
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176122
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31360}
diff --git a/call/BUILD.gn b/call/BUILD.gn
index a5e21b1..e9d16df 100644
--- a/call/BUILD.gn
+++ b/call/BUILD.gn
@@ -280,8 +280,8 @@
"../rtc_base:safe_minmax",
"../rtc_base/experiments:field_trial_parser",
"../rtc_base/network:sent_packet",
- "../rtc_base/synchronization:rw_lock_wrapper",
"../rtc_base/synchronization:sequence_checker",
+ "../rtc_base/task_utils:pending_task_safety_flag",
"../system_wrappers",
"../system_wrappers:field_trial",
"../system_wrappers:metrics",
diff --git a/call/call.cc b/call/call.cc
index eb407e0..4bd52a6 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -49,8 +49,8 @@
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/string_builder.h"
-#include "rtc_base/synchronization/rw_lock_wrapper.h"
#include "rtc_base/synchronization/sequence_checker.h"
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h"
@@ -244,20 +244,20 @@
DeliveryStatus DeliverRtcp(MediaType media_type,
const uint8_t* packet,
size_t length)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(configuration_sequence_checker_);
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
DeliveryStatus DeliverRtp(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(configuration_sequence_checker_);
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
void ConfigureSync(const std::string& sync_group)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(configuration_sequence_checker_);
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
MediaType media_type)
- RTC_SHARED_LOCKS_REQUIRED(configuration_sequence_checker_);
+ RTC_SHARED_LOCKS_REQUIRED(worker_thread_);
void UpdateSendHistograms(Timestamp first_sent_packet)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(&bitrate_crit_);
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
void UpdateReceiveHistograms();
void UpdateHistograms();
void UpdateAggregateNetworkState();
@@ -270,28 +270,28 @@
Clock* const clock_;
TaskQueueFactory* const task_queue_factory_;
+ TaskQueueBase* const worker_thread_;
const int num_cpu_cores_;
const rtc::scoped_refptr<SharedModuleThread> module_process_thread_;
const std::unique_ptr<CallStats> call_stats_;
const std::unique_ptr<BitrateAllocator> bitrate_allocator_;
Call::Config config_;
- SequenceChecker configuration_sequence_checker_;
SequenceChecker network_sequence_checker_;
NetworkState audio_network_state_;
NetworkState video_network_state_;
- bool aggregate_network_up_ RTC_GUARDED_BY(configuration_sequence_checker_);
+ bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_);
// Audio, Video, and FlexFEC receive streams are owned by the client that
// creates them.
std::set<AudioReceiveStream*> audio_receive_streams_
- RTC_GUARDED_BY(configuration_sequence_checker_);
+ RTC_GUARDED_BY(worker_thread_);
std::set<VideoReceiveStream2*> video_receive_streams_
- RTC_GUARDED_BY(configuration_sequence_checker_);
+ RTC_GUARDED_BY(worker_thread_);
std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
- RTC_GUARDED_BY(configuration_sequence_checker_);
+ RTC_GUARDED_BY(worker_thread_);
// TODO(nisse): Should eventually be injected at creation,
// with a single object in the bundled case.
@@ -325,25 +325,22 @@
const bool use_send_side_bwe;
};
std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
- RTC_GUARDED_BY(configuration_sequence_checker_);
+ RTC_GUARDED_BY(worker_thread_);
- std::unique_ptr<RWLockWrapper> send_crit_;
// Audio and Video send streams are owned by the client that creates them.
std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_
- RTC_GUARDED_BY(send_crit_);
+ RTC_GUARDED_BY(worker_thread_);
std::map<uint32_t, VideoSendStream*> video_send_ssrcs_
- RTC_GUARDED_BY(send_crit_);
- std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(send_crit_);
+ RTC_GUARDED_BY(worker_thread_);
+ std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(worker_thread_);
using RtpStateMap = std::map<uint32_t, RtpState>;
- RtpStateMap suspended_audio_send_ssrcs_
- RTC_GUARDED_BY(configuration_sequence_checker_);
- RtpStateMap suspended_video_send_ssrcs_
- RTC_GUARDED_BY(configuration_sequence_checker_);
+ RtpStateMap suspended_audio_send_ssrcs_ RTC_GUARDED_BY(worker_thread_);
+ RtpStateMap suspended_video_send_ssrcs_ RTC_GUARDED_BY(worker_thread_);
using RtpPayloadStateMap = std::map<uint32_t, RtpPayloadState>;
RtpPayloadStateMap suspended_video_payload_states_
- RTC_GUARDED_BY(configuration_sequence_checker_);
+ RTC_GUARDED_BY(worker_thread_);
webrtc::RtcEventLog* event_log_;
@@ -359,17 +356,14 @@
absl::optional<int64_t> first_received_rtp_video_ms_;
absl::optional<int64_t> last_received_rtp_video_ms_;
- rtc::CriticalSection last_bandwidth_bps_crit_;
- uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(&last_bandwidth_bps_crit_);
+ uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(worker_thread_);
// TODO(holmer): Remove this lock once BitrateController no longer calls
// OnNetworkChanged from multiple threads.
- rtc::CriticalSection bitrate_crit_;
- uint32_t min_allocated_send_bitrate_bps_
- RTC_GUARDED_BY(&network_sequence_checker_);
- uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(&bitrate_crit_);
+ uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
+ uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
AvgCounter estimated_send_bitrate_kbps_counter_
- RTC_GUARDED_BY(&bitrate_crit_);
- AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(&bitrate_crit_);
+ RTC_GUARDED_BY(worker_thread_);
+ AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(worker_thread_);
ReceiveSideCongestionController receive_side_cc_;
@@ -378,6 +372,11 @@
const std::unique_ptr<SendDelayStats> video_send_delay_stats_;
const int64_t start_ms_;
+ // Note that |task_safety_| needs to be at a greater scope than the task queue
+ // owned by |transport_send_| since calls might arrive on the network thread
+ // while Call is being deleted and the task queue is being torn down.
+ ScopedTaskSafety task_safety_;
+
// Caches transport_send_.get(), to avoid racing with destructor.
// Note that this is declared before transport_send_ to ensure that it is not
// invalidated until no more tasks can be running on the transport_send_ task
@@ -387,8 +386,8 @@
// last ensures that it is destroyed first and any running tasks are finished.
std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;
- bool is_target_rate_observer_registered_
- RTC_GUARDED_BY(&configuration_sequence_checker_) = false;
+ bool is_target_rate_observer_registered_ RTC_GUARDED_BY(worker_thread_) =
+ false;
RTC_DISALLOW_COPY_AND_ASSIGN(Call);
};
@@ -550,15 +549,15 @@
TaskQueueFactory* task_queue_factory)
: clock_(clock),
task_queue_factory_(task_queue_factory),
+ worker_thread_(GetCurrentTaskQueueOrThread()),
num_cpu_cores_(CpuInfo::DetectNumberOfCores()),
module_process_thread_(std::move(module_process_thread)),
- call_stats_(new CallStats(clock_, GetCurrentTaskQueueOrThread())),
+ call_stats_(new CallStats(clock_, worker_thread_)),
bitrate_allocator_(new BitrateAllocator(this)),
config_(config),
audio_network_state_(kNetworkDown),
video_network_state_(kNetworkDown),
aggregate_network_up_(false),
- send_crit_(RWLockWrapper::CreateRWLock()),
event_log_(config.event_log),
received_bytes_per_second_counter_(clock_, nullptr, true),
received_audio_bytes_per_second_counter_(clock_, nullptr, true),
@@ -577,6 +576,7 @@
transport_send_(std::move(transport_send)) {
RTC_DCHECK(config.event_log != nullptr);
RTC_DCHECK(config.trials != nullptr);
+ RTC_DCHECK(worker_thread_->IsCurrent());
network_sequence_checker_.Detach();
call_stats_->RegisterStatsObserver(&receive_side_cc_);
@@ -588,7 +588,7 @@
}
Call::~Call() {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RTC_CHECK(audio_send_ssrcs_.empty());
RTC_CHECK(video_send_ssrcs_.empty());
@@ -607,7 +607,6 @@
// Only update histograms after process threads have been shut down, so that
// they won't try to concurrently update stats.
if (first_sent_packet_ms) {
- rtc::CritScope lock(&bitrate_crit_);
UpdateSendHistograms(*first_sent_packet_ms);
}
@@ -616,7 +615,7 @@
}
void Call::RegisterRateObserver() {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
if (is_target_rate_observer_registered_)
return;
@@ -631,7 +630,7 @@
}
void Call::SetClientBitratePreferences(const BitrateSettings& preferences) {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
GetTransportControllerSend()->SetClientBitratePreferences(preferences);
}
@@ -713,14 +712,14 @@
}
PacketReceiver* Call::Receiver() {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
return this;
}
webrtc::AudioSendStream* Call::CreateAudioSendStream(
const webrtc::AudioSendStream::Config& config) {
TRACE_EVENT0("webrtc", "Call::CreateAudioSendStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RegisterRateObserver();
@@ -739,12 +738,9 @@
module_process_thread_->process_thread(), transport_send_ptr_,
bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(),
suspended_rtp_state);
- {
- WriteLockScoped write_lock(*send_crit_);
- RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
- audio_send_ssrcs_.end());
- audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
- }
+ RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
+ audio_send_ssrcs_.end());
+ audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
for (AudioReceiveStream* stream : audio_receive_streams_) {
if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
@@ -758,7 +754,7 @@
void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(send_stream != nullptr);
send_stream->Stop();
@@ -767,11 +763,9 @@
webrtc::internal::AudioSendStream* audio_send_stream =
static_cast<webrtc::internal::AudioSendStream*>(send_stream);
suspended_audio_send_ssrcs_[ssrc] = audio_send_stream->GetRtpState();
- {
- WriteLockScoped write_lock(*send_crit_);
- size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
- RTC_DCHECK_EQ(1, num_deleted);
- }
+
+ size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
+ RTC_DCHECK_EQ(1, num_deleted);
for (AudioReceiveStream* stream : audio_receive_streams_) {
if (stream->config().rtp.local_ssrc == ssrc) {
@@ -786,7 +780,7 @@
webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
const webrtc::AudioReceiveStream::Config& config) {
TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RegisterRateObserver();
event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>(
CreateRtcLogStreamConfig(config)));
@@ -800,13 +794,11 @@
ConfigureSync(config.sync_group);
- {
- ReadLockScoped read_lock(*send_crit_);
- auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc);
- if (it != audio_send_ssrcs_.end()) {
- receive_stream->AssociateSendStream(it->second);
- }
+ auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc);
+ if (it != audio_send_ssrcs_.end()) {
+ receive_stream->AssociateSendStream(it->second);
}
+
UpdateAggregateNetworkState();
return receive_stream;
}
@@ -814,7 +806,7 @@
void Call::DestroyAudioReceiveStream(
webrtc::AudioReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(receive_stream != nullptr);
webrtc::internal::AudioReceiveStream* audio_receive_stream =
static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream);
@@ -842,7 +834,7 @@
VideoEncoderConfig encoder_config,
std::unique_ptr<FecController> fec_controller) {
TRACE_EVENT0("webrtc", "Call::CreateVideoSendStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RegisterRateObserver();
@@ -865,14 +857,12 @@
std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_,
suspended_video_payload_states_, std::move(fec_controller));
- {
- WriteLockScoped write_lock(*send_crit_);
- for (uint32_t ssrc : ssrcs) {
- RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end());
- video_send_ssrcs_[ssrc] = send_stream;
- }
- video_send_streams_.insert(send_stream);
+ for (uint32_t ssrc : ssrcs) {
+ RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end());
+ video_send_ssrcs_[ssrc] = send_stream;
}
+ video_send_streams_.insert(send_stream);
+
UpdateAggregateNetworkState();
return send_stream;
@@ -895,24 +885,23 @@
void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyVideoSendStream");
RTC_DCHECK(send_stream != nullptr);
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
send_stream->Stop();
VideoSendStream* send_stream_impl = nullptr;
- {
- WriteLockScoped write_lock(*send_crit_);
- auto it = video_send_ssrcs_.begin();
- while (it != video_send_ssrcs_.end()) {
- if (it->second == static_cast<VideoSendStream*>(send_stream)) {
- send_stream_impl = it->second;
- video_send_ssrcs_.erase(it++);
- } else {
- ++it;
- }
+
+ auto it = video_send_ssrcs_.begin();
+ while (it != video_send_ssrcs_.end()) {
+ if (it->second == static_cast<VideoSendStream*>(send_stream)) {
+ send_stream_impl = it->second;
+ video_send_ssrcs_.erase(it++);
+ } else {
+ ++it;
}
- video_send_streams_.erase(send_stream_impl);
}
+ video_send_streams_.erase(send_stream_impl);
+
RTC_CHECK(send_stream_impl != nullptr);
VideoSendStream::RtpStateMap rtp_states;
@@ -933,7 +922,7 @@
webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
webrtc::VideoReceiveStream::Config configuration) {
TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
receive_side_cc_.SetSendPeriodicFeedback(
SendPeriodicFeedback(configuration.rtp.extensions));
@@ -970,7 +959,7 @@
void Call::DestroyVideoReceiveStream(
webrtc::VideoReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(receive_stream != nullptr);
VideoReceiveStream2* receive_stream_impl =
static_cast<VideoReceiveStream2*>(receive_stream);
@@ -995,7 +984,7 @@
FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
const FlexfecReceiveStream::Config& config) {
TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RecoveredPacketReceiver* recovered_packet_receiver = this;
@@ -1022,7 +1011,7 @@
void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(receive_stream != nullptr);
const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
@@ -1042,7 +1031,7 @@
}
Call::Stats Call::GetStats() const {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
// TODO(tommi): The following stats are managed on the process thread:
// - pacer_delay_ms (PacedSender::Process)
@@ -1066,22 +1055,14 @@
receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate(
&ssrcs, &recv_bandwidth);
stats.recv_bandwidth_bps = recv_bandwidth;
-
- {
- rtc::CritScope cs(&last_bandwidth_bps_crit_);
- stats.send_bandwidth_bps = last_bandwidth_bps_;
- }
-
- {
- rtc::CritScope cs(&bitrate_crit_);
- stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_;
- }
+ stats.send_bandwidth_bps = last_bandwidth_bps_;
+ stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_;
return stats;
}
void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
switch (media) {
case MediaType::AUDIO:
audio_network_state_ = state;
@@ -1102,30 +1083,19 @@
}
void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
- ReadLockScoped read_lock(*send_crit_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
for (auto& kv : audio_send_ssrcs_) {
kv.second->SetTransportOverhead(transport_overhead_per_packet);
}
}
void Call::UpdateAggregateNetworkState() {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
- bool have_audio = false;
- bool have_video = false;
- {
- ReadLockScoped read_lock(*send_crit_);
- if (!audio_send_ssrcs_.empty())
- have_audio = true;
- if (!video_send_ssrcs_.empty())
- have_video = true;
- }
-
- if (!audio_receive_streams_.empty())
- have_audio = true;
-
- if (!video_receive_streams_.empty())
- have_video = true;
+ bool have_audio =
+ !audio_send_ssrcs_.empty() || !audio_receive_streams_.empty();
+ bool have_video =
+ !video_send_ssrcs_.empty() || !video_receive_streams_.empty();
bool aggregate_network_up =
((have_video && video_network_state_ == kNetworkUp) ||
@@ -1159,42 +1129,32 @@
void Call::OnTargetTransferRate(TargetTransferRate msg) {
RTC_DCHECK(network_queue()->IsCurrent());
RTC_DCHECK_RUN_ON(&network_sequence_checker_);
- {
- rtc::CritScope cs(&last_bandwidth_bps_crit_);
- last_bandwidth_bps_ = msg.target_rate.bps();
- }
uint32_t target_bitrate_bps = msg.target_rate.bps();
// For controlling the rate of feedback messages.
receive_side_cc_.OnBitrateChanged(target_bitrate_bps);
bitrate_allocator_->OnNetworkEstimateChanged(msg);
- // Ignore updates if bitrate is zero (the aggregate network state is down).
- if (target_bitrate_bps == 0) {
- rtc::CritScope lock(&bitrate_crit_);
- estimated_send_bitrate_kbps_counter_.ProcessAndPause();
- pacer_bitrate_kbps_counter_.ProcessAndPause();
- return;
- }
+ worker_thread_->PostTask(
+ ToQueuedTask(task_safety_, [this, target_bitrate_bps]() {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ last_bandwidth_bps_ = target_bitrate_bps;
- bool sending_video;
- {
- ReadLockScoped read_lock(*send_crit_);
- sending_video = !video_send_streams_.empty();
- }
+ // Ignore updates if bitrate is zero (the aggregate network state is
+ // down) or if we're not sending video.
+ if (target_bitrate_bps == 0 || video_send_streams_.empty()) {
+ estimated_send_bitrate_kbps_counter_.ProcessAndPause();
+ pacer_bitrate_kbps_counter_.ProcessAndPause();
+ return;
+ }
- rtc::CritScope lock(&bitrate_crit_);
- if (!sending_video) {
- // Do not update the stats if we are not sending video.
- estimated_send_bitrate_kbps_counter_.ProcessAndPause();
- pacer_bitrate_kbps_counter_.ProcessAndPause();
- return;
- }
- estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000);
- // Pacer bitrate may be higher than bitrate estimate if enforcing min bitrate.
- uint32_t pacer_bitrate_bps =
- std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_);
- pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000);
+ estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000);
+ // Pacer bitrate may be higher than bitrate estimate if enforcing min
+ // bitrate.
+ uint32_t pacer_bitrate_bps =
+ std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_);
+ pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000);
+ }));
}
void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) {
@@ -1203,10 +1163,11 @@
transport_send_ptr_->SetAllocatedSendBitrateLimits(limits);
- min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps();
-
- rtc::CritScope lock(&bitrate_crit_);
- configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps();
+ worker_thread_->PostTask(ToQueuedTask(task_safety_, [this, limits]() {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps();
+ configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps();
+ }));
}
void Call::ConfigureSync(const std::string& sync_group) {
@@ -1285,14 +1246,12 @@
}
}
if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
- ReadLockScoped read_lock(*send_crit_);
for (VideoSendStream* stream : video_send_streams_) {
stream->DeliverRtcp(packet, length);
rtcp_delivered = true;
}
}
if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
- ReadLockScoped read_lock(*send_crit_);
for (auto& kv : audio_send_ssrcs_) {
kv.second->DeliverRtcp(packet, length);
rtcp_delivered = true;
@@ -1341,7 +1300,7 @@
RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
<< parsed_packet.Ssrc();
// Destruction of the receive stream, including deregistering from the
- // RtpDemuxer, is not protected by the |configuration_sequence_checker_|.
+ // RtpDemuxer, is not protected by the |worker_thread_|.
// But deregistering in the |receive_rtp_config_| map is. So by not passing
// the packet on to demuxing in this case, we prevent incoming packets to be
// passed on via the demuxer to a receive stream which is being torned down.
@@ -1390,7 +1349,7 @@
MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
if (IsRtcp(packet.cdata(), packet.size()))
return DeliverRtcp(media_type, packet.cdata(), packet.size());
@@ -1398,7 +1357,7 @@
}
void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RtpPacketReceived parsed_packet;
if (!parsed_packet.Parse(packet, length))
return;
@@ -1410,7 +1369,7 @@
RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
<< parsed_packet.Ssrc();
// Destruction of the receive stream, including deregistering from the
- // RtpDemuxer, is not protected by the |configuration_sequence_checker_|.
+ // RtpDemuxer, is not protected by the |worker_thread_|.
// But deregistering in the |receive_rtp_config_| map is.
// So by not passing the packet on to demuxing in this case, we prevent
// incoming packets to be passed on via the demuxer to a receive stream