Running FrameBuffer on task queue.

This prepares for running WebRTC in simulated time where event::Wait
based timing doesn't work.

Bug: webrtc:10365
Change-Id: Ia0f9b1cc8e3c8c27a38e45b40487050a4699d8cf
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/129962
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27422}
diff --git a/modules/video_coding/BUILD.gn b/modules/video_coding/BUILD.gn
index cdaff5d..5f366a6 100644
--- a/modules/video_coding/BUILD.gn
+++ b/modules/video_coding/BUILD.gn
@@ -151,6 +151,7 @@
     "..:module_api_public",
     "../../api:fec_controller_api",
     "../../api:rtp_headers",
+    "../../api/task_queue:global_task_queue_factory",
     "../../api/units:data_rate",
     "../../api/video:builtin_video_bitrate_allocator_factory",
     "../../api/video:encoded_frame",
@@ -170,6 +171,7 @@
     "../../rtc_base/experiments:jitter_upper_bound_experiment",
     "../../rtc_base/experiments:rtt_mult_experiment",
     "../../rtc_base/system:fallthrough",
+    "../../rtc_base/task_utils:repeating_task",
     "../../rtc_base/third_party/base64",
     "../../rtc_base/time:timestamp_extrapolator",
     "../../system_wrappers",
diff --git a/modules/video_coding/frame_buffer2.cc b/modules/video_coding/frame_buffer2.cc
index ecc0e17..a654019 100644
--- a/modules/video_coding/frame_buffer2.cc
+++ b/modules/video_coding/frame_buffer2.cc
@@ -17,6 +17,8 @@
 #include <utility>
 #include <vector>
 
+#include "absl/memory/memory.h"
+#include "api/task_queue/global_task_queue_factory.h"
 #include "api/video/encoded_image.h"
 #include "api/video/video_timing.h"
 #include "modules/video_coding/include/video_coding_defines.h"
@@ -45,14 +47,30 @@
 constexpr int kMaxAllowedFrameDelayMs = 5;
 
 constexpr int64_t kLogNonDecodedIntervalMs = 5000;
+
+std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateQueue(
+    TaskQueueFactory* task_queue_factory) {
+  if (!task_queue_factory)
+    task_queue_factory = &GlobalTaskQueueFactory();
+  return task_queue_factory->CreateTaskQueue("FrameBuffer",
+                                             TaskQueueFactory::Priority::HIGH);
+}
 }  // namespace
 
 FrameBuffer::FrameBuffer(Clock* clock,
                          VCMJitterEstimator* jitter_estimator,
                          VCMTiming* timing,
+                         VCMReceiveStatisticsCallback* stats_proxy)
+    : FrameBuffer(clock, nullptr, jitter_estimator, timing, stats_proxy) {}
+
+FrameBuffer::FrameBuffer(Clock* clock,
+                         TaskQueueFactory* task_queue_factory,
+                         VCMJitterEstimator* jitter_estimator,
+                         VCMTiming* timing,
                          VCMReceiveStatisticsCallback* stats_callback)
     : decoded_frames_history_(kMaxFramesHistory),
       clock_(clock),
+      use_task_queue_(task_queue_factory != nullptr),
       jitter_estimator_(jitter_estimator),
       timing_(timing),
       inter_frame_delay_(clock_->TimeInMilliseconds()),
@@ -61,14 +79,69 @@
       stats_callback_(stats_callback),
       last_log_non_decoded_ms_(-kLogNonDecodedIntervalMs),
       add_rtt_to_playout_delay_(
-          webrtc::field_trial::IsEnabled("WebRTC-AddRttToPlayoutDelay")) {}
+          webrtc::field_trial::IsEnabled("WebRTC-AddRttToPlayoutDelay")),
+      task_queue_(CreateQueue(task_queue_factory)) {}
 
 FrameBuffer::~FrameBuffer() {}
 
+void FrameBuffer::NextFrame(
+    int64_t max_wait_time_ms,
+    bool keyframe_required,
+    std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)> handler) {
+  RTC_DCHECK(use_task_queue_);
+  TRACE_EVENT0("webrtc", "FrameBuffer::NextFrame");
+  int64_t latest_return_time_ms =
+      clock_->TimeInMilliseconds() + max_wait_time_ms;
+  task_queue_.PostTask([=] {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    rtc::CritScope lock(&crit_);
+    if (stopped_) {
+      return;
+    }
+    latest_return_time_ms_ = latest_return_time_ms;
+    keyframe_required_ = keyframe_required;
+    frame_handler_ = handler;
+    NextFrameOnQueue();
+  });
+}
+
+void FrameBuffer::NextFrameOnQueue() {
+  RTC_DCHECK(use_task_queue_);
+  RTC_DCHECK(!callback_task_.Running());
+  int64_t wait_ms = UpdateFramesToDecode(clock_->TimeInMilliseconds());
+  callback_task_ = RepeatingTaskHandle::DelayedStart(
+      task_queue_.Get(), TimeDelta::ms(wait_ms), [this] {
+        // If this task has not been cancelled, we did not get any new frames
+        // while waiting. Continue with frame delivery.
+        RTC_DCHECK_RUN_ON(&task_queue_);
+        rtc::CritScope lock(&crit_);
+        if (!frames_to_decode_.empty()) {
+          // We have frames, deliver!
+          frame_handler_(absl::WrapUnique(GetFrameToDecode()), kFrameFound);
+          frame_handler_ = {};
+          callback_task_.Stop();
+          return TimeDelta::Zero();  // Ignored.
+        } else if (clock_->TimeInMilliseconds() >= latest_return_time_ms_) {
+          // We have timed out, signal this and stop repeating.
+          frame_handler_(nullptr, kTimeout);
+          frame_handler_ = {};
+          callback_task_.Stop();
+          return TimeDelta::Zero();  // Ignored.
+        } else {
+          // If there's no frames to decode and there is still time left, it
+          // means that the frame buffer was cleared between creation and
+          // execution of this task. Continue waiting for the remaining time.
+          int64_t wait_ms = UpdateFramesToDecode(clock_->TimeInMilliseconds());
+          return TimeDelta::ms(wait_ms);
+        }
+      });
+}
+
 FrameBuffer::ReturnReason FrameBuffer::NextFrame(
     int64_t max_wait_time_ms,
     std::unique_ptr<EncodedFrame>* frame_out,
     bool keyframe_required) {
+  RTC_DCHECK(!use_task_queue_);
   TRACE_EVENT0("webrtc", "FrameBuffer::NextFrame");
   int64_t latest_return_time_ms =
       clock_->TimeInMilliseconds() + max_wait_time_ms;
@@ -83,183 +156,25 @@
       if (stopped_)
         return kStopped;
 
-      wait_ms = max_wait_time_ms;
-
-      // Need to hold |crit_| in order to access frames_to_decode_. therefore we
+      // Need to hold |crit_| in order to access the members. therefore we
       // set it here in the loop instead of outside the loop in order to not
       // acquire the lock unnecessarily.
-      frames_to_decode_.clear();
-
-      // |last_continuous_frame_| may be empty below, but nullopt is smaller
-      // than everything else and loop will immediately terminate as expected.
-      for (auto frame_it = frames_.begin();
-           frame_it != frames_.end() &&
-           frame_it->first <= last_continuous_frame_;
-           ++frame_it) {
-        if (!frame_it->second.continuous ||
-            frame_it->second.num_missing_decodable > 0) {
-          continue;
-        }
-
-        EncodedFrame* frame = frame_it->second.frame.get();
-
-        if (keyframe_required && !frame->is_keyframe())
-          continue;
-
-        auto last_decoded_frame_timestamp =
-            decoded_frames_history_.GetLastDecodedFrameTimestamp();
-
-        // TODO(https://bugs.webrtc.org/9974): consider removing this check
-        // as it may make a stream undecodable after a very long delay between
-        // frames.
-        if (last_decoded_frame_timestamp &&
-            AheadOf(*last_decoded_frame_timestamp, frame->Timestamp())) {
-          continue;
-        }
-
-        // Only ever return all parts of a superframe. Therefore skip this
-        // frame if it's not a beginning of a superframe.
-        if (frame->inter_layer_predicted) {
-          continue;
-        }
-
-        // Gather all remaining frames for the same superframe.
-        std::vector<FrameMap::iterator> current_superframe;
-        current_superframe.push_back(frame_it);
-        bool last_layer_completed =
-            frame_it->second.frame->is_last_spatial_layer;
-        FrameMap::iterator next_frame_it = frame_it;
-        while (true) {
-          ++next_frame_it;
-          if (next_frame_it == frames_.end() ||
-              next_frame_it->first.picture_id != frame->id.picture_id ||
-              !next_frame_it->second.continuous) {
-            break;
-          }
-          // Check if the next frame has some undecoded references other than
-          // the previous frame in the same superframe.
-          size_t num_allowed_undecoded_refs =
-              (next_frame_it->second.frame->inter_layer_predicted) ? 1 : 0;
-          if (next_frame_it->second.num_missing_decodable >
-              num_allowed_undecoded_refs) {
-            break;
-          }
-          // All frames in the superframe should have the same timestamp.
-          if (frame->Timestamp() != next_frame_it->second.frame->Timestamp()) {
-            RTC_LOG(LS_WARNING)
-                << "Frames in a single superframe have different"
-                   " timestamps. Skipping undecodable superframe.";
-            break;
-          }
-          current_superframe.push_back(next_frame_it);
-          last_layer_completed =
-              next_frame_it->second.frame->is_last_spatial_layer;
-        }
-        // Check if the current superframe is complete.
-        // TODO(bugs.webrtc.org/10064): consider returning all available to
-        // decode frames even if the superframe is not complete yet.
-        if (!last_layer_completed) {
-          continue;
-        }
-
-        frames_to_decode_ = std::move(current_superframe);
-
-        if (frame->RenderTime() == -1) {
-          frame->SetRenderTime(
-              timing_->RenderTimeMs(frame->Timestamp(), now_ms));
-        }
-        wait_ms = timing_->MaxWaitingTime(frame->RenderTime(), now_ms);
-
-        // This will cause the frame buffer to prefer high framerate rather
-        // than high resolution in the case of the decoder not decoding fast
-        // enough and the stream has multiple spatial and temporal layers.
-        // For multiple temporal layers it may cause non-base layer frames to be
-        // skipped if they are late.
-        if (wait_ms < -kMaxAllowedFrameDelayMs)
-          continue;
-
-        break;
-      }
-    }  // rtc::Critscope lock(&crit_);
-
-    wait_ms = std::min<int64_t>(wait_ms, latest_return_time_ms - now_ms);
-    wait_ms = std::max<int64_t>(wait_ms, 0);
+      keyframe_required_ = keyframe_required;
+      latest_return_time_ms_ = latest_return_time_ms;
+      wait_ms = UpdateFramesToDecode(now_ms);
+    }
   } while (new_continuous_frame_event_.Wait(wait_ms));
 
   {
     rtc::CritScope lock(&crit_);
-    now_ms = clock_->TimeInMilliseconds();
-    // TODO(ilnik): remove |frames_out| use frames_to_decode_ directly.
-    std::vector<EncodedFrame*> frames_out;
 
     if (!frames_to_decode_.empty()) {
-      bool superframe_delayed_by_retransmission = false;
-      size_t superframe_size = 0;
-      EncodedFrame* first_frame = frames_to_decode_[0]->second.frame.get();
-      int64_t render_time_ms = first_frame->RenderTime();
-      int64_t receive_time_ms = first_frame->ReceivedTime();
-      // Gracefully handle bad RTP timestamps and render time issues.
-      if (HasBadRenderTiming(*first_frame, now_ms)) {
-        jitter_estimator_->Reset();
-        timing_->Reset();
-        render_time_ms =
-            timing_->RenderTimeMs(first_frame->Timestamp(), now_ms);
-      }
-
-      for (FrameMap::iterator& frame_it : frames_to_decode_) {
-        RTC_DCHECK(frame_it != frames_.end());
-        EncodedFrame* frame = frame_it->second.frame.release();
-
-        frame->SetRenderTime(render_time_ms);
-
-        superframe_delayed_by_retransmission |=
-            frame->delayed_by_retransmission();
-        receive_time_ms = std::max(receive_time_ms, frame->ReceivedTime());
-        superframe_size += frame->size();
-
-        PropagateDecodability(frame_it->second);
-        decoded_frames_history_.InsertDecoded(frame_it->first,
-                                              frame->Timestamp());
-
-        // Remove decoded frame and all undecoded frames before it.
-        frames_.erase(frames_.begin(), ++frame_it);
-
-        frames_out.push_back(frame);
-      }
-
-      if (!superframe_delayed_by_retransmission) {
-        int64_t frame_delay;
-
-        if (inter_frame_delay_.CalculateDelay(first_frame->Timestamp(),
-                                              &frame_delay, receive_time_ms)) {
-          jitter_estimator_->UpdateEstimate(frame_delay, superframe_size);
-        }
-
-        float rtt_mult = protection_mode_ == kProtectionNackFEC ? 0.0 : 1.0;
-        if (RttMultExperiment::RttMultEnabled()) {
-          rtt_mult = RttMultExperiment::GetRttMultValue();
-        }
-        timing_->SetJitterDelay(jitter_estimator_->GetJitterEstimate(rtt_mult));
-        timing_->UpdateCurrentDelay(render_time_ms, now_ms);
-      } else {
-        if (RttMultExperiment::RttMultEnabled() || add_rtt_to_playout_delay_)
-          jitter_estimator_->FrameNacked();
-      }
-
-      UpdateJitterDelay();
-      UpdateTimingFrameInfo();
-    }
-    if (!frames_out.empty()) {
-      if (frames_out.size() == 1) {
-        frame_out->reset(frames_out[0]);
-      } else {
-        frame_out->reset(CombineAndDeleteFrames(frames_out));
-      }
+      frame_out->reset(GetFrameToDecode());
       return kFrameFound;
     }
-  }  // rtc::Critscope lock(&crit_)
+  }
 
-  if (latest_return_time_ms - now_ms > 0) {
+  if (latest_return_time_ms - clock_->TimeInMilliseconds() > 0) {
     // If |next_frame_it_ == frames_.end()| and there is still time left, it
     // means that the frame buffer was cleared as the thread in this function
     // was waiting to acquire |crit_| in order to return. Wait for the
@@ -269,6 +184,166 @@
   return kTimeout;
 }
 
+int64_t FrameBuffer::UpdateFramesToDecode(int64_t now_ms) {
+  int64_t wait_ms = latest_return_time_ms_ - now_ms;
+  frames_to_decode_.clear();
+
+  // |last_continuous_frame_| may be empty below, but nullopt is smaller
+  // than everything else and loop will immediately terminate as expected.
+  for (auto frame_it = frames_.begin();
+       frame_it != frames_.end() && frame_it->first <= last_continuous_frame_;
+       ++frame_it) {
+    if (!frame_it->second.continuous ||
+        frame_it->second.num_missing_decodable > 0) {
+      continue;
+    }
+
+    EncodedFrame* frame = frame_it->second.frame.get();
+
+    if (keyframe_required_ && !frame->is_keyframe())
+      continue;
+
+    auto last_decoded_frame_timestamp =
+        decoded_frames_history_.GetLastDecodedFrameTimestamp();
+
+    // TODO(https://bugs.webrtc.org/9974): consider removing this check
+    // as it may make a stream undecodable after a very long delay between
+    // frames.
+    if (last_decoded_frame_timestamp &&
+        AheadOf(*last_decoded_frame_timestamp, frame->Timestamp())) {
+      continue;
+    }
+
+    // Only ever return all parts of a superframe. Therefore skip this
+    // frame if it's not a beginning of a superframe.
+    if (frame->inter_layer_predicted) {
+      continue;
+    }
+
+    // Gather all remaining frames for the same superframe.
+    std::vector<FrameMap::iterator> current_superframe;
+    current_superframe.push_back(frame_it);
+    bool last_layer_completed = frame_it->second.frame->is_last_spatial_layer;
+    FrameMap::iterator next_frame_it = frame_it;
+    while (true) {
+      ++next_frame_it;
+      if (next_frame_it == frames_.end() ||
+          next_frame_it->first.picture_id != frame->id.picture_id ||
+          !next_frame_it->second.continuous) {
+        break;
+      }
+      // Check if the next frame has some undecoded references other than
+      // the previous frame in the same superframe.
+      size_t num_allowed_undecoded_refs =
+          (next_frame_it->second.frame->inter_layer_predicted) ? 1 : 0;
+      if (next_frame_it->second.num_missing_decodable >
+          num_allowed_undecoded_refs) {
+        break;
+      }
+      // All frames in the superframe should have the same timestamp.
+      if (frame->Timestamp() != next_frame_it->second.frame->Timestamp()) {
+        RTC_LOG(LS_WARNING) << "Frames in a single superframe have different"
+                               " timestamps. Skipping undecodable superframe.";
+        break;
+      }
+      current_superframe.push_back(next_frame_it);
+      last_layer_completed = next_frame_it->second.frame->is_last_spatial_layer;
+    }
+    // Check if the current superframe is complete.
+    // TODO(bugs.webrtc.org/10064): consider returning all available to
+    // decode frames even if the superframe is not complete yet.
+    if (!last_layer_completed) {
+      continue;
+    }
+
+    frames_to_decode_ = std::move(current_superframe);
+
+    if (frame->RenderTime() == -1) {
+      frame->SetRenderTime(timing_->RenderTimeMs(frame->Timestamp(), now_ms));
+    }
+    wait_ms = timing_->MaxWaitingTime(frame->RenderTime(), now_ms);
+
+    // This will cause the frame buffer to prefer high framerate rather
+    // than high resolution in the case of the decoder not decoding fast
+    // enough and the stream has multiple spatial and temporal layers.
+    // For multiple temporal layers it may cause non-base layer frames to be
+    // skipped if they are late.
+    if (wait_ms < -kMaxAllowedFrameDelayMs)
+      continue;
+
+    break;
+  }
+  wait_ms = std::min<int64_t>(wait_ms, latest_return_time_ms_ - now_ms);
+  wait_ms = std::max<int64_t>(wait_ms, 0);
+  return wait_ms;
+}
+
+EncodedFrame* FrameBuffer::GetFrameToDecode() {
+  int64_t now_ms = clock_->TimeInMilliseconds();
+  // TODO(ilnik): remove |frames_out| use frames_to_decode_ directly.
+  std::vector<EncodedFrame*> frames_out;
+
+  RTC_DCHECK(!frames_to_decode_.empty());
+  bool superframe_delayed_by_retransmission = false;
+  size_t superframe_size = 0;
+  EncodedFrame* first_frame = frames_to_decode_[0]->second.frame.get();
+  int64_t render_time_ms = first_frame->RenderTime();
+  int64_t receive_time_ms = first_frame->ReceivedTime();
+  // Gracefully handle bad RTP timestamps and render time issues.
+  if (HasBadRenderTiming(*first_frame, now_ms)) {
+    jitter_estimator_->Reset();
+    timing_->Reset();
+    render_time_ms = timing_->RenderTimeMs(first_frame->Timestamp(), now_ms);
+  }
+
+  for (FrameMap::iterator& frame_it : frames_to_decode_) {
+    RTC_DCHECK(frame_it != frames_.end());
+    EncodedFrame* frame = frame_it->second.frame.release();
+
+    frame->SetRenderTime(render_time_ms);
+
+    superframe_delayed_by_retransmission |= frame->delayed_by_retransmission();
+    receive_time_ms = std::max(receive_time_ms, frame->ReceivedTime());
+    superframe_size += frame->size();
+
+    PropagateDecodability(frame_it->second);
+    decoded_frames_history_.InsertDecoded(frame_it->first, frame->Timestamp());
+
+    // Remove decoded frame and all undecoded frames before it.
+    frames_.erase(frames_.begin(), ++frame_it);
+
+    frames_out.push_back(frame);
+  }
+
+  if (!superframe_delayed_by_retransmission) {
+    int64_t frame_delay;
+
+    if (inter_frame_delay_.CalculateDelay(first_frame->Timestamp(),
+                                          &frame_delay, receive_time_ms)) {
+      jitter_estimator_->UpdateEstimate(frame_delay, superframe_size);
+    }
+
+    float rtt_mult = protection_mode_ == kProtectionNackFEC ? 0.0 : 1.0;
+    if (RttMultExperiment::RttMultEnabled()) {
+      rtt_mult = RttMultExperiment::GetRttMultValue();
+    }
+    timing_->SetJitterDelay(jitter_estimator_->GetJitterEstimate(rtt_mult));
+    timing_->UpdateCurrentDelay(render_time_ms, now_ms);
+  } else {
+    if (RttMultExperiment::RttMultEnabled() || add_rtt_to_playout_delay_)
+      jitter_estimator_->FrameNacked();
+  }
+
+  UpdateJitterDelay();
+  UpdateTimingFrameInfo();
+
+  if (frames_out.size() == 1) {
+    return frames_out[0];
+  } else {
+    return CombineAndDeleteFrames(frames_out);
+  }
+}
+
 bool FrameBuffer::HasBadRenderTiming(const EncodedFrame& frame,
                                      int64_t now_ms) {
   // Assume that render timing errors are due to changes in the video stream.
@@ -297,33 +372,63 @@
   return false;
 }
 
+void FrameBuffer::SafePost(std::function<void()> func) {
+  if (!use_task_queue_) {
+    func();
+  } else {
+    task_queue_.PostTask(func);
+  }
+}
 void FrameBuffer::SetProtectionMode(VCMVideoProtection mode) {
   TRACE_EVENT0("webrtc", "FrameBuffer::SetProtectionMode");
-  rtc::CritScope lock(&crit_);
-  protection_mode_ = mode;
+  SafePost([this, mode] {
+    rtc::CritScope lock(&crit_);
+    protection_mode_ = mode;
+  });
 }
 
 void FrameBuffer::Start() {
   TRACE_EVENT0("webrtc", "FrameBuffer::Start");
-  rtc::CritScope lock(&crit_);
-  stopped_ = false;
+  SafePost([this] {
+    rtc::CritScope lock(&crit_);
+    stopped_ = false;
+  });
 }
 
 void FrameBuffer::Stop() {
   TRACE_EVENT0("webrtc", "FrameBuffer::Stop");
-  rtc::CritScope lock(&crit_);
-  stopped_ = true;
-  new_continuous_frame_event_.Set();
+  if (!use_task_queue_) {
+    rtc::CritScope lock(&crit_);
+    stopped_ = true;
+    new_continuous_frame_event_.Set();
+  } else {
+    rtc::Event done;
+    task_queue_.PostTask([this, &done] {
+      rtc::CritScope lock(&crit_);
+      stopped_ = true;
+      if (frame_handler_) {
+        RTC_DCHECK(callback_task_.Running());
+        callback_task_.Stop();
+        frame_handler_ = {};
+      }
+      done.Set();
+    });
+    done.Wait(rtc::Event::kForever);
+  }
 }
 
 void FrameBuffer::Clear() {
-  rtc::CritScope lock(&crit_);
-  ClearFramesAndHistory();
+  SafePost([this] {
+    rtc::CritScope lock(&crit_);
+    ClearFramesAndHistory();
+  });
 }
 
 void FrameBuffer::UpdateRtt(int64_t rtt_ms) {
-  rtc::CritScope lock(&crit_);
-  jitter_estimator_->UpdateRtt(rtt_ms);
+  SafePost([this, rtt_ms] {
+    rtc::CritScope lock(&crit_);
+    jitter_estimator_->UpdateRtt(rtt_ms);
+  });
 }
 
 bool FrameBuffer::ValidReferences(const EncodedFrame& frame) const {
@@ -384,6 +489,22 @@
   return true;
 }
 
+void FrameBuffer::InsertFrame(std::unique_ptr<EncodedFrame> frame,
+                              std::function<void(int64_t)> picture_id_handler) {
+  struct InsertFrameTask {
+    void operator()() {
+      RTC_DCHECK_RUN_ON(&frame_buffer->task_queue_);
+      int64_t last_continuous_pid = frame_buffer->InsertFrame(std::move(frame));
+      picture_id_handler(last_continuous_pid);
+    }
+    FrameBuffer* frame_buffer;
+    std::unique_ptr<EncodedFrame> frame;
+    std::function<void(int64_t)> picture_id_handler;
+  };
+  task_queue_.PostTask(
+      InsertFrameTask{this, std::move(frame), std::move(picture_id_handler)});
+}
+
 int64_t FrameBuffer::InsertFrame(std::unique_ptr<EncodedFrame> frame) {
   TRACE_EVENT0("webrtc", "FrameBuffer::InsertFrame");
   RTC_DCHECK(frame);
@@ -487,9 +608,14 @@
     last_continuous_picture_id = last_continuous_frame_->picture_id;
 
     // Since we now have new continuous frames there might be a better frame
-    // to return from NextFrame. Signal that thread so that it again can choose
-    // which frame to return.
-    new_continuous_frame_event_.Set();
+    // to return from NextFrame.
+    if (!use_task_queue_) {
+      new_continuous_frame_event_.Set();
+    } else if (callback_task_.Running()) {
+      RTC_CHECK(frame_handler_);
+      callback_task_.Stop();
+      NextFrameOnQueue();
+    }
   }
 
   return last_continuous_picture_id;
diff --git a/modules/video_coding/frame_buffer2.h b/modules/video_coding/frame_buffer2.h
index fda496e..7772167 100644
--- a/modules/video_coding/frame_buffer2.h
+++ b/modules/video_coding/frame_buffer2.h
@@ -27,6 +27,8 @@
 #include "rtc_base/event.h"
 #include "rtc_base/experiments/rtt_mult_experiment.h"
 #include "rtc_base/numerics/sequence_number_util.h"
+#include "rtc_base/task_queue.h"
+#include "rtc_base/task_utils/repeating_task.h"
 #include "rtc_base/thread_annotations.h"
 
 namespace webrtc {
@@ -45,7 +47,13 @@
   FrameBuffer(Clock* clock,
               VCMJitterEstimator* jitter_estimator,
               VCMTiming* timing,
-              VCMReceiveStatisticsCallback* stats_proxy);
+              VCMReceiveStatisticsCallback* stats_callback);
+
+  FrameBuffer(Clock* clock,
+              TaskQueueFactory* task_queue_factory,
+              VCMJitterEstimator* jitter_estimator,
+              VCMTiming* timing,
+              VCMReceiveStatisticsCallback* stats_callback);
 
   virtual ~FrameBuffer();
 
@@ -54,6 +62,9 @@
   // TODO(philipel): Return a VideoLayerFrameId and not only the picture id.
   int64_t InsertFrame(std::unique_ptr<EncodedFrame> frame);
 
+  void InsertFrame(std::unique_ptr<EncodedFrame> frame,
+                   std::function<void(int64_t)> picture_id_handler);
+
   // Get the next frame for decoding. Will return at latest after
   // |max_wait_time_ms|.
   //  - If a frame is available within |max_wait_time_ms| it will return
@@ -64,6 +75,10 @@
   ReturnReason NextFrame(int64_t max_wait_time_ms,
                          std::unique_ptr<EncodedFrame>* frame_out,
                          bool keyframe_required = false);
+  void NextFrame(
+      int64_t max_wait_time_ms,
+      bool keyframe_required,
+      std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)> handler);
 
   // Tells the FrameBuffer which protection mode that is in use. Affects
   // the frame timing.
@@ -115,9 +130,16 @@
 
   using FrameMap = std::map<VideoLayerFrameId, FrameInfo>;
 
+  void SafePost(std::function<void()> func);
+
   // Check that the references of |frame| are valid.
   bool ValidReferences(const EncodedFrame& frame) const;
 
+  void NextFrameOnQueue() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+  int64_t UpdateFramesToDecode(int64_t now_ms)
+      RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+  EncodedFrame* GetFrameToDecode() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+
   // Update all directly dependent and indirectly dependent frames and mark
   // them as continuous if all their references has been fulfilled.
   void PropagateContinuity(FrameMap::iterator start)
@@ -158,9 +180,19 @@
   FrameMap frames_ RTC_GUARDED_BY(crit_);
   DecodedFramesHistory decoded_frames_history_ RTC_GUARDED_BY(crit_);
 
+  // TODO(srte): Remove this lock when always running on task queue.
   rtc::CriticalSection crit_;
   Clock* const clock_;
+  const bool use_task_queue_;
+
+  RepeatingTaskHandle callback_task_ RTC_GUARDED_BY(crit_);
+  std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)>
+      frame_handler_ RTC_GUARDED_BY(crit_);
+  int64_t latest_return_time_ms_ RTC_GUARDED_BY(crit_);
+  bool keyframe_required_ RTC_GUARDED_BY(crit_);
+
   rtc::Event new_continuous_frame_event_;
+
   VCMJitterEstimator* const jitter_estimator_ RTC_GUARDED_BY(crit_);
   VCMTiming* const timing_ RTC_GUARDED_BY(crit_);
   VCMInterFrameDelay inter_frame_delay_ RTC_GUARDED_BY(crit_);
@@ -174,6 +206,8 @@
 
   const bool add_rtt_to_playout_delay_;
 
+  // Defined last so it is destroyed before other members.
+  rtc::TaskQueue task_queue_;
   RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(FrameBuffer);
 };
 
diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc
index 86b9a09..1ded35f 100644
--- a/video/video_receive_stream.cc
+++ b/video/video_receive_stream.cc
@@ -56,6 +56,10 @@
 namespace webrtc {
 
 namespace {
+
+using video_coding::EncodedFrame;
+using ReturnReason = video_coding::FrameBuffer::ReturnReason;
+
 constexpr int kMinBaseMinimumDelayMs = 0;
 constexpr int kMaxBaseMinimumDelayMs = 10000;
 
@@ -184,6 +188,8 @@
       num_cpu_cores_(num_cpu_cores),
       process_thread_(process_thread),
       clock_(clock),
+      use_task_queue_(
+          !field_trial::IsDisabled("WebRTC-Video-DecodeOnTaskQueue")),
       decode_thread_(&DecodeThreadFunction,
                      this,
                      "DecodingThread",
@@ -212,7 +218,10 @@
                                     .value_or(kMaxWaitForKeyFrameMs)),
       max_wait_for_frame_ms_(KeyframeIntervalSettings::ParseFromFieldTrials()
                                  .MaxWaitForFrameMs()
-                                 .value_or(kMaxWaitForFrameMs)) {
+                                 .value_or(kMaxWaitForFrameMs)),
+      decode_queue_(task_queue_factory_->CreateTaskQueue(
+          "DecodingQueue",
+          TaskQueueFactory::Priority::HIGH)) {
   RTC_LOG(LS_INFO) << "VideoReceiveStream: " << config_.ToString();
 
   RTC_DCHECK(config_.renderer);
@@ -237,7 +246,8 @@
 
   jitter_estimator_.reset(new VCMJitterEstimator(clock_));
   frame_buffer_.reset(new video_coding::FrameBuffer(
-      clock_, jitter_estimator_.get(), timing_.get(), &stats_proxy_));
+      clock_, use_task_queue_ ? task_queue_factory_ : nullptr,
+      jitter_estimator_.get(), timing_.get(), &stats_proxy_));
 
   process_thread_->RegisterModule(&rtp_stream_sync_, RTC_FROM_HERE);
 
@@ -308,7 +318,7 @@
 void VideoReceiveStream::Start() {
   RTC_DCHECK_CALLED_SEQUENTIALLY(&worker_sequence_checker_);
 
-  if (decode_thread_.IsRunning()) {
+  if (decoder_running_) {
     return;
   }
 
@@ -387,7 +397,17 @@
   // Start the decode thread
   video_receiver_.DecoderThreadStarting();
   stats_proxy_.DecoderThreadStarting();
-  decode_thread_.Start();
+  if (!use_task_queue_) {
+    decode_thread_.Start();
+  } else {
+    decode_queue_.PostTask([this] {
+      RTC_DCHECK_RUN_ON(&decode_queue_);
+      RTC_DCHECK(decoder_stopped_);
+      decoder_stopped_ = false;
+      StartNextDecode();
+    });
+  }
+  decoder_running_ = true;
   rtp_video_stream_receiver_.StartReceive();
 }
 
@@ -401,13 +421,24 @@
   frame_buffer_->Stop();
   call_stats_->DeregisterStatsObserver(this);
 
-  if (decode_thread_.IsRunning()) {
+  if (decoder_running_) {
     // TriggerDecoderShutdown will release any waiting decoder thread and make
     // it stop immediately, instead of waiting for a timeout. Needs to be called
     // before joining the decoder thread.
     video_receiver_.TriggerDecoderShutdown();
+    if (!use_task_queue_) {
+      decode_thread_.Stop();
+    } else {
+      rtc::Event done;
+      decode_queue_.PostTask([this, &done] {
+        RTC_DCHECK_RUN_ON(&decode_queue_);
+        decoder_stopped_ = true;
+        done.Set();
+      });
+      done.Wait(rtc::Event::kForever);
+    }
+    decoder_running_ = false;
 
-    decode_thread_.Stop();
     video_receiver_.DecoderThreadStopped();
     stats_proxy_.DecoderThreadStopped();
     // Deregister external decoders so they are no longer running during
@@ -511,10 +542,17 @@
     frame_maximum_playout_delay_ms_ = playout_delay.max_ms;
     UpdatePlayoutDelays();
   }
-
-  int64_t last_continuous_pid = frame_buffer_->InsertFrame(std::move(frame));
-  if (last_continuous_pid != -1)
-    rtp_video_stream_receiver_.FrameContinuous(last_continuous_pid);
+  if (!use_task_queue_) {
+    int64_t last_continuous_pid = frame_buffer_->InsertFrame(std::move(frame));
+    if (last_continuous_pid != -1)
+      rtp_video_stream_receiver_.FrameContinuous(last_continuous_pid);
+  } else {
+    frame_buffer_->InsertFrame(
+        std::move(frame), [this](int64_t last_continuous_pid) {
+          if (last_continuous_pid != -1)
+            rtp_video_stream_receiver_.FrameContinuous(last_continuous_pid);
+        });
+  }
 }
 
 void VideoReceiveStream::OnData(uint64_t channel_id,
@@ -562,6 +600,51 @@
   UpdatePlayoutDelays();
 }
 
+int64_t VideoReceiveStream::GetWaitMs() const {
+  return keyframe_required_ ? max_wait_for_keyframe_ms_
+                            : max_wait_for_frame_ms_;
+}
+
+void VideoReceiveStream::StartNextDecode() {
+  RTC_DCHECK(use_task_queue_);
+  TRACE_EVENT0("webrtc", "VideoReceiveStream::StartNextDecode");
+
+  struct DecodeTask {
+    void operator()() {
+      RTC_DCHECK_RUN_ON(&stream->decode_queue_);
+      if (stream->decoder_stopped_)
+        return;
+      if (frame) {
+        stream->HandleEncodedFrame(std::move(frame));
+      } else {
+        stream->HandleFrameBufferTimeout();
+      }
+    }
+    VideoReceiveStream* stream;
+    std::unique_ptr<EncodedFrame> frame;
+  };
+
+  // TODO(philipel): Call NextFrame with |keyframe_required| argument set when
+  //                 downstream project has been fixed.
+  frame_buffer_->NextFrame(
+      GetWaitMs(), /*keyframe_required*/ false,
+      [this](std::unique_ptr<EncodedFrame> frame, ReturnReason res) {
+        RTC_DCHECK_EQ(frame == nullptr, res == ReturnReason::kTimeout);
+        RTC_DCHECK_EQ(frame != nullptr, res == ReturnReason::kFrameFound);
+        decode_queue_.PostTask(DecodeTask{this, std::move(frame)});
+        // Start the next decode after a delay or when the previous decode is
+        // finished (as it will be blocked by the queue).
+        constexpr int kMinDecodeIntervalMs = 1;
+        decode_queue_.PostDelayedTask(
+            [this] {
+              RTC_DCHECK_RUN_ON(&decode_queue_);
+              if (!decoder_stopped_)
+                StartNextDecode();
+            },
+            kMinDecodeIntervalMs);
+      });
+}
+
 void VideoReceiveStream::DecodeThreadFunction(void* ptr) {
   ScopedRegisterThreadForDebugging thread_dbg(RTC_FROM_HERE);
   while (static_cast<VideoReceiveStream*>(ptr)->Decode()) {
@@ -569,82 +652,87 @@
 }
 
 bool VideoReceiveStream::Decode() {
+  RTC_DCHECK(!use_task_queue_);
   TRACE_EVENT0("webrtc", "VideoReceiveStream::Decode");
 
-  const int wait_ms =
-      keyframe_required_ ? max_wait_for_keyframe_ms_ : max_wait_for_frame_ms_;
   std::unique_ptr<video_coding::EncodedFrame> frame;
   // TODO(philipel): Call NextFrame with |keyframe_required| argument when
   //                 downstream project has been fixed.
   video_coding::FrameBuffer::ReturnReason res =
-      frame_buffer_->NextFrame(wait_ms, &frame);
-
-  if (res == video_coding::FrameBuffer::ReturnReason::kStopped) {
+      frame_buffer_->NextFrame(GetWaitMs(), &frame);
+  if (res == ReturnReason::kStopped) {
     return false;
   }
-
   if (frame) {
-    int64_t now_ms = clock_->TimeInMilliseconds();
-    RTC_DCHECK_EQ(res, video_coding::FrameBuffer::ReturnReason::kFrameFound);
-
-    // Current OnPreDecode only cares about QP for VP8.
-    int qp = -1;
-    if (frame->CodecSpecific()->codecType == kVideoCodecVP8) {
-      if (!vp8::GetQp(frame->data(), frame->size(), &qp)) {
-        RTC_LOG(LS_WARNING) << "Failed to extract QP from VP8 video frame";
-      }
-    }
-    stats_proxy_.OnPreDecode(frame->CodecSpecific()->codecType, qp);
-
-    int decode_result = video_receiver_.Decode(frame.get());
-    if (decode_result == WEBRTC_VIDEO_CODEC_OK ||
-        decode_result == WEBRTC_VIDEO_CODEC_OK_REQUEST_KEYFRAME) {
-      keyframe_required_ = false;
-      frame_decoded_ = true;
-      rtp_video_stream_receiver_.FrameDecoded(frame->id.picture_id);
-
-      if (decode_result == WEBRTC_VIDEO_CODEC_OK_REQUEST_KEYFRAME)
-        RequestKeyFrame();
-    } else if (!frame_decoded_ || !keyframe_required_ ||
-               (last_keyframe_request_ms_ + max_wait_for_keyframe_ms_ <
-                now_ms)) {
-      keyframe_required_ = true;
-      // TODO(philipel): Remove this keyframe request when downstream project
-      //                 has been fixed.
-      RequestKeyFrame();
-      last_keyframe_request_ms_ = now_ms;
-    }
+    RTC_DCHECK_EQ(res, ReturnReason::kFrameFound);
+    HandleEncodedFrame(std::move(frame));
   } else {
-    RTC_DCHECK_EQ(res, video_coding::FrameBuffer::ReturnReason::kTimeout);
-    int64_t now_ms = clock_->TimeInMilliseconds();
-    absl::optional<int64_t> last_packet_ms =
-        rtp_video_stream_receiver_.LastReceivedPacketMs();
-    absl::optional<int64_t> last_keyframe_packet_ms =
-        rtp_video_stream_receiver_.LastReceivedKeyframePacketMs();
-
-    // To avoid spamming keyframe requests for a stream that is not active we
-    // check if we have received a packet within the last 5 seconds.
-    bool stream_is_active = last_packet_ms && now_ms - *last_packet_ms < 5000;
-    if (!stream_is_active)
-      stats_proxy_.OnStreamInactive();
-
-    // If we recently have been receiving packets belonging to a keyframe then
-    // we assume a keyframe is currently being received.
-    bool receiving_keyframe =
-        last_keyframe_packet_ms &&
-        now_ms - *last_keyframe_packet_ms < max_wait_for_keyframe_ms_;
-
-    if (stream_is_active && !receiving_keyframe &&
-        (!config_.crypto_options.sframe.require_frame_encryption ||
-         rtp_video_stream_receiver_.IsDecryptable())) {
-      RTC_LOG(LS_WARNING) << "No decodable frame in " << wait_ms
-                          << " ms, requesting keyframe.";
-      RequestKeyFrame();
-    }
+    RTC_DCHECK_EQ(res, ReturnReason::kTimeout);
+    HandleFrameBufferTimeout();
   }
   return true;
 }
 
+void VideoReceiveStream::HandleEncodedFrame(
+    std::unique_ptr<EncodedFrame> frame) {
+  int64_t now_ms = clock_->TimeInMilliseconds();
+
+  // Current OnPreDecode only cares about QP for VP8.
+  int qp = -1;
+  if (frame->CodecSpecific()->codecType == kVideoCodecVP8) {
+    if (!vp8::GetQp(frame->data(), frame->size(), &qp)) {
+      RTC_LOG(LS_WARNING) << "Failed to extract QP from VP8 video frame";
+    }
+  }
+  stats_proxy_.OnPreDecode(frame->CodecSpecific()->codecType, qp);
+
+  int decode_result = video_receiver_.Decode(frame.get());
+  if (decode_result == WEBRTC_VIDEO_CODEC_OK ||
+      decode_result == WEBRTC_VIDEO_CODEC_OK_REQUEST_KEYFRAME) {
+    keyframe_required_ = false;
+    frame_decoded_ = true;
+    rtp_video_stream_receiver_.FrameDecoded(frame->id.picture_id);
+
+    if (decode_result == WEBRTC_VIDEO_CODEC_OK_REQUEST_KEYFRAME)
+      RequestKeyFrame();
+  } else if (!frame_decoded_ || !keyframe_required_ ||
+             (last_keyframe_request_ms_ + max_wait_for_keyframe_ms_ < now_ms)) {
+    keyframe_required_ = true;
+    // TODO(philipel): Remove this keyframe request when downstream project
+    //                 has been fixed.
+    RequestKeyFrame();
+    last_keyframe_request_ms_ = now_ms;
+  }
+}
+
+void VideoReceiveStream::HandleFrameBufferTimeout() {
+  int64_t now_ms = clock_->TimeInMilliseconds();
+  absl::optional<int64_t> last_packet_ms =
+      rtp_video_stream_receiver_.LastReceivedPacketMs();
+  absl::optional<int64_t> last_keyframe_packet_ms =
+      rtp_video_stream_receiver_.LastReceivedKeyframePacketMs();
+
+  // To avoid spamming keyframe requests for a stream that is not active we
+  // check if we have received a packet within the last 5 seconds.
+  bool stream_is_active = last_packet_ms && now_ms - *last_packet_ms < 5000;
+  if (!stream_is_active)
+    stats_proxy_.OnStreamInactive();
+
+  // If we recently have been receiving packets belonging to a keyframe then
+  // we assume a keyframe is currently being received.
+  bool receiving_keyframe =
+      last_keyframe_packet_ms &&
+      now_ms - *last_keyframe_packet_ms < max_wait_for_keyframe_ms_;
+
+  if (stream_is_active && !receiving_keyframe &&
+      (!config_.crypto_options.sframe.require_frame_encryption ||
+       rtp_video_stream_receiver_.IsDecryptable())) {
+    RTC_LOG(LS_WARNING) << "No decodable frame in " << GetWaitMs()
+                        << " ms, requesting keyframe.";
+    RequestKeyFrame();
+  }
+}
+
 void VideoReceiveStream::UpdatePlayoutDelays() const {
   const int minimum_delay_ms =
       std::max({frame_minimum_playout_delay_ms_, base_minimum_playout_delay_ms_,
diff --git a/video/video_receive_stream.h b/video/video_receive_stream.h
index 162ef8c..bc2469c 100644
--- a/video/video_receive_stream.h
+++ b/video/video_receive_stream.h
@@ -23,6 +23,7 @@
 #include "modules/video_coding/frame_buffer2.h"
 #include "modules/video_coding/video_coding_impl.h"
 #include "rtc_base/sequenced_task_checker.h"
+#include "rtc_base/task_queue.h"
 #include "system_wrappers/include/clock.h"
 #include "video/receive_statistics_proxy.h"
 #include "video/rtp_streams_synchronizer.h"
@@ -129,8 +130,13 @@
   std::vector<webrtc::RtpSource> GetSources() const override;
 
  private:
+  int64_t GetWaitMs() const;
+  void StartNextDecode() RTC_RUN_ON(decode_queue_);
   static void DecodeThreadFunction(void* ptr);
   bool Decode();
+  void HandleEncodedFrame(std::unique_ptr<video_coding::EncodedFrame> frame);
+  void HandleFrameBufferTimeout();
+
   void UpdatePlayoutDelays() const
       RTC_EXCLUSIVE_LOCKS_REQUIRED(playout_delay_lock_);
 
@@ -146,10 +152,15 @@
   ProcessThread* const process_thread_;
   Clock* const clock_;
 
+  const bool use_task_queue_;
+
   rtc::PlatformThread decode_thread_;
 
   CallStats* const call_stats_;
 
+  bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
+  bool decoder_stopped_ RTC_GUARDED_BY(decode_queue_) = true;
+
   ReceiveStatisticsProxy stats_proxy_;
   // Shared by media and rtx stream receivers, since the latter has no RtpRtcp
   // module of its own.
@@ -165,10 +176,10 @@
   // TODO(nisse, philipel): Creation and ownership of video encoders should be
   // moved to the new VideoStreamDecoder.
   std::vector<std::unique_ptr<VideoDecoder>> video_decoders_;
+  std::unique_ptr<video_coding::FrameBuffer> frame_buffer_;
 
   // Members for the new jitter buffer experiment.
   std::unique_ptr<VCMJitterEstimator> jitter_estimator_;
-  std::unique_ptr<video_coding::FrameBuffer> frame_buffer_;
 
   std::unique_ptr<RtpStreamReceiverInterface> media_receiver_;
   std::unique_ptr<RtxReceiveStream> rtx_receive_stream_;
@@ -204,6 +215,9 @@
 
   // Maximum delay as decided by the RTP playout delay extension.
   int frame_maximum_playout_delay_ms_ RTC_GUARDED_BY(playout_delay_lock_) = -1;
+
+  // Defined last so they are destroyed before all other members.
+  rtc::TaskQueue decode_queue_;
 };
 }  // namespace internal
 }  // namespace webrtc