[Adaptation] Give QualityScaler a pending callbacks queue and unittests

This CL adds a queue for pending QualityScalerQpUsageHandlerCallbacks
and private methods for "Queueing", "Handling" and "Aborting" them,
using a sequence number as an ID to ensure we don't accidentally invoke
the same callback twice.

Because we don't have the adaptation task queue yet, callbacks are still
synchronously handled, which means the "pending callbacks" queue would
never have more than 1 element. However, when the adaptation task queue
is added and this is made asynchronous, it will be possible for multiple
callbacks to be pending simultaneously. This design is future-proof.

This CL is split out to aid reviewability. The CL that adds the
adaptation task queue will affect a lot of code. By landing this
separately, the adaptation queue CL will be easier to review.

This CL adds quality_scaler_resource_unittest.cc.

Bug: webrtc:11542, webrtc:11520
Change-Id: I00e7f6bfda9f8e8e82ec25916aa48e9349c8d70c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/174802
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Reviewed-by: Evan Shrubsole <eshr@google.com>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31219}
diff --git a/video/adaptation/BUILD.gn b/video/adaptation/BUILD.gn
index 3269d89..51e6a2d 100644
--- a/video/adaptation/BUILD.gn
+++ b/video/adaptation/BUILD.gn
@@ -59,11 +59,14 @@
     defines = []
     sources = [
       "overuse_frame_detector_unittest.cc",
+      "quality_scaler_resource_unittest.cc",
       "video_stream_encoder_resource_manager_unittest.cc",
     ]
     deps = [
       ":video_adaptation",
       "../../api:scoped_refptr",
+      "../../api/task_queue:default_task_queue_factory",
+      "../../api/task_queue:task_queue",
       "../../api/video:encoded_image",
       "../../api/video:video_adaptation",
       "../../api/video:video_frame_i420",
@@ -76,6 +79,7 @@
       "../../rtc_base:rtc_base_tests_utils",
       "../../rtc_base:rtc_event",
       "../../rtc_base:rtc_numerics",
+      "../../rtc_base:rtc_task_queue",
       "../../rtc_base:task_queue_for_test",
       "../../test:field_trial",
       "//test:rtc_expect_death",
diff --git a/video/adaptation/quality_scaler_resource.cc b/video/adaptation/quality_scaler_resource.cc
index 9fcc58e..631e5b0 100644
--- a/video/adaptation/quality_scaler_resource.cc
+++ b/video/adaptation/quality_scaler_resource.cc
@@ -21,7 +21,15 @@
       encoder_queue_(nullptr),
       adaptation_processor_(nullptr),
       quality_scaler_(nullptr),
-      pending_qp_usage_callback_(nullptr) {}
+      num_handled_callbacks_(0),
+      pending_callbacks_(),
+      processing_in_progress_(false),
+      clear_qp_samples_(false) {}
+
+QualityScalerResource::~QualityScalerResource() {
+  RTC_DCHECK(!quality_scaler_);
+  RTC_DCHECK(pending_callbacks_.empty());
+}
 
 void QualityScalerResource::Initialize(rtc::TaskQueue* encoder_queue) {
   RTC_DCHECK(!encoder_queue_);
@@ -29,8 +37,6 @@
   encoder_queue_ = encoder_queue;
 }
 
-QualityScalerResource::~QualityScalerResource() {}
-
 void QualityScalerResource::SetAdaptationProcessor(
     ResourceAdaptationProcessorInterface* adaptation_processor) {
   RTC_DCHECK_RUN_ON(encoder_queue_);
@@ -52,6 +58,9 @@
 
 void QualityScalerResource::StopCheckForOveruse() {
   RTC_DCHECK_RUN_ON(encoder_queue_);
+  // Ensure we have no pending callbacks. This makes it safe to destroy the
+  // QualityScaler and even task queues with tasks in-flight.
+  AbortPendingCallbacks();
   quality_scaler_.reset();
 }
 
@@ -101,30 +110,28 @@
 void QualityScalerResource::OnReportQpUsageHigh(
     rtc::scoped_refptr<QualityScalerQpUsageHandlerCallbackInterface> callback) {
   RTC_DCHECK_RUN_ON(encoder_queue_);
-  RTC_DCHECK(!pending_qp_usage_callback_);
+  size_t callback_id = QueuePendingCallback(callback);
   // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue,
   // PostTask the resource usage measurements.
-  pending_qp_usage_callback_ = std::move(callback);
-  // If this triggers adaptation, OnAdaptationApplied() is called by the
-  // processor where we determine if QP should be cleared and we invoke and null
-  // the |pending_qp_usage_callback_|.
+  RTC_DCHECK(!processing_in_progress_);
+  processing_in_progress_ = true;
+  clear_qp_samples_ = false;
+  // If this OnResourceUsageStateMeasured() triggers an adaptation,
+  // OnAdaptationApplied() will occur between this line and the next. This
+  // allows modifying |clear_qp_samples_| based on the adaptation.
   OnResourceUsageStateMeasured(ResourceUsageState::kOveruse);
-  // If |pending_qp_usage_callback_| has not been nulled yet then we did not
-  // just trigger an adaptation and should not clear the QP samples.
-  if (pending_qp_usage_callback_) {
-    pending_qp_usage_callback_->OnQpUsageHandled(false);
-    pending_qp_usage_callback_ = nullptr;
-  }
+  HandlePendingCallback(callback_id, clear_qp_samples_);
+  processing_in_progress_ = false;
 }
 
 void QualityScalerResource::OnReportQpUsageLow(
     rtc::scoped_refptr<QualityScalerQpUsageHandlerCallbackInterface> callback) {
   RTC_DCHECK_RUN_ON(encoder_queue_);
-  RTC_DCHECK(!pending_qp_usage_callback_);
+  size_t callback_id = QueuePendingCallback(callback);
   // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue,
   // PostTask the resource usage measurements.
   OnResourceUsageStateMeasured(ResourceUsageState::kUnderuse);
-  callback->OnQpUsageHandled(true);
+  HandlePendingCallback(callback_id, true);
 }
 
 void QualityScalerResource::OnAdaptationApplied(
@@ -132,13 +139,11 @@
     const VideoSourceRestrictions& restrictions_before,
     const VideoSourceRestrictions& restrictions_after,
     rtc::scoped_refptr<Resource> reason_resource) {
-  // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue,
-  // ensure that this is running on it instead.
   RTC_DCHECK_RUN_ON(encoder_queue_);
   // We only clear QP samples on adaptations triggered by the QualityScaler.
-  if (!pending_qp_usage_callback_)
+  if (!processing_in_progress_)
     return;
-  bool clear_qp_samples = true;
+  clear_qp_samples_ = true;
   // If we're in "balanced" and the frame rate before and after adaptation did
   // not differ that much, don't clear the QP samples and instead check for QP
   // again in a short amount of time. This may trigger adapting down again soon.
@@ -160,12 +165,45 @@
       int fps_diff = input_state.frames_per_second() -
                      restrictions_after.max_frame_rate().value();
       if (fps_diff < min_diff.value()) {
-        clear_qp_samples = false;
+        clear_qp_samples_ = false;
       }
     }
   }
-  pending_qp_usage_callback_->OnQpUsageHandled(clear_qp_samples);
-  pending_qp_usage_callback_ = nullptr;
+}
+
+size_t QualityScalerResource::QueuePendingCallback(
+    rtc::scoped_refptr<QualityScalerQpUsageHandlerCallbackInterface> callback) {
+  RTC_DCHECK_RUN_ON(encoder_queue_);
+  pending_callbacks_.push(callback);
+  // The ID of a callback is its sequence number (1, 2, 3...).
+  return num_handled_callbacks_ + pending_callbacks_.size();
+}
+
+void QualityScalerResource::HandlePendingCallback(size_t callback_id,
+                                                  bool clear_qp_samples) {
+  // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue,
+  // this method would be invoked on the adaptation queue and a PostTask would
+  // be used to resolve the callback.
+  RTC_DCHECK_RUN_ON(encoder_queue_);
+  if (num_handled_callbacks_ >= callback_id) {
+    // The callback with this ID has already been handled.
+    // This happens if AbortPendingCallbacks() is called while the task is
+    // in flight.
+    return;
+  }
+  RTC_DCHECK(!pending_callbacks_.empty());
+  pending_callbacks_.front()->OnQpUsageHandled(clear_qp_samples);
+  ++num_handled_callbacks_;
+  pending_callbacks_.pop();
+}
+
+void QualityScalerResource::AbortPendingCallbacks() {
+  RTC_DCHECK_RUN_ON(encoder_queue_);
+  while (!pending_callbacks_.empty()) {
+    pending_callbacks_.front()->OnQpUsageHandled(false);
+    ++num_handled_callbacks_;
+    pending_callbacks_.pop();
+  }
 }
 
 }  // namespace webrtc
diff --git a/video/adaptation/quality_scaler_resource.h b/video/adaptation/quality_scaler_resource.h
index 6cec79c..7c55e9b 100644
--- a/video/adaptation/quality_scaler_resource.h
+++ b/video/adaptation/quality_scaler_resource.h
@@ -12,6 +12,7 @@
 #define VIDEO_ADAPTATION_QUALITY_SCALER_RESOURCE_H_
 
 #include <memory>
+#include <queue>
 #include <string>
 
 #include "api/video/video_adaptation_reason.h"
@@ -19,16 +20,13 @@
 #include "call/adaptation/resource.h"
 #include "call/adaptation/resource_adaptation_processor_interface.h"
 #include "modules/video_coding/utility/quality_scaler.h"
+#include "rtc_base/critical_section.h"
 #include "rtc_base/ref_counted_object.h"
 #include "rtc_base/task_queue.h"
 
 namespace webrtc {
 
 // Handles interaction with the QualityScaler.
-// TODO(hbos): Add unittests specific to this class, it is currently only tested
-// indirectly by usage in the ResourceAdaptationProcessor (which is only tested
-// because of its usage in VideoStreamEncoder); all tests are currently in
-// video_stream_encoder_unittest.cc.
 class QualityScalerResource : public rtc::RefCountedObject<Resource>,
                               public QualityScalerQpUsageHandlerInterface {
  public:
@@ -70,14 +68,31 @@
       rtc::scoped_refptr<Resource> reason_resource) override;
 
  private:
+  size_t QueuePendingCallback(
+      rtc::scoped_refptr<QualityScalerQpUsageHandlerCallbackInterface>
+          callback);
+  void HandlePendingCallback(size_t callback_id, bool clear_qp_samples);
+  void AbortPendingCallbacks();
+
   rtc::TaskQueue* encoder_queue_;
   // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue,
   // guard the processor by it instead.
   ResourceAdaptationProcessorInterface* adaptation_processor_
       RTC_GUARDED_BY(encoder_queue_);
   std::unique_ptr<QualityScaler> quality_scaler_ RTC_GUARDED_BY(encoder_queue_);
-  rtc::scoped_refptr<QualityScalerQpUsageHandlerCallbackInterface>
-      pending_qp_usage_callback_ RTC_GUARDED_BY(encoder_queue_);
+  // Every OnReportQpUsageHigh/Low() operation has a callback that MUST be
+  // invoked on the |encoder_queue_|.
+  // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue,
+  // handling a measurement entails a task queue "ping" round-trip between the
+  // encoder queue and the adaptation queue. Multiple callbacks in-flight would
+  // then be possible.
+  size_t num_handled_callbacks_ RTC_GUARDED_BY(encoder_queue_);
+  std::queue<rtc::scoped_refptr<QualityScalerQpUsageHandlerCallbackInterface>>
+      pending_callbacks_ RTC_GUARDED_BY(encoder_queue_);
+  // TODO(https://crbug.com/webrtc/11542): When we have an adaptation queue,
+  // guard processing_in_progress_/clear_cp_samples_ by it instead.
+  bool processing_in_progress_ RTC_GUARDED_BY(encoder_queue_);
+  bool clear_qp_samples_ RTC_GUARDED_BY(encoder_queue_);
 };
 
 }  // namespace webrtc
diff --git a/video/adaptation/quality_scaler_resource_unittest.cc b/video/adaptation/quality_scaler_resource_unittest.cc
new file mode 100644
index 0000000..d49addf
--- /dev/null
+++ b/video/adaptation/quality_scaler_resource_unittest.cc
@@ -0,0 +1,158 @@
+/*
+ *  Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "video/adaptation/quality_scaler_resource.h"
+
+#include <memory>
+
+#include "absl/types/optional.h"
+#include "api/task_queue/default_task_queue_factory.h"
+#include "api/task_queue/task_queue_factory.h"
+#include "api/video_codecs/video_encoder.h"
+#include "rtc_base/event.h"
+#include "rtc_base/task_queue.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+
+namespace {
+
+const int kDefaultTimeout = 5000;
+
+class FakeQualityScalerQpUsageHandlerCallback
+    : public QualityScalerQpUsageHandlerCallbackInterface {
+ public:
+  explicit FakeQualityScalerQpUsageHandlerCallback(
+      rtc::TaskQueue* encoder_queue)
+      : QualityScalerQpUsageHandlerCallbackInterface(),
+        encoder_queue_(encoder_queue),
+        is_handled_(false),
+        qp_usage_handled_event_(true /* manual_reset */, false),
+        clear_qp_samples_result_(absl::nullopt) {}
+  ~FakeQualityScalerQpUsageHandlerCallback() override {
+    RTC_DCHECK(is_handled_)
+        << "The callback was destroyed without being invoked.";
+  }
+
+  void OnQpUsageHandled(bool clear_qp_samples) override {
+    ASSERT_TRUE(encoder_queue_->IsCurrent());
+    RTC_DCHECK(!is_handled_);
+    clear_qp_samples_result_ = clear_qp_samples;
+    is_handled_ = true;
+    qp_usage_handled_event_.Set();
+  }
+
+  bool is_handled() const { return is_handled_; }
+  rtc::Event* qp_usage_handled_event() { return &qp_usage_handled_event_; }
+  absl::optional<bool> clear_qp_samples_result() const {
+    return clear_qp_samples_result_;
+  }
+
+ private:
+  rtc::TaskQueue* const encoder_queue_;
+  bool is_handled_;
+  rtc::Event qp_usage_handled_event_;
+  absl::optional<bool> clear_qp_samples_result_;
+};
+
+}  // namespace
+
+class QualityScalerResourceTest : public ::testing::Test {
+ public:
+  QualityScalerResourceTest()
+      : task_queue_factory_(CreateDefaultTaskQueueFactory()),
+        encoder_queue_(task_queue_factory_->CreateTaskQueue(
+            "EncoderQueue",
+            TaskQueueFactory::Priority::NORMAL)),
+        quality_scaler_resource_(new QualityScalerResource()) {
+    quality_scaler_resource_->Initialize(&encoder_queue_);
+    rtc::Event event;
+    encoder_queue_.PostTask([this, &event] {
+      quality_scaler_resource_->StartCheckForOveruse(
+          VideoEncoder::QpThresholds());
+      event.Set();
+    });
+    event.Wait(kDefaultTimeout);
+  }
+
+  ~QualityScalerResourceTest() {
+    rtc::Event event;
+    encoder_queue_.PostTask([this, &event] {
+      quality_scaler_resource_->StopCheckForOveruse();
+      event.Set();
+    });
+    event.Wait(kDefaultTimeout);
+  }
+
+ protected:
+  const std::unique_ptr<TaskQueueFactory> task_queue_factory_;
+  rtc::TaskQueue encoder_queue_;
+  rtc::scoped_refptr<QualityScalerResource> quality_scaler_resource_;
+};
+
+TEST_F(QualityScalerResourceTest, ReportQpHigh) {
+  rtc::scoped_refptr<FakeQualityScalerQpUsageHandlerCallback> callback =
+      new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_);
+  encoder_queue_.PostTask([this, callback] {
+    quality_scaler_resource_->OnReportQpUsageHigh(callback);
+  });
+  callback->qp_usage_handled_event()->Wait(kDefaultTimeout);
+}
+
+TEST_F(QualityScalerResourceTest, ReportQpLow) {
+  rtc::scoped_refptr<FakeQualityScalerQpUsageHandlerCallback> callback =
+      new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_);
+  encoder_queue_.PostTask([this, callback] {
+    quality_scaler_resource_->OnReportQpUsageLow(callback);
+  });
+  callback->qp_usage_handled_event()->Wait(kDefaultTimeout);
+}
+
+// TODO(https://crbug.com/webrtc/11542): Callbacks are currently resolved
+// immediately, but when we have an adaptation queue this test will ensure we
+// can have multiple callbacks pending at the same time.
+TEST_F(QualityScalerResourceTest, MultipleCallbacksInFlight) {
+  rtc::scoped_refptr<FakeQualityScalerQpUsageHandlerCallback> callback1 =
+      new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_);
+  rtc::scoped_refptr<FakeQualityScalerQpUsageHandlerCallback> callback2 =
+      new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_);
+  rtc::scoped_refptr<FakeQualityScalerQpUsageHandlerCallback> callback3 =
+      new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_);
+  encoder_queue_.PostTask([this, callback1, callback2, callback3] {
+    quality_scaler_resource_->OnReportQpUsageHigh(callback1);
+    quality_scaler_resource_->OnReportQpUsageLow(callback2);
+    quality_scaler_resource_->OnReportQpUsageHigh(callback3);
+  });
+  callback1->qp_usage_handled_event()->Wait(kDefaultTimeout);
+  callback2->qp_usage_handled_event()->Wait(kDefaultTimeout);
+  callback3->qp_usage_handled_event()->Wait(kDefaultTimeout);
+}
+
+// TODO(https://crbug.com/webrtc/11542): Callbacks are currently resolved
+// immediately, but when we have an adaptation queue this test will ensure we
+// can abort pending callbacks.
+TEST_F(QualityScalerResourceTest, AbortPendingCallbacksAndStartAgain) {
+  rtc::scoped_refptr<FakeQualityScalerQpUsageHandlerCallback> callback1 =
+      new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_);
+  rtc::scoped_refptr<FakeQualityScalerQpUsageHandlerCallback> callback2 =
+      new FakeQualityScalerQpUsageHandlerCallback(&encoder_queue_);
+  encoder_queue_.PostTask([this, callback1, callback2] {
+    quality_scaler_resource_->OnReportQpUsageHigh(callback1);
+    quality_scaler_resource_->StopCheckForOveruse();
+    EXPECT_TRUE(callback1->qp_usage_handled_event()->Wait(0));
+    quality_scaler_resource_->StartCheckForOveruse(
+        VideoEncoder::QpThresholds());
+    quality_scaler_resource_->OnReportQpUsageHigh(callback2);
+  });
+  callback1->qp_usage_handled_event()->Wait(kDefaultTimeout);
+  callback2->qp_usage_handled_event()->Wait(kDefaultTimeout);
+}
+
+}  // namespace webrtc