[Adaptation] Make ResourceAdaptationProcessorInterface resources thread-safe

This is one less dependency on the task queue, and will make
things like removing resources and cleanup much easier in the future.

Bug: webrtc:11754
Change-Id: I732f1935d1b58ffe09ca2a2bf59beebc1930214d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178869
Commit-Queue: Evan Shrubsole <eshr@google.com>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31686}
diff --git a/call/adaptation/resource_adaptation_processor.cc b/call/adaptation/resource_adaptation_processor.cc
index bea8334..4ec643f 100644
--- a/call/adaptation/resource_adaptation_processor.cc
+++ b/call/adaptation/resource_adaptation_processor.cc
@@ -20,6 +20,7 @@
 #include "rtc_base/logging.h"
 #include "rtc_base/ref_counted_object.h"
 #include "rtc_base/strings/string_builder.h"
+#include "rtc_base/synchronization/sequence_checker.h"
 #include "rtc_base/task_utils/to_queued_task.h"
 
 namespace webrtc {
@@ -127,38 +128,77 @@
 
 void ResourceAdaptationProcessor::AddResource(
     rtc::scoped_refptr<Resource> resource) {
-  RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
   RTC_DCHECK(resource);
-  RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end())
-      << "Resource \"" << resource->Name() << "\" was already registered.";
-  resources_.push_back(resource);
+  {
+    MutexLock crit(&resources_lock_);
+    RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end())
+        << "Resource \"" << resource->Name() << "\" was already registered.";
+    resources_.push_back(resource);
+  }
   resource->SetResourceListener(resource_listener_delegate_);
 }
 
 std::vector<rtc::scoped_refptr<Resource>>
 ResourceAdaptationProcessor::GetResources() const {
-  RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
+  MutexLock crit(&resources_lock_);
   return resources_;
 }
 
 void ResourceAdaptationProcessor::RemoveResource(
     rtc::scoped_refptr<Resource> resource) {
-  RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
   RTC_DCHECK(resource);
   RTC_LOG(INFO) << "Removing resource \"" << resource->Name() << "\".";
-  auto it = absl::c_find(resources_, resource);
-  RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name()
-                                     << "\" was not a registered resource.";
+  resource->SetResourceListener(nullptr);
+  {
+    MutexLock crit(&resources_lock_);
+    auto it = absl::c_find(resources_, resource);
+    RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name()
+                                       << "\" was not a registered resource.";
+    resources_.erase(it);
+  }
+  RemoveLimitationsImposedByResource(std::move(resource));
+}
+
+void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource(
+    rtc::scoped_refptr<Resource> resource) {
+  if (!resource_adaptation_queue_->IsCurrent()) {
+    resource_adaptation_queue_->PostTask(ToQueuedTask(
+        [this, resource]() { RemoveLimitationsImposedByResource(resource); }));
+    return;
+  }
+  RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
   auto resource_adaptation_limits =
       adaptation_limits_by_resources_.find(resource);
   if (resource_adaptation_limits != adaptation_limits_by_resources_.end()) {
     VideoStreamAdapter::RestrictionsWithCounters adaptation_limits =
         resource_adaptation_limits->second;
     adaptation_limits_by_resources_.erase(resource_adaptation_limits);
-    MaybeUpdateResourceLimitationsOnResourceRemoval(adaptation_limits);
+    if (adaptation_limits_by_resources_.empty()) {
+      // Only the resource being removed was adapted so clear restrictions.
+      stream_adapter_->ClearRestrictions();
+      return;
+    }
+
+    VideoStreamAdapter::RestrictionsWithCounters most_limited =
+        FindMostLimitedResources().second;
+
+    if (adaptation_limits.counters.Total() <= most_limited.counters.Total()) {
+      // The removed limitations were less limited than the most limited
+      // resource. Don't change the current restrictions.
+      return;
+    }
+
+    // Apply the new most limited resource as the next restrictions.
+    Adaptation adapt_to = stream_adapter_->GetAdaptationTo(
+        most_limited.counters, most_limited.restrictions);
+    RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid);
+    stream_adapter_->ApplyAdaptation(adapt_to, nullptr);
+
+    RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to "
+                     "next most limited restrictions: "
+                  << most_limited.restrictions.ToString() << " with counters "
+                  << most_limited.counters.ToString();
   }
-  resources_.erase(it);
-  resource->SetResourceListener(nullptr);
 }
 
 void ResourceAdaptationProcessor::AddAdaptationConstraint(
@@ -185,10 +225,13 @@
   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
   RTC_DCHECK(resource);
   // |resource| could have been removed after signalling.
-  if (absl::c_find(resources_, resource) == resources_.end()) {
-    RTC_LOG(INFO) << "Ignoring signal from removed resource \""
-                  << resource->Name() << "\".";
-    return;
+  {
+    MutexLock crit(&resources_lock_);
+    if (absl::c_find(resources_, resource) == resources_.end()) {
+      RTC_LOG(INFO) << "Ignoring signal from removed resource \""
+                    << resource->Name() << "\".";
+      return;
+    }
   }
   MitigationResultAndLogMessage result_and_message;
   switch (usage_state) {
@@ -372,36 +415,6 @@
   }
 }
 
-void ResourceAdaptationProcessor::
-    MaybeUpdateResourceLimitationsOnResourceRemoval(
-        VideoStreamAdapter::RestrictionsWithCounters removed_limitations) {
-  if (adaptation_limits_by_resources_.empty()) {
-    // Only the resource being removed was adapted so clear restrictions.
-    stream_adapter_->ClearRestrictions();
-    return;
-  }
-
-  VideoStreamAdapter::RestrictionsWithCounters most_limited =
-      FindMostLimitedResources().second;
-
-  if (removed_limitations.counters.Total() <= most_limited.counters.Total()) {
-    // The removed limitations were less limited than the most limited resource.
-    // Don't change the current restrictions.
-    return;
-  }
-
-  // Apply the new most limited resource as the next restrictions.
-  Adaptation adapt_to = stream_adapter_->GetAdaptationTo(
-      most_limited.counters, most_limited.restrictions);
-  RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid);
-  stream_adapter_->ApplyAdaptation(adapt_to, nullptr);
-
-  RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to "
-                   "next most limited restrictions: "
-                << most_limited.restrictions.ToString() << " with counters "
-                << most_limited.counters.ToString();
-}
-
 void ResourceAdaptationProcessor::OnVideoSourceRestrictionsUpdated(
     VideoSourceRestrictions restrictions,
     const VideoAdaptationCounters& adaptation_counters,
diff --git a/call/adaptation/resource_adaptation_processor.h b/call/adaptation/resource_adaptation_processor.h
index 9f20bdb..c10b3f6 100644
--- a/call/adaptation/resource_adaptation_processor.h
+++ b/call/adaptation/resource_adaptation_processor.h
@@ -49,7 +49,8 @@
 //
 // The ResourceAdaptationProcessor is single-threaded. It may be constructed on
 // any thread but MUST subsequently be used and destroyed on a single sequence,
-// i.e. the "resource adaptation task queue".
+// i.e. the "resource adaptation task queue". Resources can be added and removed
+// from any thread.
 class ResourceAdaptationProcessor : public ResourceAdaptationProcessorInterface,
                                     public VideoSourceRestrictionsListener,
                                     public ResourceListener {
@@ -146,18 +147,18 @@
             VideoStreamAdapter::RestrictionsWithCounters>
   FindMostLimitedResources() const RTC_RUN_ON(resource_adaptation_queue_);
 
-  void MaybeUpdateResourceLimitationsOnResourceRemoval(
-      VideoStreamAdapter::RestrictionsWithCounters removed_limitations)
-      RTC_RUN_ON(resource_adaptation_queue_);
+  void RemoveLimitationsImposedByResource(
+      rtc::scoped_refptr<Resource> resource);
 
   TaskQueueBase* resource_adaptation_queue_;
   rtc::scoped_refptr<ResourceListenerDelegate> resource_listener_delegate_;
   // Input and output.
   VideoStreamEncoderObserver* const encoder_stats_observer_
       RTC_GUARDED_BY(resource_adaptation_queue_);
-  std::vector<ResourceLimitationsListener*> resource_limitations_listeners_
-      RTC_GUARDED_BY(resource_adaptation_queue_);
+  mutable Mutex resources_lock_;
   std::vector<rtc::scoped_refptr<Resource>> resources_
+      RTC_GUARDED_BY(resources_lock_);
+  std::vector<ResourceLimitationsListener*> resource_limitations_listeners_
       RTC_GUARDED_BY(resource_adaptation_queue_);
   std::vector<AdaptationConstraint*> adaptation_constraints_
       RTC_GUARDED_BY(resource_adaptation_queue_);
diff --git a/call/adaptation/resource_adaptation_processor_interface.h b/call/adaptation/resource_adaptation_processor_interface.h
index fe500c9..59d2323 100644
--- a/call/adaptation/resource_adaptation_processor_interface.h
+++ b/call/adaptation/resource_adaptation_processor_interface.h
@@ -51,16 +51,16 @@
   virtual void SetResourceAdaptationQueue(
       TaskQueueBase* resource_adaptation_queue) = 0;
 
-  // Starts or stops listening to resources, effectively enabling or disabling
-  // processing.
-  // TODO(https://crbug.com/webrtc/11172): Automatically register and unregister
-  // with AddResource() and RemoveResource() instead. When the processor is
-  // multi-stream aware, stream-specific resouces will get added and removed
-  // over time.
   virtual void AddResourceLimitationsListener(
       ResourceLimitationsListener* limitations_listener) = 0;
   virtual void RemoveResourceLimitationsListener(
       ResourceLimitationsListener* limitations_listener) = 0;
+  // Starts or stops listening to resources, effectively enabling or disabling
+  // processing. May be called from anywhere.
+  // TODO(https://crbug.com/webrtc/11172): Automatically register and unregister
+  // with AddResource() and RemoveResource() instead. When the processor is
+  // multi-stream aware, stream-specific resouces will get added and removed
+  // over time.
   virtual void AddResource(rtc::scoped_refptr<Resource> resource) = 0;
   virtual std::vector<rtc::scoped_refptr<Resource>> GetResources() const = 0;
   virtual void RemoveResource(rtc::scoped_refptr<Resource> resource) = 0;
diff --git a/video/adaptation/video_stream_encoder_resource.cc b/video/adaptation/video_stream_encoder_resource.cc
index fae3355..df76df4 100644
--- a/video/adaptation/video_stream_encoder_resource.cc
+++ b/video/adaptation/video_stream_encoder_resource.cc
@@ -52,9 +52,9 @@
 
 void VideoStreamEncoderResource::SetResourceListener(
     ResourceListener* listener) {
-  RTC_DCHECK_RUN_ON(resource_adaptation_queue());
   // If you want to change listener you need to unregister the old listener by
   // setting it to null first.
+  MutexLock crit(&listener_lock_);
   RTC_DCHECK(!listener_ || !listener) << "A listener is already set";
   listener_ = listener;
 }
@@ -65,7 +65,7 @@
 
 void VideoStreamEncoderResource::OnResourceUsageStateMeasured(
     ResourceUsageState usage_state) {
-  RTC_DCHECK_RUN_ON(resource_adaptation_queue());
+  MutexLock crit(&listener_lock_);
   if (listener_) {
     listener_->OnResourceUsageStateMeasured(this, usage_state);
   }
diff --git a/video/adaptation/video_stream_encoder_resource.h b/video/adaptation/video_stream_encoder_resource.h
index 6ff314f..08994c1 100644
--- a/video/adaptation/video_stream_encoder_resource.h
+++ b/video/adaptation/video_stream_encoder_resource.h
@@ -72,7 +72,8 @@
   // Treated as const after initialization.
   TaskQueueBase* encoder_queue_;
   TaskQueueBase* resource_adaptation_queue_ RTC_GUARDED_BY(lock_);
-  ResourceListener* listener_ RTC_GUARDED_BY(resource_adaptation_queue());
+  mutable Mutex listener_lock_;
+  ResourceListener* listener_ RTC_GUARDED_BY(listener_lock_);
 };
 
 }  // namespace webrtc
diff --git a/video/video_stream_encoder.cc b/video/video_stream_encoder.cc
index 255405e..9776e06 100644
--- a/video/video_stream_encoder.cc
+++ b/video/video_stream_encoder.cc
@@ -410,14 +410,19 @@
   RTC_DCHECK_RUN_ON(&thread_checker_);
   video_source_sink_controller_.SetSource(nullptr);
 
+  if (resource_adaptation_processor_) {
+    for (auto& resource : stream_resource_manager_.MappedResources()) {
+      resource_adaptation_processor_->RemoveResource(resource);
+    }
+  }
   rtc::Event shutdown_adaptation_processor_event;
   resource_adaptation_queue_.PostTask([this,
                                        &shutdown_adaptation_processor_event] {
     RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
     if (resource_adaptation_processor_) {
-      for (auto& resource : stream_resource_manager_.MappedResources()) {
-        resource_adaptation_processor_->RemoveResource(resource);
-      }
+      // Removed on the resource_adaptaiton_processor_ queue because the
+      // adaptation_constraints_ and adaptation_listeners_ fields are guarded by
+      // this queue.
       for (auto* constraint : adaptation_constraints_) {
         resource_adaptation_processor_->RemoveAdaptationConstraint(constraint);
       }
@@ -479,41 +484,15 @@
     RTC_DCHECK_RUN_ON(&encoder_queue_);
     stream_resource_manager_.MapResourceToReason(resource,
                                                  VideoAdaptationReason::kCpu);
+    resource_adaptation_processor_->AddResource(resource);
     map_resource_event.Set();
   });
   map_resource_event.Wait(rtc::Event::kForever);
-
-  // Add the resource to the processor.
-  rtc::Event add_resource_event;
-  resource_adaptation_queue_.PostTask([this, resource, &add_resource_event] {
-    RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
-    if (!resource_adaptation_processor_) {
-      // The VideoStreamEncoder was stopped and the processor destroyed before
-      // this task had a chance to execute. No action needed.
-      return;
-    }
-    resource_adaptation_processor_->AddResource(resource);
-    add_resource_event.Set();
-  });
-  add_resource_event.Wait(rtc::Event::kForever);
 }
 
 std::vector<rtc::scoped_refptr<Resource>>
 VideoStreamEncoder::GetAdaptationResources() {
-  std::vector<rtc::scoped_refptr<Resource>> resources;
-  rtc::Event event;
-  resource_adaptation_queue_.PostTask([this, &resources, &event] {
-    RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
-    if (!resource_adaptation_processor_) {
-      // The VideoStreamEncoder was stopped and the processor destroyed before
-      // this task had a chance to execute. No action needed.
-      return;
-    }
-    resources = resource_adaptation_processor_->GetResources();
-    event.Set();
-  });
-  event.Wait(rtc::Event::kForever);
-  return resources;
+  return resource_adaptation_processor_->GetResources();
 }
 
 void VideoStreamEncoder::SetSource(
@@ -2115,22 +2094,10 @@
   encoder_queue_.PostTask([this, resource, reason, &map_resource_event] {
     RTC_DCHECK_RUN_ON(&encoder_queue_);
     stream_resource_manager_.MapResourceToReason(resource, reason);
+    resource_adaptation_processor_->AddResource(resource);
     map_resource_event.Set();
   });
   map_resource_event.Wait(rtc::Event::kForever);
-
-  rtc::Event add_resource_event;
-  resource_adaptation_queue_.PostTask([this, resource, &add_resource_event] {
-    RTC_DCHECK_RUN_ON(&resource_adaptation_queue_);
-    if (!resource_adaptation_processor_) {
-      // The VideoStreamEncoder was stopped and the processor destroyed before
-      // this task had a chance to execute. No action needed.
-      return;
-    }
-    resource_adaptation_processor_->AddResource(resource);
-    add_resource_event.Set();
-  });
-  add_resource_event.Wait(rtc::Event::kForever);
 }
 
 void VideoStreamEncoder::InjectAdaptationConstraint(
diff --git a/video/video_stream_encoder.h b/video/video_stream_encoder.h
index 95d4dcb..bd1593d 100644
--- a/video/video_stream_encoder.h
+++ b/video/video_stream_encoder.h
@@ -416,9 +416,10 @@
       RTC_GUARDED_BY(&resource_adaptation_queue_);
   // Responsible for adapting input resolution or frame rate to ensure resources
   // (e.g. CPU or bandwidth) are not overused.
-  // This class is single-threaded on the resource adaptation queue.
-  std::unique_ptr<ResourceAdaptationProcessor> resource_adaptation_processor_
-      RTC_GUARDED_BY(&resource_adaptation_queue_);
+  // Adding resources can occur on any thread, but all other methods need to be
+  // called on the adaptation thread.
+  std::unique_ptr<ResourceAdaptationProcessorInterface>
+      resource_adaptation_processor_;
   std::unique_ptr<DegradationPreferenceManager> degradation_preference_manager_;
   std::vector<AdaptationConstraint*> adaptation_constraints_
       RTC_GUARDED_BY(&resource_adaptation_queue_);