Reland "Running FrameBuffer on task queue."

This is a reland of 13943b7b7f6d00568912b9969db2c7871d18e21f

Original change's description:
> Running FrameBuffer on task queue.
> 
> This prepares for running WebRTC in simulated time where event::Wait
> based timing doesn't work.
> 
> Bug: webrtc:10365
> Change-Id: Ia0f9b1cc8e3c8c27a38e45b40487050a4699d8cf
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/129962
> Reviewed-by: Philip Eliasson <philipel@webrtc.org>
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Commit-Queue: Sebastian Jansson <srte@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#27422}

Bug: webrtc:10365
Change-Id: I412d3e0fe06c6dd57cdb42974f09e03f3a6ad038
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/131124
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27572}
diff --git a/modules/video_coding/BUILD.gn b/modules/video_coding/BUILD.gn
index 12d02bf..29a5d8c 100644
--- a/modules/video_coding/BUILD.gn
+++ b/modules/video_coding/BUILD.gn
@@ -151,6 +151,7 @@
     "..:module_api_public",
     "../../api:fec_controller_api",
     "../../api:rtp_headers",
+    "../../api/task_queue:global_task_queue_factory",
     "../../api/units:data_rate",
     "../../api/video:builtin_video_bitrate_allocator_factory",
     "../../api/video:encoded_frame",
@@ -170,6 +171,7 @@
     "../../rtc_base/experiments:rtt_mult_experiment",
     "../../rtc_base/synchronization:sequence_checker",
     "../../rtc_base/system:fallthrough",
+    "../../rtc_base/task_utils:repeating_task",
     "../../rtc_base/third_party/base64",
     "../../rtc_base/time:timestamp_extrapolator",
     "../../system_wrappers",
diff --git a/modules/video_coding/frame_buffer2.cc b/modules/video_coding/frame_buffer2.cc
index 5d427b0..20b680e 100644
--- a/modules/video_coding/frame_buffer2.cc
+++ b/modules/video_coding/frame_buffer2.cc
@@ -17,6 +17,7 @@
 #include <utility>
 #include <vector>
 
+#include "absl/memory/memory.h"
 #include "api/video/encoded_image.h"
 #include "api/video/video_timing.h"
 #include "modules/video_coding/include/video_coding_defines.h"
@@ -53,6 +54,7 @@
                          VCMReceiveStatisticsCallback* stats_callback)
     : decoded_frames_history_(kMaxFramesHistory),
       clock_(clock),
+      callback_queue_(nullptr),
       jitter_estimator_(jitter_estimator),
       timing_(timing),
       inter_frame_delay_(clock_->TimeInMilliseconds()),
@@ -65,6 +67,55 @@
 
 FrameBuffer::~FrameBuffer() {}
 
+void FrameBuffer::NextFrame(
+    int64_t max_wait_time_ms,
+    bool keyframe_required,
+    rtc::TaskQueue* callback_queue,
+    std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)> handler) {
+  RTC_DCHECK_RUN_ON(callback_queue);
+  TRACE_EVENT0("webrtc", "FrameBuffer::NextFrame");
+  int64_t latest_return_time_ms =
+      clock_->TimeInMilliseconds() + max_wait_time_ms;
+  rtc::CritScope lock(&crit_);
+  if (stopped_) {
+    return;
+  }
+  latest_return_time_ms_ = latest_return_time_ms;
+  keyframe_required_ = keyframe_required;
+  frame_handler_ = handler;
+  callback_queue_ = callback_queue;
+  StartWaitForNextFrameOnQueue();
+}
+
+void FrameBuffer::StartWaitForNextFrameOnQueue() {
+  RTC_DCHECK(callback_queue_);
+  RTC_DCHECK(!callback_task_.Running());
+  int64_t wait_ms = FindNextFrame(clock_->TimeInMilliseconds());
+  callback_task_ = RepeatingTaskHandle::DelayedStart(
+      callback_queue_->Get(), TimeDelta::ms(wait_ms), [this] {
+        // If this task has not been cancelled, we did not get any new frames
+        // while waiting. Continue with frame delivery.
+        rtc::CritScope lock(&crit_);
+        if (!frames_to_decode_.empty()) {
+          // We have frames, deliver!
+          frame_handler_(absl::WrapUnique(GetNextFrame()), kFrameFound);
+          CancelCallback();
+          return TimeDelta::Zero();  // Ignored.
+        } else if (clock_->TimeInMilliseconds() >= latest_return_time_ms_) {
+          // We have timed out, signal this and stop repeating.
+          frame_handler_(nullptr, kTimeout);
+          CancelCallback();
+          return TimeDelta::Zero();  // Ignored.
+        } else {
+          // If there's no frames to decode and there is still time left, it
+          // means that the frame buffer was cleared between creation and
+          // execution of this task. Continue waiting for the remaining time.
+          int64_t wait_ms = FindNextFrame(clock_->TimeInMilliseconds());
+          return TimeDelta::ms(wait_ms);
+        }
+      });
+}
+
 FrameBuffer::ReturnReason FrameBuffer::NextFrame(
     int64_t max_wait_time_ms,
     std::unique_ptr<EncodedFrame>* frame_out,
@@ -313,6 +364,7 @@
   rtc::CritScope lock(&crit_);
   stopped_ = true;
   new_continuous_frame_event_.Set();
+  CancelCallback();
 }
 
 void FrameBuffer::Clear() {
@@ -342,6 +394,12 @@
   return true;
 }
 
+void FrameBuffer::CancelCallback() {
+  frame_handler_ = {};
+  callback_task_.Stop();
+  callback_queue_ = nullptr;
+}
+
 bool FrameBuffer::IsCompleteSuperFrame(const EncodedFrame& frame) {
   if (frame.inter_layer_predicted) {
     // Check that all previous spatial layers are already inserted.
@@ -487,9 +545,19 @@
     last_continuous_picture_id = last_continuous_frame_->picture_id;
 
     // Since we now have new continuous frames there might be a better frame
-    // to return from NextFrame. Signal that thread so that it again can choose
-    // which frame to return.
+    // to return from NextFrame.
     new_continuous_frame_event_.Set();
+
+    if (callback_queue_) {
+      callback_queue_->PostTask([this] {
+        rtc::CritScope lock(&crit_);
+        if (!callback_task_.Running())
+          return;
+        RTC_CHECK(frame_handler_);
+        callback_task_.Stop();
+        StartWaitForNextFrameOnQueue();
+      });
+    }
   }
 
   return last_continuous_picture_id;
diff --git a/modules/video_coding/frame_buffer2.h b/modules/video_coding/frame_buffer2.h
index 78a6171..57dbaa4 100644
--- a/modules/video_coding/frame_buffer2.h
+++ b/modules/video_coding/frame_buffer2.h
@@ -27,6 +27,8 @@
 #include "rtc_base/event.h"
 #include "rtc_base/experiments/rtt_mult_experiment.h"
 #include "rtc_base/numerics/sequence_number_util.h"
+#include "rtc_base/task_queue.h"
+#include "rtc_base/task_utils/repeating_task.h"
 #include "rtc_base/thread_annotations.h"
 
 namespace webrtc {
@@ -64,6 +66,11 @@
   ReturnReason NextFrame(int64_t max_wait_time_ms,
                          std::unique_ptr<EncodedFrame>* frame_out,
                          bool keyframe_required = false);
+  void NextFrame(
+      int64_t max_wait_time_ms,
+      bool keyframe_required,
+      rtc::TaskQueue* callback_queue,
+      std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)> handler);
 
   // Tells the FrameBuffer which protection mode that is in use. Affects
   // the frame timing.
@@ -121,6 +128,9 @@
   int64_t FindNextFrame(int64_t now_ms) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
   EncodedFrame* GetNextFrame() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
 
+  void StartWaitForNextFrameOnQueue() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+  void CancelCallback() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+
   // Update all directly dependent and indirectly dependent frames and mark
   // them as continuous if all their references has been fulfilled.
   void PropagateContinuity(FrameMap::iterator start)
@@ -163,6 +173,11 @@
 
   rtc::CriticalSection crit_;
   Clock* const clock_;
+
+  rtc::TaskQueue* callback_queue_ RTC_GUARDED_BY(crit_);
+  RepeatingTaskHandle callback_task_ RTC_GUARDED_BY(crit_);
+  std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)>
+      frame_handler_ RTC_GUARDED_BY(crit_);
   int64_t latest_return_time_ms_ RTC_GUARDED_BY(crit_);
   bool keyframe_required_ RTC_GUARDED_BY(crit_);
 
diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc
index 6e3a063..29c8820 100644
--- a/video/video_receive_stream.cc
+++ b/video/video_receive_stream.cc
@@ -185,6 +185,8 @@
       num_cpu_cores_(num_cpu_cores),
       process_thread_(process_thread),
       clock_(clock),
+      use_task_queue_(
+          !field_trial::IsDisabled("WebRTC-Video-DecodeOnTaskQueue")),
       decode_thread_(&DecodeThreadFunction,
                      this,
                      "DecodingThread",
@@ -213,7 +215,10 @@
                                     .value_or(kMaxWaitForKeyFrameMs)),
       max_wait_for_frame_ms_(KeyframeIntervalSettings::ParseFromFieldTrials()
                                  .MaxWaitForFrameMs()
-                                 .value_or(kMaxWaitForFrameMs)) {
+                                 .value_or(kMaxWaitForFrameMs)),
+      decode_queue_(task_queue_factory_->CreateTaskQueue(
+          "DecodingQueue",
+          TaskQueueFactory::Priority::HIGH)) {
   RTC_LOG(LS_INFO) << "VideoReceiveStream: " << config_.ToString();
 
   RTC_DCHECK(config_.renderer);
@@ -309,7 +314,7 @@
 void VideoReceiveStream::Start() {
   RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
 
-  if (decode_thread_.IsRunning()) {
+  if (decoder_running_) {
     return;
   }
 
@@ -388,7 +393,16 @@
   // Start the decode thread
   video_receiver_.DecoderThreadStarting();
   stats_proxy_.DecoderThreadStarting();
-  decode_thread_.Start();
+  if (!use_task_queue_) {
+    decode_thread_.Start();
+  } else {
+    decode_queue_.PostTask([this] {
+      RTC_DCHECK_RUN_ON(&decode_queue_);
+      decoder_stopped_ = false;
+      StartNextDecode();
+    });
+  }
+  decoder_running_ = true;
   rtp_video_stream_receiver_.StartReceive();
 }
 
@@ -399,16 +413,31 @@
   stats_proxy_.OnUniqueFramesCounted(
       rtp_video_stream_receiver_.GetUniqueFramesSeen());
 
-  frame_buffer_->Stop();
+  if (!use_task_queue_) {
+    frame_buffer_->Stop();
+  } else {
+    decode_queue_.PostTask([this] { frame_buffer_->Stop(); });
+  }
   call_stats_->DeregisterStatsObserver(this);
 
-  if (decode_thread_.IsRunning()) {
+  if (decoder_running_) {
     // TriggerDecoderShutdown will release any waiting decoder thread and make
     // it stop immediately, instead of waiting for a timeout. Needs to be called
     // before joining the decoder thread.
     video_receiver_.TriggerDecoderShutdown();
 
-    decode_thread_.Stop();
+    if (!use_task_queue_) {
+      decode_thread_.Stop();
+    } else {
+      rtc::Event done;
+      decode_queue_.PostTask([this, &done] {
+        RTC_DCHECK_RUN_ON(&decode_queue_);
+        decoder_stopped_ = true;
+        done.Set();
+      });
+      done.Wait(rtc::Event::kForever);
+    }
+    decoder_running_ = false;
     video_receiver_.DecoderThreadStopped();
     stats_proxy_.DecoderThreadStopped();
     // Deregister external decoders so they are no longer running during
@@ -572,6 +601,38 @@
   return keyframe_required_ ? max_wait_for_keyframe_ms_
                             : max_wait_for_frame_ms_;
 }
+
+void VideoReceiveStream::StartNextDecode() {
+  RTC_DCHECK(use_task_queue_);
+  TRACE_EVENT0("webrtc", "VideoReceiveStream::StartNextDecode");
+
+  struct DecodeTask {
+    void operator()() {
+      RTC_DCHECK_RUN_ON(&stream->decode_queue_);
+      if (stream->decoder_stopped_)
+        return;
+      if (frame) {
+        stream->HandleEncodedFrame(std::move(frame));
+      } else {
+        stream->HandleFrameBufferTimeout();
+      }
+      stream->StartNextDecode();
+    }
+    VideoReceiveStream* stream;
+    std::unique_ptr<EncodedFrame> frame;
+  };
+
+  // TODO(philipel): Call NextFrame with |keyframe_required| argument set when
+  //                 downstream project has been fixed.
+  frame_buffer_->NextFrame(
+      GetWaitMs(), /*keyframe_required*/ false, &decode_queue_,
+      [this](std::unique_ptr<EncodedFrame> frame, ReturnReason res) {
+        RTC_DCHECK_EQ(frame == nullptr, res == ReturnReason::kTimeout);
+        RTC_DCHECK_EQ(frame != nullptr, res == ReturnReason::kFrameFound);
+        decode_queue_.PostTask(DecodeTask{this, std::move(frame)});
+      });
+}
+
 void VideoReceiveStream::DecodeThreadFunction(void* ptr) {
   ScopedRegisterThreadForDebugging thread_dbg(RTC_FROM_HERE);
   while (static_cast<VideoReceiveStream*>(ptr)->Decode()) {
@@ -579,6 +640,7 @@
 }
 
 bool VideoReceiveStream::Decode() {
+  RTC_DCHECK(!use_task_queue_);
   TRACE_EVENT0("webrtc", "VideoReceiveStream::Decode");
 
   std::unique_ptr<video_coding::EncodedFrame> frame;
diff --git a/video/video_receive_stream.h b/video/video_receive_stream.h
index 585a2cc..dc4e4b7 100644
--- a/video/video_receive_stream.h
+++ b/video/video_receive_stream.h
@@ -23,6 +23,7 @@
 #include "modules/video_coding/frame_buffer2.h"
 #include "modules/video_coding/video_coding_impl.h"
 #include "rtc_base/synchronization/sequence_checker.h"
+#include "rtc_base/task_queue.h"
 #include "system_wrappers/include/clock.h"
 #include "video/receive_statistics_proxy.h"
 #include "video/rtp_streams_synchronizer.h"
@@ -133,6 +134,7 @@
 
  private:
   int64_t GetWaitMs() const;
+  void StartNextDecode() RTC_RUN_ON(decode_queue_);
   static void DecodeThreadFunction(void* ptr);
   bool Decode();
   void HandleEncodedFrame(std::unique_ptr<video_coding::EncodedFrame> frame);
@@ -153,10 +155,15 @@
   ProcessThread* const process_thread_;
   Clock* const clock_;
 
+  const bool use_task_queue_;
+
   rtc::PlatformThread decode_thread_;
 
   CallStats* const call_stats_;
 
+  bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
+  bool decoder_stopped_ RTC_GUARDED_BY(decode_queue_) = true;
+
   ReceiveStatisticsProxy stats_proxy_;
   // Shared by media and rtx stream receivers, since the latter has no RtpRtcp
   // module of its own.
@@ -211,6 +218,9 @@
 
   // Maximum delay as decided by the RTP playout delay extension.
   int frame_maximum_playout_delay_ms_ RTC_GUARDED_BY(playout_delay_lock_) = -1;
+
+  // Defined last so they are destroyed before all other members.
+  rtc::TaskQueue decode_queue_;
 };
 }  // namespace internal
 }  // namespace webrtc