VideoStreamEncoder: expect frame entry on the encoder queue.

This change switches the sequence used by the FrameCadenceAdapter
to the encoder_queue, enabling VideoStreamEncoder::OnFrame to be
invoked directly on the encoder_queue and eliminating the PostTasks
it previously contained.
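For illustration, a minimal standalone sketch of the scheduling-counter
pattern this change relies on (ToyQueue/ToyAdapter are made-up stand-ins,
not the WebRTC API): the capture context increments an atomic counter
before posting, and the queue-side handler decrements it, so the handler
can tell whether newer frames are already pending. The printed values
mirror the CountsOutstandingFramesToProcess unit test added below.

  #include <atomic>
  #include <cstdio>
  #include <functional>
  #include <queue>

  // Toy single-threaded "task queue" standing in for the encoder queue.
  struct ToyQueue {
    std::queue<std::function<void()>> tasks;
    void PostTask(std::function<void()> f) { tasks.push(std::move(f)); }
    void DrainAll() {
      while (!tasks.empty()) {
        tasks.front()();
        tasks.pop();
      }
    }
  };

  // Models the adapter's counter: the capture side bumps it before posting,
  // the queue side reads-and-decrements it, so the handler sees how many
  // frames were outstanding when it ran.
  struct ToyAdapter {
    explicit ToyAdapter(ToyQueue* q) : queue(q) {}

    void OnFrame(int frame_id) {
      scheduled.fetch_add(1, std::memory_order_relaxed);
      queue->PostTask([this, frame_id] {
        int outstanding = scheduled.fetch_sub(1, std::memory_order_relaxed);
        // outstanding == 1 means no newer frame is waiting; otherwise the
        // handler may choose to drop this frame.
        std::printf("frame %d, scheduled_for_processing=%d\n", frame_id,
                    outstanding);
      });
    }

    ToyQueue* queue;
    std::atomic<int> scheduled{0};
  };

  int main() {
    ToyQueue queue;
    ToyAdapter adapter(&queue);
    adapter.OnFrame(1);
    adapter.OnFrame(2);  // Posted before the queue runs: first handler sees 2.
    queue.DrainAll();
    adapter.OnFrame(3);  // Queue idle: handler sees 1.
    queue.DrainAll();
  }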

Bug: chromium:1255737
Change-Id: Ib86fc96ad2be9a38585fef2535855e3f9cc7e57c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/238171
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35380}
diff --git a/video/BUILD.gn b/video/BUILD.gn
index 4821699..84b2d98 100644
--- a/video/BUILD.gn
+++ b/video/BUILD.gn
@@ -275,6 +275,7 @@
     "../rtc_base/synchronization:mutex",
     "../rtc_base/task_utils:pending_task_safety_flag",
     "../rtc_base/task_utils:to_queued_task",
+    "../system_wrappers",
     "../system_wrappers:field_trial",
     "../system_wrappers:metrics",
   ]
diff --git a/video/frame_cadence_adapter.cc b/video/frame_cadence_adapter.cc
index 030814b..c82ab5a 100644
--- a/video/frame_cadence_adapter.cc
+++ b/video/frame_cadence_adapter.cc
@@ -10,6 +10,7 @@
 
 #include "video/frame_cadence_adapter.h"
 
+#include <atomic>
 #include <memory>
 #include <utility>
 
@@ -20,6 +21,7 @@
 #include "rtc_base/synchronization/mutex.h"
 #include "rtc_base/task_utils/pending_task_safety_flag.h"
 #include "rtc_base/task_utils/to_queued_task.h"
+#include "system_wrappers/include/clock.h"
 #include "system_wrappers/include/field_trial.h"
 #include "system_wrappers/include/metrics.h"
 
@@ -28,7 +30,7 @@
 
 class FrameCadenceAdapterImpl : public FrameCadenceAdapterInterface {
  public:
-  explicit FrameCadenceAdapterImpl(TaskQueueBase* worker_queue);
+  FrameCadenceAdapterImpl(Clock* clock, TaskQueueBase* queue);
 
   // FrameCadenceAdapterInterface overrides.
   void Initialize(Callback* callback) override;
@@ -42,12 +44,15 @@
 
  private:
   // Called from OnFrame in zero-hertz mode.
-  void OnFrameOnMainQueue(const VideoFrame& frame) RTC_RUN_ON(worker_queue_);
+  void OnFrameOnMainQueue(Timestamp post_time,
+                          int frames_scheduled_for_processing,
+                          const VideoFrame& frame) RTC_RUN_ON(queue_);
 
   // Called to report on constraint UMAs.
-  void MaybeReportFrameRateConstraintUmas() RTC_RUN_ON(&worker_queue_);
+  void MaybeReportFrameRateConstraintUmas() RTC_RUN_ON(&queue_);
 
-  TaskQueueBase* const worker_queue_;
+  Clock* const clock_;
+  TaskQueueBase* const queue_;
 
   // True if we support frame entry for screenshare with a minimum frequency of
   // 0 Hz.
@@ -58,34 +63,36 @@
 
   // The source's constraints.
   absl::optional<VideoTrackSourceConstraints> source_constraints_
-      RTC_GUARDED_BY(worker_queue_);
+      RTC_GUARDED_BY(queue_);
 
   // Whether zero-hertz and UMA reporting is enabled.
-  bool zero_hertz_and_uma_reporting_enabled_ RTC_GUARDED_BY(worker_queue_) =
-      false;
+  bool zero_hertz_and_uma_reporting_enabled_ RTC_GUARDED_BY(queue_) = false;
 
   // Race checker for incoming frames. This is the network thread in chromium,
   // but may vary from test contexts.
   rtc::RaceChecker incoming_frame_race_checker_;
-  bool has_reported_screenshare_frame_rate_umas_ RTC_GUARDED_BY(worker_queue_) =
-      false;
+  bool has_reported_screenshare_frame_rate_umas_ RTC_GUARDED_BY(queue_) = false;
 
-  ScopedTaskSafety safety_;
+  // Number of frames that are currently scheduled for processing on the
+  // |queue_|.
+  std::atomic<int> frames_scheduled_for_processing_{0};
+
+  ScopedTaskSafetyDetached safety_;
 };
 
-FrameCadenceAdapterImpl::FrameCadenceAdapterImpl(TaskQueueBase* worker_queue)
-    : worker_queue_(worker_queue),
+FrameCadenceAdapterImpl::FrameCadenceAdapterImpl(Clock* clock,
+                                                 TaskQueueBase* queue)
+    : clock_(clock),
+      queue_(queue),
       zero_hertz_screenshare_enabled_(
-          field_trial::IsEnabled("WebRTC-ZeroHertzScreenshare")) {
-  RTC_DCHECK_RUN_ON(worker_queue_);
-}
+          field_trial::IsEnabled("WebRTC-ZeroHertzScreenshare")) {}
 
 void FrameCadenceAdapterImpl::Initialize(Callback* callback) {
   callback_ = callback;
 }
 
 void FrameCadenceAdapterImpl::SetZeroHertzModeEnabled(bool enabled) {
-  RTC_DCHECK_RUN_ON(worker_queue_);
+  RTC_DCHECK_RUN_ON(queue_);
   if (enabled && !zero_hertz_and_uma_reporting_enabled_)
     has_reported_screenshare_frame_rate_umas_ = false;
   zero_hertz_and_uma_reporting_enabled_ = enabled;
@@ -95,9 +102,17 @@
   // This method is called on the network thread under Chromium, or other
   // various contexts in test.
   RTC_DCHECK_RUNS_SERIALIZED(&incoming_frame_race_checker_);
-  worker_queue_->PostTask(ToQueuedTask(safety_, [this, frame] {
-    RTC_DCHECK_RUN_ON(worker_queue_);
-    OnFrameOnMainQueue(std::move(frame));
+
+  // Local time in webrtc time base.
+  Timestamp post_time = clock_->CurrentTime();
+  frames_scheduled_for_processing_.fetch_add(1, std::memory_order_relaxed);
+  queue_->PostTask(ToQueuedTask(safety_.flag(), [this, post_time, frame] {
+    RTC_DCHECK_RUN_ON(queue_);
+    const int frames_scheduled_for_processing =
+        frames_scheduled_for_processing_.fetch_sub(1,
+                                                   std::memory_order_relaxed);
+    OnFrameOnMainQueue(post_time, frames_scheduled_for_processing,
+                       std::move(frame));
     MaybeReportFrameRateConstraintUmas();
   }));
 }
@@ -107,18 +122,21 @@
   RTC_LOG(LS_INFO) << __func__ << " min_fps "
                    << constraints.min_fps.value_or(-1) << " max_fps "
                    << constraints.max_fps.value_or(-1);
-  worker_queue_->PostTask(ToQueuedTask(safety_, [this, constraints] {
-    RTC_DCHECK_RUN_ON(worker_queue_);
+  queue_->PostTask(ToQueuedTask(safety_.flag(), [this, constraints] {
+    RTC_DCHECK_RUN_ON(queue_);
     source_constraints_ = constraints;
   }));
 }
 
-// RTC_RUN_ON(worker_queue_)
-void FrameCadenceAdapterImpl::OnFrameOnMainQueue(const VideoFrame& frame) {
-  callback_->OnFrame(frame);
+// RTC_RUN_ON(queue_)
+void FrameCadenceAdapterImpl::OnFrameOnMainQueue(
+    Timestamp post_time,
+    int frames_scheduled_for_processing,
+    const VideoFrame& frame) {
+  callback_->OnFrame(post_time, frames_scheduled_for_processing, frame);
 }
 
-// RTC_RUN_ON(worker_queue_)
+// RTC_RUN_ON(queue_)
 void FrameCadenceAdapterImpl::MaybeReportFrameRateConstraintUmas() {
   if (has_reported_screenshare_frame_rate_umas_)
     return;
@@ -175,8 +193,8 @@
 }  // namespace
 
 std::unique_ptr<FrameCadenceAdapterInterface>
-FrameCadenceAdapterInterface::Create(TaskQueueBase* worker_queue) {
-  return std::make_unique<FrameCadenceAdapterImpl>(worker_queue);
+FrameCadenceAdapterInterface::Create(Clock* clock, TaskQueueBase* queue) {
+  return std::make_unique<FrameCadenceAdapterImpl>(clock, queue);
 }
 
 }  // namespace webrtc
diff --git a/video/frame_cadence_adapter.h b/video/frame_cadence_adapter.h
index ca702cf..beb7396 100644
--- a/video/frame_cadence_adapter.h
+++ b/video/frame_cadence_adapter.h
@@ -18,13 +18,14 @@
 #include "api/video/video_sink_interface.h"
 #include "rtc_base/synchronization/mutex.h"
 #include "rtc_base/thread_annotations.h"
+#include "system_wrappers/include/clock.h"
 
 namespace webrtc {
 
 // A sink adapter implementing mutations to the received frame cadence.
-// With the exception of construction & destruction which has to happen on the
-// same sequence, this class is thread-safe because three different execution
-// contexts call into it.
+// With the exception of the constructor and the methods overridden from
+// VideoSinkInterface, the rest of this class's interface (including the
+// destructor) must be used on the queue passed in Create.
 class FrameCadenceAdapterInterface
     : public rtc::VideoSinkInterface<VideoFrame> {
  public:
@@ -33,8 +34,20 @@
    public:
     virtual ~Callback() = default;
 
-    // Called when a frame arrives.
-    virtual void OnFrame(const VideoFrame& frame) = 0;
+    // Called when a frame arrives on the |queue| specified in Create.
+    //
+    // The |post_time| parameter indicates the current time sampled when
+    // FrameCadenceAdapterInterface::OnFrame was called.
+    //
+    // |frames_scheduled_for_processing| indicates how many frames have been
+    // scheduled for processing at the time this callback runs. Under
+    // sequential conditions, where each call to
+    // FrameCadenceAdapterInterface::OnFrame ends up directly in this
+    // callback, the value reads 1. If the |queue| gets stalled for some
+    // reason, the value will increase beyond 1.
+    virtual void OnFrame(Timestamp post_time,
+                         int frames_scheduled_for_processing,
+                         const VideoFrame& frame) = 0;
 
     // Called when the source has discarded a frame.
     virtual void OnDiscardedFrame() = 0;
@@ -42,8 +55,11 @@
 
   // Factory function creating a production instance. Deletion of the returned
   // instance needs to happen on the same sequence that Create() was called on.
+  // Frames arriving in FrameCadenceAdapterInterface::OnFrame are posted to
+  // Callback::OnFrame on the |queue|.
   static std::unique_ptr<FrameCadenceAdapterInterface> Create(
-      TaskQueueBase* worker_queue);
+      Clock* clock,
+      TaskQueueBase* queue);
 
   // Call before using the rest of the API.
   virtual void Initialize(Callback* callback) = 0;
diff --git a/video/frame_cadence_adapter_unittest.cc b/video/frame_cadence_adapter_unittest.cc
index 56fa220..a6b6a87 100644
--- a/video/frame_cadence_adapter_unittest.cc
+++ b/video/frame_cadence_adapter_unittest.cc
@@ -26,6 +26,7 @@
 namespace webrtc {
 namespace {
 
+using ::testing::_;
 using ::testing::ElementsAre;
 using ::testing::Mock;
 using ::testing::Pair;
@@ -39,13 +40,13 @@
       .build();
 }
 
-std::unique_ptr<FrameCadenceAdapterInterface> CreateAdapter() {
-  return FrameCadenceAdapterInterface::Create(TaskQueueBase::Current());
+std::unique_ptr<FrameCadenceAdapterInterface> CreateAdapter(Clock* clock) {
+  return FrameCadenceAdapterInterface::Create(clock, TaskQueueBase::Current());
 }
 
 class MockCallback : public FrameCadenceAdapterInterface::Callback {
  public:
-  MOCK_METHOD(void, OnFrame, (const VideoFrame&), (override));
+  MOCK_METHOD(void, OnFrame, (Timestamp, int, const VideoFrame&), (override));
   MOCK_METHOD(void, OnDiscardedFrame, (), (override));
 };
 
@@ -61,7 +62,7 @@
   auto disabler = std::make_unique<ZeroHertzFieldTrialDisabler>();
   for (int i = 0; i != 2; i++) {
     MockCallback callback;
-    auto adapter = CreateAdapter();
+    auto adapter = CreateAdapter(time_controller.GetClock());
     adapter->Initialize(&callback);
     VideoFrame frame = CreateFrame();
     EXPECT_CALL(callback, OnFrame).Times(1);
@@ -76,6 +77,22 @@
   }
 }
 
+TEST(FrameCadenceAdapterTest, CountsOutstandingFramesToProcess) {
+  GlobalSimulatedTimeController time_controller(Timestamp::Millis(1));
+  MockCallback callback;
+  auto adapter = CreateAdapter(time_controller.GetClock());
+  adapter->Initialize(&callback);
+  EXPECT_CALL(callback, OnFrame(_, 2, _)).Times(1);
+  EXPECT_CALL(callback, OnFrame(_, 1, _)).Times(1);
+  auto frame = CreateFrame();
+  adapter->OnFrame(frame);
+  adapter->OnFrame(frame);
+  time_controller.AdvanceTime(TimeDelta::Zero());
+  EXPECT_CALL(callback, OnFrame(_, 1, _)).Times(1);
+  adapter->OnFrame(frame);
+  time_controller.AdvanceTime(TimeDelta::Zero());
+}
+
 class FrameCadenceAdapterMetricsTest : public ::testing::Test {
  public:
   FrameCadenceAdapterMetricsTest() : time_controller_(Timestamp::Millis(1)) {
@@ -83,13 +100,13 @@
   }
   void DepleteTaskQueues() { time_controller_.AdvanceTime(TimeDelta::Zero()); }
 
- private:
+ protected:
   GlobalSimulatedTimeController time_controller_;
 };
 
 TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoUmasWithNoFrameTransfer) {
   MockCallback callback;
-  auto adapter = CreateAdapter();
+  auto adapter = CreateAdapter(nullptr);
   adapter->Initialize(&callback);
   adapter->OnConstraintsChanged(
       VideoTrackSourceConstraints{absl::nullopt, absl::nullopt});
@@ -129,7 +146,7 @@
 
 TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoUmasWithoutEnabledContentType) {
   MockCallback callback;
-  auto adapter = CreateAdapter();
+  auto adapter = CreateAdapter(time_controller_.GetClock());
   adapter->Initialize(&callback);
   adapter->OnFrame(CreateFrame());
   adapter->OnConstraintsChanged(
@@ -170,7 +187,7 @@
 
 TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoConstraintsIfUnsetOnFrame) {
   MockCallback callback;
-  auto adapter = CreateAdapter();
+  auto adapter = CreateAdapter(time_controller_.GetClock());
   adapter->Initialize(&callback);
   adapter->SetZeroHertzModeEnabled(true);
   adapter->OnFrame(CreateFrame());
@@ -182,7 +199,7 @@
 
 TEST_F(FrameCadenceAdapterMetricsTest, RecordsEmptyConstraintsIfSetOnFrame) {
   MockCallback callback;
-  auto adapter = CreateAdapter();
+  auto adapter = CreateAdapter(time_controller_.GetClock());
   adapter->Initialize(&callback);
   adapter->SetZeroHertzModeEnabled(true);
   adapter->OnConstraintsChanged(
@@ -221,7 +238,7 @@
 
 TEST_F(FrameCadenceAdapterMetricsTest, RecordsMaxConstraintIfSetOnFrame) {
   MockCallback callback;
-  auto adapter = CreateAdapter();
+  auto adapter = CreateAdapter(time_controller_.GetClock());
   adapter->Initialize(&callback);
   adapter->SetZeroHertzModeEnabled(true);
   adapter->OnConstraintsChanged(
@@ -257,7 +274,7 @@
 
 TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinConstraintIfSetOnFrame) {
   MockCallback callback;
-  auto adapter = CreateAdapter();
+  auto adapter = CreateAdapter(time_controller_.GetClock());
   adapter->Initialize(&callback);
   adapter->SetZeroHertzModeEnabled(true);
   adapter->OnConstraintsChanged(
@@ -293,7 +310,7 @@
 
 TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinGtMaxConstraintIfSetOnFrame) {
   MockCallback callback;
-  auto adapter = CreateAdapter();
+  auto adapter = CreateAdapter(time_controller_.GetClock());
   adapter->Initialize(&callback);
   adapter->SetZeroHertzModeEnabled(true);
   adapter->OnConstraintsChanged(VideoTrackSourceConstraints{5.0, 4.0});
@@ -328,7 +345,7 @@
 
 TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinLtMaxConstraintIfSetOnFrame) {
   MockCallback callback;
-  auto adapter = CreateAdapter();
+  auto adapter = CreateAdapter(time_controller_.GetClock());
   adapter->Initialize(&callback);
   adapter->SetZeroHertzModeEnabled(true);
   adapter->OnConstraintsChanged(VideoTrackSourceConstraints{4.0, 5.0});
diff --git a/video/video_send_stream.cc b/video/video_send_stream.cc
index 05708d0..e78211b 100644
--- a/video/video_send_stream.cc
+++ b/video/video_send_stream.cc
@@ -12,6 +12,7 @@
 #include <utility>
 
 #include "api/array_view.h"
+#include "api/task_queue/task_queue_base.h"
 #include "api/video/video_stream_encoder_settings.h"
 #include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
 #include "modules/rtp_rtcp/source/rtp_header_extension_size.h"
@@ -106,6 +107,25 @@
   return observers;
 }
 
+std::unique_ptr<VideoStreamEncoder> CreateVideoStreamEncoder(
+    Clock* clock,
+    int num_cpu_cores,
+    TaskQueueFactory* task_queue_factory,
+    SendStatisticsProxy* stats_proxy,
+    const VideoStreamEncoderSettings& encoder_settings,
+    VideoStreamEncoder::BitrateAllocationCallbackType
+        bitrate_allocation_callback_type) {
+  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> encoder_queue =
+      task_queue_factory->CreateTaskQueue("EncoderQueue",
+                                          TaskQueueFactory::Priority::NORMAL);
+  TaskQueueBase* encoder_queue_ptr = encoder_queue.get();
+  return std::make_unique<VideoStreamEncoder>(
+      clock, num_cpu_cores, stats_proxy, encoder_settings,
+      std::make_unique<OveruseFrameDetector>(stats_proxy),
+      FrameCadenceAdapterInterface::Create(clock, encoder_queue_ptr),
+      std::move(encoder_queue), bitrate_allocation_callback_type);
+}
+
 }  // namespace
 
 namespace internal {
@@ -130,17 +150,13 @@
       stats_proxy_(clock, config, encoder_config.content_type),
       config_(std::move(config)),
       content_type_(encoder_config.content_type),
-      video_stream_encoder_(std::make_unique<VideoStreamEncoder>(
-          clock,
-          num_cpu_cores,
-          &stats_proxy_,
-          config_.encoder_settings,
-          std::make_unique<OveruseFrameDetector>(&stats_proxy_),
-          FrameCadenceAdapterInterface::Create(
-              /*worker_queue=*/TaskQueueBase::Current()),
-          task_queue_factory,
-          /*worker_queue=*/TaskQueueBase::Current(),
-          GetBitrateAllocationCallbackType(config_))),
+      video_stream_encoder_(
+          CreateVideoStreamEncoder(clock,
+                                   num_cpu_cores,
+                                   task_queue_factory,
+                                   &stats_proxy_,
+                                   config_.encoder_settings,
+                                   GetBitrateAllocationCallbackType(config_))),
       encoder_feedback_(
           clock,
           config_.rtp.ssrcs,
diff --git a/video/video_stream_encoder.cc b/video/video_stream_encoder.cc
index c6870a0..1c0de4b 100644
--- a/video/video_stream_encoder.cc
+++ b/video/video_stream_encoder.cc
@@ -593,8 +593,8 @@
     const VideoStreamEncoderSettings& settings,
     std::unique_ptr<OveruseFrameDetector> overuse_detector,
     std::unique_ptr<FrameCadenceAdapterInterface> frame_cadence_adapter,
-    TaskQueueFactory* task_queue_factory,
-    TaskQueueBase* worker_queue,
+    std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
+        encoder_queue,
     BitrateAllocationCallbackType allocation_cb_type)
     : worker_queue_(TaskQueueBase::Current()),
       number_of_cores_(number_of_cores),
@@ -618,7 +618,6 @@
       was_encode_called_since_last_initialization_(false),
       encoder_failed_(false),
       clock_(clock),
-      posted_frames_waiting_for_encode_(0),
       last_captured_timestamp_(0),
       delta_ntp_internal_ms_(clock_->CurrentNtpInMilliseconds() -
                              clock_->TimeInMilliseconds()),
@@ -665,9 +664,7 @@
           !field_trial::IsEnabled("WebRTC-DefaultBitrateLimitsKillSwitch")),
       qp_parsing_allowed_(
           !field_trial::IsEnabled("WebRTC-QpParsingKillSwitch")),
-      encoder_queue_(task_queue_factory->CreateTaskQueue(
-          "EncoderQueue",
-          TaskQueueFactory::Priority::NORMAL)) {
+      encoder_queue_(std::move(encoder_queue)) {
   TRACE_EVENT0("webrtc", "VideoStreamEncoder::VideoStreamEncoder");
   RTC_DCHECK_RUN_ON(worker_queue_);
   RTC_DCHECK(encoder_stats_observer);
@@ -732,6 +729,7 @@
     rate_allocator_ = nullptr;
     ReleaseEncoder();
     encoder_ = nullptr;
+    frame_cadence_adapter_ = nullptr;
     shutdown_event.Set();
   });
   shutdown_event.Wait(rtc::Event::kForever);
@@ -824,14 +822,14 @@
 void VideoStreamEncoder::ConfigureEncoder(VideoEncoderConfig config,
                                           size_t max_data_payload_length) {
   RTC_DCHECK_RUN_ON(worker_queue_);
-  frame_cadence_adapter_->SetZeroHertzModeEnabled(
-      config.content_type == VideoEncoderConfig::ContentType::kScreen);
   encoder_queue_.PostTask(
       [this, config = std::move(config), max_data_payload_length]() mutable {
         RTC_DCHECK_RUN_ON(&encoder_queue_);
         RTC_DCHECK(sink_);
         RTC_LOG(LS_INFO) << "ConfigureEncoder requested.";
 
+        frame_cadence_adapter_->SetZeroHertzModeEnabled(
+            config.content_type == VideoEncoderConfig::ContentType::kScreen);
         pending_encoder_creation_ =
             (!encoder_ || encoder_config_.video_format != config.video_format ||
              max_data_payload_length_ != max_data_payload_length);
@@ -1261,19 +1259,18 @@
   degradation_preference_manager_->SetIsScreenshare(is_screenshare);
 }
 
-void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) {
-  RTC_DCHECK_RUN_ON(worker_queue_);
+void VideoStreamEncoder::OnFrame(Timestamp post_time,
+                                 int frames_scheduled_for_processing,
+                                 const VideoFrame& video_frame) {
+  RTC_DCHECK_RUN_ON(&encoder_queue_);
   VideoFrame incoming_frame = video_frame;
 
-  // Local time in webrtc time base.
-  Timestamp now = clock_->CurrentTime();
-
   // In some cases, e.g., when the frame from decoder is fed to encoder,
   // the timestamp may be set to the future. As the encoding pipeline assumes
   // capture time to be less than present time, we should reset the capture
   // timestamps here. Otherwise there may be issues with RTP send stream.
-  if (incoming_frame.timestamp_us() > now.us())
-    incoming_frame.set_timestamp_us(now.us());
+  if (incoming_frame.timestamp_us() > post_time.us())
+    incoming_frame.set_timestamp_us(post_time.us());
 
   // Capture time may come from clock with an offset and drift from clock_.
   int64_t capture_ntp_time_ms;
@@ -1282,7 +1279,7 @@
   } else if (video_frame.render_time_ms() != 0) {
     capture_ntp_time_ms = video_frame.render_time_ms() + delta_ntp_internal_ms_;
   } else {
-    capture_ntp_time_ms = now.ms() + delta_ntp_internal_ms_;
+    capture_ntp_time_ms = post_time.ms() + delta_ntp_internal_ms_;
   }
   incoming_frame.set_ntp_time_ms(capture_ntp_time_ms);
 
@@ -1306,62 +1303,51 @@
   }
 
   bool log_stats = false;
-  if (now.ms() - last_frame_log_ms_ > kFrameLogIntervalMs) {
-    last_frame_log_ms_ = now.ms();
+  if (post_time.ms() - last_frame_log_ms_ > kFrameLogIntervalMs) {
+    last_frame_log_ms_ = post_time.ms();
     log_stats = true;
   }
 
   last_captured_timestamp_ = incoming_frame.ntp_time_ms();
 
-  int64_t post_time_us = clock_->CurrentTime().us();
-  ++posted_frames_waiting_for_encode_;
-
-  encoder_queue_.PostTask(
-      [this, incoming_frame, post_time_us, log_stats]() {
-        RTC_DCHECK_RUN_ON(&encoder_queue_);
-        encoder_stats_observer_->OnIncomingFrame(incoming_frame.width(),
-                                                 incoming_frame.height());
-        ++captured_frame_count_;
-        const int posted_frames_waiting_for_encode =
-            posted_frames_waiting_for_encode_.fetch_sub(1);
-        RTC_DCHECK_GT(posted_frames_waiting_for_encode, 0);
-        CheckForAnimatedContent(incoming_frame, post_time_us);
-        bool cwnd_frame_drop =
-            cwnd_frame_drop_interval_ &&
-            (cwnd_frame_counter_++ % cwnd_frame_drop_interval_.value() == 0);
-        if (posted_frames_waiting_for_encode == 1 && !cwnd_frame_drop) {
-          MaybeEncodeVideoFrame(incoming_frame, post_time_us);
-        } else {
-          if (cwnd_frame_drop) {
-            // Frame drop by congestion window pushback. Do not encode this
-            // frame.
-            ++dropped_frame_cwnd_pushback_count_;
-            encoder_stats_observer_->OnFrameDropped(
-                VideoStreamEncoderObserver::DropReason::kCongestionWindow);
-          } else {
-            // There is a newer frame in flight. Do not encode this frame.
-            RTC_LOG(LS_VERBOSE)
-                << "Incoming frame dropped due to that the encoder is blocked.";
-            ++dropped_frame_encoder_block_count_;
-            encoder_stats_observer_->OnFrameDropped(
-                VideoStreamEncoderObserver::DropReason::kEncoderQueue);
-          }
-          accumulated_update_rect_.Union(incoming_frame.update_rect());
-          accumulated_update_rect_is_valid_ &= incoming_frame.has_update_rect();
-        }
-        if (log_stats) {
-          RTC_LOG(LS_INFO) << "Number of frames: captured "
-                           << captured_frame_count_
-                           << ", dropped (due to congestion window pushback) "
-                           << dropped_frame_cwnd_pushback_count_
-                           << ", dropped (due to encoder blocked) "
-                           << dropped_frame_encoder_block_count_
-                           << ", interval_ms " << kFrameLogIntervalMs;
-          captured_frame_count_ = 0;
-          dropped_frame_cwnd_pushback_count_ = 0;
-          dropped_frame_encoder_block_count_ = 0;
-        }
-      });
+  encoder_stats_observer_->OnIncomingFrame(incoming_frame.width(),
+                                           incoming_frame.height());
+  ++captured_frame_count_;
+  CheckForAnimatedContent(incoming_frame, post_time.us());
+  bool cwnd_frame_drop =
+      cwnd_frame_drop_interval_ &&
+      (cwnd_frame_counter_++ % cwnd_frame_drop_interval_.value() == 0);
+  if (frames_scheduled_for_processing == 1 && !cwnd_frame_drop) {
+    MaybeEncodeVideoFrame(incoming_frame, post_time.us());
+  } else {
+    if (cwnd_frame_drop) {
+      // Frame drop by congestion window pushback. Do not encode this
+      // frame.
+      ++dropped_frame_cwnd_pushback_count_;
+      encoder_stats_observer_->OnFrameDropped(
+          VideoStreamEncoderObserver::DropReason::kCongestionWindow);
+    } else {
+      // There is a newer frame in flight. Do not encode this frame.
+      RTC_LOG(LS_VERBOSE)
+          << "Incoming frame dropped because the encoder is blocked.";
+      ++dropped_frame_encoder_block_count_;
+      encoder_stats_observer_->OnFrameDropped(
+          VideoStreamEncoderObserver::DropReason::kEncoderQueue);
+    }
+    accumulated_update_rect_.Union(incoming_frame.update_rect());
+    accumulated_update_rect_is_valid_ &= incoming_frame.has_update_rect();
+  }
+  if (log_stats) {
+    RTC_LOG(LS_INFO) << "Number of frames: captured " << captured_frame_count_
+                     << ", dropped (due to congestion window pushback) "
+                     << dropped_frame_cwnd_pushback_count_
+                     << ", dropped (due to encoder blocked) "
+                     << dropped_frame_encoder_block_count_ << ", interval_ms "
+                     << kFrameLogIntervalMs;
+    captured_frame_count_ = 0;
+    dropped_frame_cwnd_pushback_count_ = 0;
+    dropped_frame_encoder_block_count_ = 0;
+  }
 }
 
 void VideoStreamEncoder::OnDiscardedFrame() {
diff --git a/video/video_stream_encoder.h b/video/video_stream_encoder.h
index e0eda70..4231f1b 100644
--- a/video/video_stream_encoder.h
+++ b/video/video_stream_encoder.h
@@ -77,8 +77,8 @@
       const VideoStreamEncoderSettings& settings,
       std::unique_ptr<OveruseFrameDetector> overuse_detector,
       std::unique_ptr<FrameCadenceAdapterInterface> frame_cadence_adapter,
-      TaskQueueFactory* task_queue_factory,
-      TaskQueueBase* worker_queue,
+      std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
+          encoder_queue,
       BitrateAllocationCallbackType allocation_cb_type);
   ~VideoStreamEncoder() override;
 
@@ -146,8 +146,11 @@
     explicit CadenceCallback(VideoStreamEncoder& video_stream_encoder)
         : video_stream_encoder_(video_stream_encoder) {}
     // FrameCadenceAdapterInterface::Callback overrides.
-    void OnFrame(const VideoFrame& frame) override {
-      video_stream_encoder_.OnFrame(frame);
+    void OnFrame(Timestamp post_time,
+                 int frames_scheduled_for_processing,
+                 const VideoFrame& frame) override {
+      video_stream_encoder_.OnFrame(post_time, frames_scheduled_for_processing,
+                                    frame);
     }
     void OnDiscardedFrame() override {
       video_stream_encoder_.OnDiscardedFrame();
@@ -192,7 +195,9 @@
 
   void ReconfigureEncoder() RTC_RUN_ON(&encoder_queue_);
   void OnEncoderSettingsChanged() RTC_RUN_ON(&encoder_queue_);
-  void OnFrame(const VideoFrame& video_frame);
+  void OnFrame(Timestamp post_time,
+               int frames_scheduled_for_processing,
+               const VideoFrame& video_frame);
   void OnDiscardedFrame();
 
   void MaybeEncodeVideoFrame(const VideoFrame& frame,
@@ -260,7 +265,8 @@
   CadenceCallback cadence_callback_;
   // Frame cadence encoder adapter. Frames enter this adapter first, and it then
   // forwards them to our OnFrame method.
-  const std::unique_ptr<FrameCadenceAdapterInterface> frame_cadence_adapter_;
+  std::unique_ptr<FrameCadenceAdapterInterface> frame_cadence_adapter_
+      RTC_GUARDED_BY(&encoder_queue_) RTC_PT_GUARDED_BY(&encoder_queue_);
 
   VideoEncoderConfig encoder_config_ RTC_GUARDED_BY(&encoder_queue_);
   std::unique_ptr<VideoEncoder> encoder_ RTC_GUARDED_BY(&encoder_queue_)
@@ -296,13 +302,12 @@
   bool encoder_failed_ RTC_GUARDED_BY(&encoder_queue_);
   Clock* const clock_;
 
-  std::atomic<int> posted_frames_waiting_for_encode_;
   // Used to make sure incoming time stamp is increasing for every frame.
-  int64_t last_captured_timestamp_ RTC_GUARDED_BY(worker_queue_);
+  int64_t last_captured_timestamp_ RTC_GUARDED_BY(&encoder_queue_);
   // Delta used for translating between NTP and internal timestamps.
-  const int64_t delta_ntp_internal_ms_ RTC_GUARDED_BY(worker_queue_);
+  const int64_t delta_ntp_internal_ms_ RTC_GUARDED_BY(&encoder_queue_);
 
-  int64_t last_frame_log_ms_ RTC_GUARDED_BY(worker_queue_);
+  int64_t last_frame_log_ms_ RTC_GUARDED_BY(&encoder_queue_);
   int captured_frame_count_ RTC_GUARDED_BY(&encoder_queue_);
   int dropped_frame_cwnd_pushback_count_ RTC_GUARDED_BY(&encoder_queue_);
   int dropped_frame_encoder_block_count_ RTC_GUARDED_BY(&encoder_queue_);
diff --git a/video/video_stream_encoder_unittest.cc b/video/video_stream_encoder_unittest.cc
index ba29d0b..32d4f94 100644
--- a/video/video_stream_encoder_unittest.cc
+++ b/video/video_stream_encoder_unittest.cc
@@ -17,6 +17,7 @@
 #include <utility>
 
 #include "absl/memory/memory.h"
+#include "api/rtp_parameters.h"
 #include "api/task_queue/default_task_queue_factory.h"
 #include "api/task_queue/task_queue_factory.h"
 #include "api/test/mock_fec_controller_override.h"
@@ -346,24 +347,25 @@
 
 class VideoStreamEncoderUnderTest : public VideoStreamEncoder {
  public:
-  VideoStreamEncoderUnderTest(TimeController* time_controller,
-                              TaskQueueFactory* task_queue_factory,
-                              SendStatisticsProxy* stats_proxy,
-                              const VideoStreamEncoderSettings& settings,
-                              VideoStreamEncoder::BitrateAllocationCallbackType
-                                  allocation_callback_type)
-      : VideoStreamEncoder(
-            time_controller->GetClock(),
-            1 /* number_of_cores */,
-            stats_proxy,
-            settings,
-            std::unique_ptr<OveruseFrameDetector>(
-                overuse_detector_proxy_ =
-                    new CpuOveruseDetectorProxy(stats_proxy)),
-            FrameCadenceAdapterInterface::Create(TaskQueueBase::Current()),
-            task_queue_factory,
-            TaskQueueBase::Current(),
-            allocation_callback_type),
+  VideoStreamEncoderUnderTest(
+      TimeController* time_controller,
+      std::unique_ptr<FrameCadenceAdapterInterface> cadence_adapter,
+      std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
+          encoder_queue,
+      SendStatisticsProxy* stats_proxy,
+      const VideoStreamEncoderSettings& settings,
+      VideoStreamEncoder::BitrateAllocationCallbackType
+          allocation_callback_type)
+      : VideoStreamEncoder(time_controller->GetClock(),
+                           1 /* number_of_cores */,
+                           stats_proxy,
+                           settings,
+                           std::unique_ptr<OveruseFrameDetector>(
+                               overuse_detector_proxy_ =
+                                   new CpuOveruseDetectorProxy(stats_proxy)),
+                           std::move(cadence_adapter),
+                           std::move(encoder_queue),
+                           allocation_callback_type),
         time_controller_(time_controller),
         fake_cpu_resource_(FakeResource::Create("FakeResource[CPU]")),
         fake_quality_resource_(FakeResource::Create("FakeResource[QP]")),
@@ -659,14 +661,17 @@
         /*number_of_cores=*/1,
         /*stats_proxy=*/stats_proxy_.get(), encoder_settings_,
         std::make_unique<CpuOveruseDetectorProxy>(/*stats_proxy=*/nullptr),
-        std::move(zero_hertz_adapter), task_queue_factory_.get(),
-        TaskQueueBase::Current(),
+        std::move(zero_hertz_adapter),
+        time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
+            "EncoderQueue", TaskQueueFactory::Priority::NORMAL),
         VideoStreamEncoder::BitrateAllocationCallbackType::
             kVideoBitrateAllocation);
     result->SetSink(&sink_, /*rotation_applied=*/false);
     return result;
   }
 
+  void DepleteTaskQueues() { time_controller_.AdvanceTime(TimeDelta::Zero()); }
+
  private:
   class NullEncoderSink : public VideoStreamEncoderInterface::EncoderSink {
    public:
@@ -763,9 +768,17 @@
                   kVideoBitrateAllocationWhenScreenSharing) {
     if (video_stream_encoder_)
       video_stream_encoder_->Stop();
-    video_stream_encoder_.reset(new VideoStreamEncoderUnderTest(
-        &time_controller_, GetTaskQueueFactory(), stats_proxy_.get(),
-        video_send_config_.encoder_settings, allocation_callback_type));
+
+    auto encoder_queue = GetTaskQueueFactory()->CreateTaskQueue(
+        "EncoderQueue", TaskQueueFactory::Priority::NORMAL);
+    TaskQueueBase* encoder_queue_ptr = encoder_queue.get();
+    std::unique_ptr<FrameCadenceAdapterInterface> cadence_adapter =
+        FrameCadenceAdapterInterface::Create(time_controller_.GetClock(),
+                                             encoder_queue_ptr);
+    video_stream_encoder_ = std::make_unique<VideoStreamEncoderUnderTest>(
+        &time_controller_, std::move(cadence_adapter), std::move(encoder_queue),
+        stats_proxy_.get(), video_send_config_.encoder_settings,
+        allocation_callback_type);
     video_stream_encoder_->SetSink(&sink_, /*rotation_applied=*/false);
     video_stream_encoder_->SetSource(
         &video_source_, webrtc::DegradationPreference::MAINTAIN_FRAMERATE);
@@ -930,11 +943,6 @@
       RTC_DCHECK(time_controller_);
     }
 
-    void BlockNextEncode() {
-      MutexLock lock(&local_mutex_);
-      block_next_encode_ = true;
-    }
-
     VideoEncoder::EncoderInfo GetEncoderInfo() const override {
       MutexLock lock(&local_mutex_);
       EncoderInfo info = FakeEncoder::GetEncoderInfo();
@@ -1108,7 +1116,6 @@
    private:
     int32_t Encode(const VideoFrame& input_image,
                    const std::vector<VideoFrameType>* frame_types) override {
-      bool block_encode;
       {
         MutexLock lock(&local_mutex_);
         if (expect_null_frame_) {
@@ -1126,16 +1133,11 @@
         ntp_time_ms_ = input_image.ntp_time_ms();
         last_input_width_ = input_image.width();
         last_input_height_ = input_image.height();
-        block_encode = block_next_encode_;
-        block_next_encode_ = false;
         last_update_rect_ = input_image.update_rect();
         last_frame_types_ = *frame_types;
         last_input_pixel_format_ = input_image.video_frame_buffer()->type();
       }
       int32_t result = FakeEncoder::Encode(input_image, frame_types);
-      if (block_encode)
-        EXPECT_TRUE(continue_encode_event_.Wait(kDefaultTimeoutMs));
-
       return result;
     }
 
@@ -1212,7 +1214,6 @@
       kInitializationFailed,
       kInitialized
     } initialized_ RTC_GUARDED_BY(local_mutex_) = EncoderState::kUninitialized;
-    bool block_next_encode_ RTC_GUARDED_BY(local_mutex_) = false;
     rtc::Event continue_encode_event_;
     uint32_t timestamp_ RTC_GUARDED_BY(local_mutex_) = 0;
     int64_t ntp_time_ms_ RTC_GUARDED_BY(local_mutex_) = 0;
@@ -1592,20 +1593,10 @@
   EXPECT_TRUE(frame_destroyed_event.Wait(kDefaultTimeoutMs));
 }
 
-class VideoStreamEncoderBlockedTest : public VideoStreamEncoderTest {
- public:
-  VideoStreamEncoderBlockedTest() {}
-
-  TaskQueueFactory* GetTaskQueueFactory() override {
-    return task_queue_factory_.get();
-  }
-
- private:
-  std::unique_ptr<TaskQueueFactory> task_queue_factory_ =
-      CreateDefaultTaskQueueFactory();
-};
-
-TEST_F(VideoStreamEncoderBlockedTest, DropsPendingFramesOnSlowEncode) {
+TEST_F(VideoStreamEncoderTest, DropsPendingFramesOnSlowEncode) {
+  test::FrameForwarder source;
+  video_stream_encoder_->SetSource(&source,
+                                   DegradationPreference::MAINTAIN_FRAMERATE);
   video_stream_encoder_->OnBitrateUpdatedAndWaitForManagedResources(
       kTargetBitrate, kTargetBitrate, kTargetBitrate, 0, 0, 0);
 
@@ -1615,18 +1606,10 @@
         ++dropped_count;
       });
 
-  fake_encoder_.BlockNextEncode();
-  video_source_.IncomingCapturedFrame(CreateFrame(1, nullptr));
-  WaitForEncodedFrame(1);
-  // Here, the encoder thread will be blocked in the TestEncoder waiting for a
-  // call to ContinueEncode.
-  video_source_.IncomingCapturedFrame(CreateFrame(2, nullptr));
-  video_source_.IncomingCapturedFrame(CreateFrame(3, nullptr));
-  fake_encoder_.ContinueEncode();
-  WaitForEncodedFrame(3);
-
+  source.IncomingCapturedFrame(CreateFrame(1, nullptr));
+  source.IncomingCapturedFrame(CreateFrame(2, nullptr));
+  WaitForEncodedFrame(2);
   video_stream_encoder_->Stop();
-
   EXPECT_EQ(1, dropped_count);
 }
 
@@ -7125,14 +7108,15 @@
   video_stream_encoder_->Stop();
 }
 
-TEST_F(VideoStreamEncoderBlockedTest, AccumulatesUpdateRectOnDroppedFrames) {
+TEST_F(VideoStreamEncoderTest, AccumulatesUpdateRectOnDroppedFrames) {
   VideoFrame::UpdateRect rect;
+  test::FrameForwarder source;
+  video_stream_encoder_->SetSource(&source,
+                                   DegradationPreference::MAINTAIN_FRAMERATE);
   video_stream_encoder_->OnBitrateUpdatedAndWaitForManagedResources(
       kTargetBitrate, kTargetBitrate, kTargetBitrate, 0, 0, 0);
 
-  fake_encoder_.BlockNextEncode();
-  video_source_.IncomingCapturedFrame(
-      CreateFrameWithUpdatedPixel(1, nullptr, 0));
+  source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(1, nullptr, 0));
   WaitForEncodedFrame(1);
   // On the very first frame full update should be forced.
   rect = fake_encoder_.GetLastUpdateRect();
@@ -7140,15 +7124,10 @@
   EXPECT_EQ(rect.offset_y, 0);
   EXPECT_EQ(rect.height, codec_height_);
   EXPECT_EQ(rect.width, codec_width_);
-  // Here, the encoder thread will be blocked in the TestEncoder waiting for a
-  // call to ContinueEncode.
-  video_source_.IncomingCapturedFrame(
-      CreateFrameWithUpdatedPixel(2, nullptr, 1));
-  ExpectDroppedFrame();
-  video_source_.IncomingCapturedFrame(
-      CreateFrameWithUpdatedPixel(3, nullptr, 10));
-  ExpectDroppedFrame();
-  fake_encoder_.ContinueEncode();
+  // The frame with NTP timestamp 2 is dropped because frame 3 is already
+  // scheduled for processing by the time the encoder queue handles frame 2.
+  source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(2, nullptr, 1));
+  source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(3, nullptr, 10));
   WaitForEncodedFrame(3);
   // Updates to pixels 1 and 10 should be accumulated to one 10x1 rect.
   rect = fake_encoder_.GetLastUpdateRect();
@@ -7157,8 +7136,7 @@
   EXPECT_EQ(rect.width, 10);
   EXPECT_EQ(rect.height, 1);
 
-  video_source_.IncomingCapturedFrame(
-      CreateFrameWithUpdatedPixel(4, nullptr, 0));
+  source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(4, nullptr, 0));
   WaitForEncodedFrame(4);
   // Previous frame was encoded, so no accumulation should happen.
   rect = fake_encoder_.GetLastUpdateRect();
@@ -8737,12 +8715,14 @@
   VideoEncoderConfig config;
   config.content_type = VideoEncoderConfig::ContentType::kScreen;
   video_stream_encoder->ConfigureEncoder(std::move(config), 0);
+  factory.DepleteTaskQueues();
   Mock::VerifyAndClearExpectations(adapter_ptr);
 
   EXPECT_CALL(*adapter_ptr, SetZeroHertzModeEnabled(false));
   VideoEncoderConfig config2;
   config2.content_type = VideoEncoderConfig::ContentType::kRealtimeVideo;
   video_stream_encoder->ConfigureEncoder(std::move(config2), 0);
+  factory.DepleteTaskQueues();
 }
 
 TEST(VideoStreamEncoderFrameCadenceTest,