[Adaptation] Make ResourceAdaptationProcessorInterface resources thread-safe
This is one less dependency on the task queue, and will make
things like removing resources and cleanup much easier in the future.
Bug: webrtc:11754
Change-Id: I732f1935d1b58ffe09ca2a2bf59beebc1930214d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178869
Commit-Queue: Evan Shrubsole <eshr@google.com>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31686}
diff --git a/call/adaptation/resource_adaptation_processor.cc b/call/adaptation/resource_adaptation_processor.cc
index bea8334..4ec643f 100644
--- a/call/adaptation/resource_adaptation_processor.cc
+++ b/call/adaptation/resource_adaptation_processor.cc
@@ -20,6 +20,7 @@
#include "rtc_base/logging.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/strings/string_builder.h"
+#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/to_queued_task.h"
namespace webrtc {
@@ -127,38 +128,77 @@
void ResourceAdaptationProcessor::AddResource(
rtc::scoped_refptr<Resource> resource) {
- RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
RTC_DCHECK(resource);
- RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end())
- << "Resource \"" << resource->Name() << "\" was already registered.";
- resources_.push_back(resource);
+ {
+ MutexLock crit(&resources_lock_);
+ RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end())
+ << "Resource \"" << resource->Name() << "\" was already registered.";
+ resources_.push_back(resource);
+ }
resource->SetResourceListener(resource_listener_delegate_);
}
std::vector<rtc::scoped_refptr<Resource>>
ResourceAdaptationProcessor::GetResources() const {
- RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
+ MutexLock crit(&resources_lock_);
return resources_;
}
void ResourceAdaptationProcessor::RemoveResource(
rtc::scoped_refptr<Resource> resource) {
- RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
RTC_DCHECK(resource);
RTC_LOG(INFO) << "Removing resource \"" << resource->Name() << "\".";
- auto it = absl::c_find(resources_, resource);
- RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name()
- << "\" was not a registered resource.";
+ resource->SetResourceListener(nullptr);
+ {
+ MutexLock crit(&resources_lock_);
+ auto it = absl::c_find(resources_, resource);
+ RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name()
+ << "\" was not a registered resource.";
+ resources_.erase(it);
+ }
+ RemoveLimitationsImposedByResource(std::move(resource));
+}
+
+void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource(
+ rtc::scoped_refptr<Resource> resource) {
+ if (!resource_adaptation_queue_->IsCurrent()) {
+ resource_adaptation_queue_->PostTask(ToQueuedTask(
+ [this, resource]() { RemoveLimitationsImposedByResource(resource); }));
+ return;
+ }
+ RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
auto resource_adaptation_limits =
adaptation_limits_by_resources_.find(resource);
if (resource_adaptation_limits != adaptation_limits_by_resources_.end()) {
VideoStreamAdapter::RestrictionsWithCounters adaptation_limits =
resource_adaptation_limits->second;
adaptation_limits_by_resources_.erase(resource_adaptation_limits);
- MaybeUpdateResourceLimitationsOnResourceRemoval(adaptation_limits);
+ if (adaptation_limits_by_resources_.empty()) {
+ // Only the resource being removed was adapted so clear restrictions.
+ stream_adapter_->ClearRestrictions();
+ return;
+ }
+
+ VideoStreamAdapter::RestrictionsWithCounters most_limited =
+ FindMostLimitedResources().second;
+
+ if (adaptation_limits.counters.Total() <= most_limited.counters.Total()) {
+ // The removed limitations were less limited than the most limited
+ // resource. Don't change the current restrictions.
+ return;
+ }
+
+ // Apply the new most limited resource as the next restrictions.
+ Adaptation adapt_to = stream_adapter_->GetAdaptationTo(
+ most_limited.counters, most_limited.restrictions);
+ RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid);
+ stream_adapter_->ApplyAdaptation(adapt_to, nullptr);
+
+ RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to "
+ "next most limited restrictions: "
+ << most_limited.restrictions.ToString() << " with counters "
+ << most_limited.counters.ToString();
}
- resources_.erase(it);
- resource->SetResourceListener(nullptr);
}
void ResourceAdaptationProcessor::AddAdaptationConstraint(
@@ -185,10 +225,13 @@
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
RTC_DCHECK(resource);
// |resource| could have been removed after signalling.
- if (absl::c_find(resources_, resource) == resources_.end()) {
- RTC_LOG(INFO) << "Ignoring signal from removed resource \""
- << resource->Name() << "\".";
- return;
+ {
+ MutexLock crit(&resources_lock_);
+ if (absl::c_find(resources_, resource) == resources_.end()) {
+ RTC_LOG(INFO) << "Ignoring signal from removed resource \""
+ << resource->Name() << "\".";
+ return;
+ }
}
MitigationResultAndLogMessage result_and_message;
switch (usage_state) {
@@ -372,36 +415,6 @@
}
}
-void ResourceAdaptationProcessor::
- MaybeUpdateResourceLimitationsOnResourceRemoval(
- VideoStreamAdapter::RestrictionsWithCounters removed_limitations) {
- if (adaptation_limits_by_resources_.empty()) {
- // Only the resource being removed was adapted so clear restrictions.
- stream_adapter_->ClearRestrictions();
- return;
- }
-
- VideoStreamAdapter::RestrictionsWithCounters most_limited =
- FindMostLimitedResources().second;
-
- if (removed_limitations.counters.Total() <= most_limited.counters.Total()) {
- // The removed limitations were less limited than the most limited resource.
- // Don't change the current restrictions.
- return;
- }
-
- // Apply the new most limited resource as the next restrictions.
- Adaptation adapt_to = stream_adapter_->GetAdaptationTo(
- most_limited.counters, most_limited.restrictions);
- RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid);
- stream_adapter_->ApplyAdaptation(adapt_to, nullptr);
-
- RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to "
- "next most limited restrictions: "
- << most_limited.restrictions.ToString() << " with counters "
- << most_limited.counters.ToString();
-}
-
void ResourceAdaptationProcessor::OnVideoSourceRestrictionsUpdated(
VideoSourceRestrictions restrictions,
const VideoAdaptationCounters& adaptation_counters,
diff --git a/call/adaptation/resource_adaptation_processor.h b/call/adaptation/resource_adaptation_processor.h
index 9f20bdb..c10b3f6 100644
--- a/call/adaptation/resource_adaptation_processor.h
+++ b/call/adaptation/resource_adaptation_processor.h
@@ -49,7 +49,8 @@
//
// The ResourceAdaptationProcessor is single-threaded. It may be constructed on
// any thread but MUST subsequently be used and destroyed on a single sequence,
-// i.e. the "resource adaptation task queue".
+// i.e. the "resource adaptation task queue". Resources can be added and removed
+// from any thread.
class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface,
public VideoSourceRestrictionsListener,
public ResourceListener {
@@ -146,18 +147,18 @@
VideoStreamAdapter::RestrictionsWithCounters>
FindMostLimitedResources() const RTC_RUN_ON(resource_adaptation_queue_);
- void MaybeUpdateResourceLimitationsOnResourceRemoval(
- VideoStreamAdapter::RestrictionsWithCounters removed_limitations)
- RTC_RUN_ON(resource_adaptation_queue_);
+ void RemoveLimitationsImposedByResource(
+ rtc::scoped_refptr<Resource> resource);
TaskQueueBase* resource_adaptation_queue_;
rtc::scoped_refptr<ResourceListenerDelegate> resource_listener_delegate_;
// Input and output.
VideoStreamEncoderObserver* const encoder_stats_observer_
RTC_GUARDED_BY(resource_adaptation_queue_);
- std::vector<ResourceLimitationsListener*> resource_limitations_listeners_
- RTC_GUARDED_BY(resource_adaptation_queue_);
+ mutable Mutex resources_lock_;
std::vector<rtc::scoped_refptr<Resource>> resources_
+ RTC_GUARDED_BY(resources_lock_);
+ std::vector<ResourceLimitationsListener*> resource_limitations_listeners_
RTC_GUARDED_BY(resource_adaptation_queue_);
std::vector<AdaptationConstraint*> adaptation_constraints_
RTC_GUARDED_BY(resource_adaptation_queue_);
diff --git a/call/adaptation/resource_adaptation_processor_interface.h b/call/adaptation/resource_adaptation_processor_interface.h
index fe500c9..59d2323 100644
--- a/call/adaptation/resource_adaptation_processor_interface.h
+++ b/call/adaptation/resource_adaptation_processor_interface.h
@@ -51,16 +51,16 @@
virtual void SetResourceAdaptationQueue(
TaskQueueBase* resource_adaptation_queue) = 0;
- // Starts or stops listening to resources, effectively enabling or disabling
- // processing.
- // TODO(https://crbug.com/webrtc/11172): Automatically register and unregister
- // with AddResource() and RemoveResource() instead. When the processor is
- // multi-stream aware, stream-specific resouces will get added and removed
- // over time.
virtual void AddResourceLimitationsListener(
ResourceLimitationsListener* limitations_listener) = 0;
virtual void RemoveResourceLimitationsListener(
ResourceLimitationsListener* limitations_listener) = 0;
+ // Starts or stops listening to resources, effectively enabling or disabling
+ // processing. May be called from anywhere.
+ // TODO(https://crbug.com/webrtc/11172): Automatically register and unregister
+ // with AddResource() and RemoveResource() instead. When the processor is
+ // multi-stream aware, stream-specific resouces will get added and removed
+ // over time.
virtual void AddResource(rtc::scoped_refptr<Resource> resource) = 0;
virtual std::vector<rtc::scoped_refptr<Resource>> GetResources() const = 0;
virtual void RemoveResource(rtc::scoped_refptr<Resource> resource) = 0;
diff --git a/video/adaptation/video_stream_encoder_resource.cc b/video/adaptation/video_stream_encoder_resource.cc
index fae3355..df76df4 100644
--- a/video/adaptation/video_stream_encoder_resource.cc
+++ b/video/adaptation/video_stream_encoder_resource.cc
@@ -52,9 +52,9 @@
void VideoStreamEncoderResource::SetResourceListener(
ResourceListener* listener) {
- RTC_DCHECK_RUN_ON(resource_adaptation_queue());
// If you want to change listener you need to unregister the old listener by
// setting it to null first.
+ MutexLock crit(&listener_lock_);
RTC_DCHECK(!listener_ || !listener) << "A listener is already set";
listener_ = listener;
}
@@ -65,7 +65,7 @@
void VideoStreamEncoderResource::OnResourceUsageStateMeasured(
ResourceUsageState usage_state) {
- RTC_DCHECK_RUN_ON(resource_adaptation_queue());
+ MutexLock crit(&listener_lock_);
if (listener_) {
listener_->OnResourceUsageStateMeasured(this, usage_state);
}
diff --git a/video/adaptation/video_stream_encoder_resource.h b/video/adaptation/video_stream_encoder_resource.h
index 6ff314f..08994c1 100644
--- a/video/adaptation/video_stream_encoder_resource.h
+++ b/video/adaptation/video_stream_encoder_resource.h
@@ -72,7 +72,8 @@
// Treated as const after initialization.
TaskQueueBase* encoder_queue_;
TaskQueueBase* resource_adaptation_queue_ RTC_GUARDED_BY(lock_);
- ResourceListener* listener_ RTC_GUARDED_BY(resource_adaptation_queue());
+ mutable Mutex listener_lock_;
+ ResourceListener* listener_ RTC_GUARDED_BY(listener_lock_);
};
} // namespace webrtc
diff --git a/video/video_stream_encoder.cc b/video/video_stream_encoder.cc
index 255405e..9776e06 100644
--- a/video/video_stream_encoder.cc
+++ b/video/video_stream_encoder.cc
@@ -410,14 +410,19 @@
RTC_DCHECK_RUN_ON(&thread_checker_);
video_source_sink_controller_.SetSource(nullptr);
+ if (resource_adaptation_processor_) {
+ for (auto& resource : stream_resource_manager_.MappedResources()) {
+ resource_adaptation_processor_->RemoveResource(resource);
+ }
+ }
rtc::Event shutdown_adaptation_processor_event;
resource_adaptation_queue_.PostTask([this,
&shutdown_adaptation_processor_event] {
RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
if (resource_adaptation_processor_) {
- for (auto& resource : stream_resource_manager_.MappedResources()) {
- resource_adaptation_processor_->RemoveResource(resource);
- }
+ // Removed on the resource_adaptaiton_processor_ queue because the
+ // adaptation_constraints_ and adaptation_listeners_ fields are guarded by
+ // this queue.
for (auto* constraint : adaptation_constraints_) {
resource_adaptation_processor_->RemoveAdaptationConstraint(constraint);
}
@@ -479,41 +484,15 @@
RTC_DCHECK_RUN_ON(&encoder_queue_);
stream_resource_manager_.MapResourceToReason(resource,
VideoAdaptationReason::kCpu);
+ resource_adaptation_processor_->AddResource(resource);
map_resource_event.Set();
});
map_resource_event.Wait(rtc::Event::kForever);
-
- // Add the resource to the processor.
- rtc::Event add_resource_event;
- resource_adaptation_queue_.PostTask([this, resource, &add_resource_event] {
- RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
- if (!resource_adaptation_processor_) {
- // The VideoStreamEncoder was stopped and the processor destroyed before
- // this task had a chance to execute. No action needed.
- return;
- }
- resource_adaptation_processor_->AddResource(resource);
- add_resource_event.Set();
- });
- add_resource_event.Wait(rtc::Event::kForever);
}
std::vector<rtc::scoped_refptr<Resource>>
VideoStreamEncoder::GetAdaptationResources() {
- std::vector<rtc::scoped_refptr<Resource>> resources;
- rtc::Event event;
- resource_adaptation_queue_.PostTask([this, &resources, &event] {
- RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
- if (!resource_adaptation_processor_) {
- // The VideoStreamEncoder was stopped and the processor destroyed before
- // this task had a chance to execute. No action needed.
- return;
- }
- resources = resource_adaptation_processor_->GetResources();
- event.Set();
- });
- event.Wait(rtc::Event::kForever);
- return resources;
+ return resource_adaptation_processor_->GetResources();
}
void VideoStreamEncoder::SetSource(
@@ -2115,22 +2094,10 @@
encoder_queue_.PostTask([this, resource, reason, &map_resource_event] {
RTC_DCHECK_RUN_ON(&encoder_queue_);
stream_resource_manager_.MapResourceToReason(resource, reason);
+ resource_adaptation_processor_->AddResource(resource);
map_resource_event.Set();
});
map_resource_event.Wait(rtc::Event::kForever);
-
- rtc::Event add_resource_event;
- resource_adaptation_queue_.PostTask([this, resource, &add_resource_event] {
- RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
- if (!resource_adaptation_processor_) {
- // The VideoStreamEncoder was stopped and the processor destroyed before
- // this task had a chance to execute. No action needed.
- return;
- }
- resource_adaptation_processor_->AddResource(resource);
- add_resource_event.Set();
- });
- add_resource_event.Wait(rtc::Event::kForever);
}
void VideoStreamEncoder::InjectAdaptationConstraint(
diff --git a/video/video_stream_encoder.h b/video/video_stream_encoder.h
index 95d4dcb..bd1593d 100644
--- a/video/video_stream_encoder.h
+++ b/video/video_stream_encoder.h
@@ -416,9 +416,10 @@
RTC_GUARDED_BY(&resource_adaptation_queue_);
// Responsible for adapting input resolution or frame rate to ensure resources
// (e.g. CPU or bandwidth) are not overused.
- // This class is single-threaded on the resource adaptation queue.
- std::unique_ptr<ResourceAdaptationProcessor> resource_adaptation_processor_
- RTC_GUARDED_BY(&resource_adaptation_queue_);
+ // Adding resources can occur on any thread, but all other methods need to be
+ // called on the adaptation thread.
+ std::unique_ptr<ResourceAdaptationProcessorInterface>
+ resource_adaptation_processor_;
std::unique_ptr<DegradationPreferenceManager> degradation_preference_manager_;
std::vector<AdaptationConstraint*> adaptation_constraints_
RTC_GUARDED_BY(&resource_adaptation_queue_);