Introduce Sync-Decoding based on Metronome

Adds new class DecodeSynchronizer that will coalesce the decoding
of received streams on the metronome. This feature is experimental and
is backed by a field trial WebRTC-FrameBuffer3.

This experiment now has 3 arms to it,

"WebRTC-FrameBuffer3/arm:FrameBuffer2/": Default, uses old frame buffer.
"WebRTC-FrameBuffer3/arm:FrameBuffer3/": Uses new frame buffer.
"WebRTC-FrameBuffer3/arm:SyncDecoding/": Uses new frame buffer with
frame scheduled on the metronome.

The SyncDecoding arm will not work until it is wired up in the follow-up
CL.

This change also makes the following modifications,
* Adds FakeMetronome utilities for tests using a metronome.
* Makes FrameDecodeScheduler an interface. The default implementation is
TaskQueueFrameDecodeScheduler.
* FrameDecodeScheduler now has a Stop() method, which must be called
before destruction.


TBR=philipel@webrtc.org

Change-Id: I58a306bb883604b0be3eb2a04b3d07dbdf185c71
Bug: webrtc:13658
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/250665
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Reviewed-by: Stefan Holmer <holmer@google.com>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Commit-Queue: Evan Shrubsole <eshr@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35988}
diff --git a/api/DEPS b/api/DEPS
index ce88d34..ff493bf 100644
--- a/api/DEPS
+++ b/api/DEPS
@@ -255,6 +255,13 @@
     "+rtc_base/ref_counted_object.h",
   ],
 
+  "fake_metronome\.h": [
+    "+rtc_base/synchronization/mutex.h",
+    "+rtc_base/task_queue.h",
+    "+rtc_base/task_utils/repeating_task.h",
+    "+rtc_base/thread_annotations.h",
+  ],
+
   "mock.*\.h": [
     "+test/gmock.h",
   ],
diff --git a/api/metronome/test/BUILD.gn b/api/metronome/test/BUILD.gn
new file mode 100644
index 0000000..d25d5a8
--- /dev/null
+++ b/api/metronome/test/BUILD.gn
@@ -0,0 +1,30 @@
+# Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+#
+# Use of this source code is governed by a BSD-style license
+# that can be found in the LICENSE file in the root of the source
+# tree. An additional intellectual property rights grant can be found
+# in the file PATENTS.  All contributing project authors may
+# be found in the AUTHORS file in the root of the source tree.
+
+import("../../../webrtc.gni")
+
+rtc_library("fake_metronome") {
+  testonly = true
+  sources = [
+    "fake_metronome.cc",
+    "fake_metronome.h",
+  ]
+  deps = [
+    "..:metronome",
+    "../..:priority",
+    "../..:sequence_checker",
+    "../../../rtc_base:macromagic",
+    "../../../rtc_base:rtc_event",
+    "../../../rtc_base:rtc_task_queue",
+    "../../../rtc_base/synchronization:mutex",
+    "../../../rtc_base/task_utils:repeating_task",
+    "../../../rtc_base/task_utils:to_queued_task",
+    "../../task_queue",
+    "../../units:time_delta",
+  ]
+}
diff --git a/api/metronome/test/fake_metronome.cc b/api/metronome/test/fake_metronome.cc
new file mode 100644
index 0000000..83b5ea7
--- /dev/null
+++ b/api/metronome/test/fake_metronome.cc
@@ -0,0 +1,93 @@
+/*
+ *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "api/metronome/test/fake_metronome.h"
+
+#include "api/priority.h"
+#include "api/sequence_checker.h"
+#include "api/task_queue/task_queue_factory.h"
+#include "api/units/time_delta.h"
+#include "rtc_base/event.h"
+#include "rtc_base/task_utils/repeating_task.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+
+namespace webrtc::test {
+
+ForcedTickMetronome::ForcedTickMetronome(TimeDelta tick_period)
+    : tick_period_(tick_period) {}
+
+void ForcedTickMetronome::AddListener(TickListener* listener) {
+  listeners_.insert(listener);
+}
+
+void ForcedTickMetronome::RemoveListener(TickListener* listener) {
+  listeners_.erase(listener);
+}
+
+TimeDelta ForcedTickMetronome::TickPeriod() const {
+  return tick_period_;
+}
+
+size_t ForcedTickMetronome::NumListeners() {
+  return listeners_.size();
+}
+
+void ForcedTickMetronome::Tick() {
+  for (auto* listener : listeners_) {
+    listener->OnTickTaskQueue()->PostTask(
+        ToQueuedTask([listener] { listener->OnTick(); }));
+  }
+}
+
+FakeMetronome::FakeMetronome(TaskQueueFactory* factory, TimeDelta tick_period)
+    : tick_period_(tick_period),
+      queue_(factory->CreateTaskQueue("MetronomeQueue",
+                                      TaskQueueFactory::Priority::HIGH)) {}
+
+FakeMetronome::~FakeMetronome() {
+  RTC_DCHECK(listeners_.empty());
+}
+
+void FakeMetronome::AddListener(TickListener* listener) {
+  MutexLock lock(&mutex_);
+  listeners_.insert(listener);
+  if (!started_) {
+    tick_task_ = RepeatingTaskHandle::Start(queue_.Get(), [this] {
+      MutexLock lock(&mutex_);
+      // Stop if empty.
+      if (listeners_.empty())
+        return TimeDelta::PlusInfinity();
+      for (auto* listener : listeners_) {
+        listener->OnTickTaskQueue()->PostTask(
+            ToQueuedTask([listener] { listener->OnTick(); }));
+      }
+      return tick_period_;
+    });
+    started_ = true;
+  }
+}
+
+void FakeMetronome::RemoveListener(TickListener* listener) {
+  MutexLock lock(&mutex_);
+  listeners_.erase(listener);
+}
+
+void FakeMetronome::Stop() {
+  MutexLock lock(&mutex_);
+  RTC_DCHECK(listeners_.empty());
+  if (started_)
+    queue_.PostTask([this] { tick_task_.Stop(); });
+}
+
+TimeDelta FakeMetronome::TickPeriod() const {
+  return tick_period_;
+}
+
+}  // namespace webrtc::test
diff --git a/api/metronome/test/fake_metronome.h b/api/metronome/test/fake_metronome.h
new file mode 100644
index 0000000..28a79e0
--- /dev/null
+++ b/api/metronome/test/fake_metronome.h
@@ -0,0 +1,77 @@
+/*
+ *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef API_METRONOME_TEST_FAKE_METRONOME_H_
+#define API_METRONOME_TEST_FAKE_METRONOME_H_
+
+#include <memory>
+#include <set>
+
+#include "api/metronome/metronome.h"
+#include "api/task_queue/task_queue_base.h"
+#include "api/task_queue/task_queue_factory.h"
+#include "api/units/time_delta.h"
+#include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/task_queue.h"
+#include "rtc_base/task_utils/repeating_task.h"
+#include "rtc_base/thread_annotations.h"
+
+namespace webrtc::test {
+
+// ForcedTickMetronome is a Metronome that ticks when `Tick()` is invoked.
+// The constructor argument `tick_period` returned in `TickPeriod()`.
+class ForcedTickMetronome : public Metronome {
+ public:
+  explicit ForcedTickMetronome(TimeDelta tick_period);
+
+  // Forces all TickListeners to run `OnTick`.
+  void Tick();
+  size_t NumListeners();
+
+  // Metronome implementation.
+  void AddListener(TickListener* listener) override;
+  void RemoveListener(TickListener* listener) override;
+  TimeDelta TickPeriod() const override;
+
+ private:
+  const TimeDelta tick_period_;
+  std::set<TickListener*> listeners_;
+};
+
+// FakeMetronome is a metronome that ticks based on a repeating task at the
+// `tick_period` provided in the constructor. It is designed for use with
+// simulated task queues for unit tests.
+//
+// `Stop()` must be called before destruction, as it cancels the metronome tick
+// on the proper task queue.
+class FakeMetronome : public Metronome {
+ public:
+  FakeMetronome(TaskQueueFactory* factory, TimeDelta tick_period);
+  ~FakeMetronome() override;
+
+  // Metronome implementation.
+  void AddListener(TickListener* listener) override;
+  void RemoveListener(TickListener* listener) override;
+  TimeDelta TickPeriod() const override;
+
+  void Stop();
+
+ private:
+  const TimeDelta tick_period_;
+  RepeatingTaskHandle tick_task_;
+  bool started_ RTC_GUARDED_BY(mutex_) = false;
+  std::set<TickListener*> listeners_ RTC_GUARDED_BY(mutex_);
+  Mutex mutex_;
+  rtc::TaskQueue queue_;
+};
+
+}  // namespace webrtc::test
+
+#endif  // API_METRONOME_TEST_FAKE_METRONOME_H_
diff --git a/video/BUILD.gn b/video/BUILD.gn
index b89e5e4..1a247f4 100644
--- a/video/BUILD.gn
+++ b/video/BUILD.gn
@@ -301,10 +301,12 @@
     "frame_buffer_proxy.h",
   ]
   deps = [
-    ":frame_decode_scheduler",
+    ":decode_synchronizer",
     ":frame_decode_timing",
+    ":task_queue_frame_decode_scheduler",
     ":video_receive_stream_timeout_tracker",
     "../api:sequence_checker",
+    "../api/metronome",
     "../api/task_queue",
     "../api/video:encoded_frame",
     "../modules/video_coding",
@@ -312,6 +314,7 @@
     "../modules/video_coding:frame_helpers",
     "../modules/video_coding:timing",
     "../modules/video_coding:video_codec_interface",
+    "../rtc_base:checks",
     "../rtc_base:logging",
     "../rtc_base:macromagic",
     "../rtc_base:rtc_task_queue",
@@ -324,16 +327,27 @@
   ]
 }
 
-rtc_library("frame_decode_scheduler") {
+rtc_source_set("frame_decode_scheduler") {
+  sources = [ "frame_decode_scheduler.h" ]
+  deps = [
+    ":frame_decode_timing",
+    "../api/units:timestamp",
+  ]
+  absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+}
+
+rtc_library("task_queue_frame_decode_scheduler") {
   sources = [
-    "frame_decode_scheduler.cc",
-    "frame_decode_scheduler.h",
+    "task_queue_frame_decode_scheduler.cc",
+    "task_queue_frame_decode_scheduler.h",
   ]
   deps = [
+    ":frame_decode_scheduler",
     ":frame_decode_timing",
     "../api:sequence_checker",
     "../api/task_queue",
     "../api/units:timestamp",
+    "../rtc_base:checks",
     "../rtc_base/task_utils:pending_task_safety_flag",
     "../rtc_base/task_utils:to_queued_task",
     "../system_wrappers",
@@ -369,6 +383,26 @@
   ]
 }
 
+rtc_library("decode_synchronizer") {
+  sources = [
+    "decode_synchronizer.cc",
+    "decode_synchronizer.h",
+  ]
+  deps = [
+    ":frame_decode_scheduler",
+    ":frame_decode_timing",
+    "../api:sequence_checker",
+    "../api/metronome",
+    "../api/task_queue",
+    "../api/units:time_delta",
+    "../api/units:timestamp",
+    "../rtc_base:checks",
+    "../rtc_base:logging",
+    "../rtc_base:macromagic",
+  ]
+  absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+}
+
 rtc_library("video_stream_encoder_impl") {
   visibility = [ "*" ]
 
@@ -702,6 +736,7 @@
       "buffered_frame_decryptor_unittest.cc",
       "call_stats2_unittest.cc",
       "cpu_scaling_tests.cc",
+      "decode_synchronizer_unittest.cc",
       "encoder_bitrate_adjuster_unittest.cc",
       "encoder_overshoot_detector_unittest.cc",
       "encoder_rtcp_feedback_unittest.cc",
@@ -726,7 +761,6 @@
       "end_to_end_tests/transport_feedback_tests.cc",
       "frame_buffer_proxy_unittest.cc",
       "frame_cadence_adapter_unittest.cc",
-      "frame_decode_scheduler_unittest.cc",
       "frame_decode_timing_unittest.cc",
       "frame_encode_metadata_writer_unittest.cc",
       "picture_id_tests.cc",
@@ -741,6 +775,7 @@
       "send_statistics_proxy_unittest.cc",
       "stats_counter_unittest.cc",
       "stream_synchronization_unittest.cc",
+      "task_queue_frame_decode_scheduler_unittest.cc",
       "video_receive_stream2_unittest.cc",
       "video_receive_stream_timeout_tracker_unittest.cc",
       "video_send_stream_impl_unittest.cc",
@@ -750,10 +785,12 @@
       "video_stream_encoder_unittest.cc",
     ]
     deps = [
+      ":decode_synchronizer",
       ":frame_buffer_proxy",
       ":frame_cadence_adapter",
       ":frame_decode_scheduler",
       ":frame_decode_timing",
+      ":task_queue_frame_decode_scheduler",
       ":video",
       ":video_mocks",
       ":video_receive_stream_timeout_tracker",
@@ -777,6 +814,7 @@
       "../api:transport_api",
       "../api/adaptation:resource_adaptation_api",
       "../api/crypto:options",
+      "../api/metronome/test:fake_metronome",
       "../api/rtc_event_log",
       "../api/task_queue",
       "../api/task_queue:default_task_queue_factory",
@@ -870,6 +908,7 @@
     ]
     absl_deps = [
       "//third_party/abseil-cpp/absl/algorithm:container",
+      "//third_party/abseil-cpp/absl/functional:bind_front",
       "//third_party/abseil-cpp/absl/memory",
       "//third_party/abseil-cpp/absl/types:optional",
       "//third_party/abseil-cpp/absl/types:variant",
diff --git a/video/decode_synchronizer.cc b/video/decode_synchronizer.cc
new file mode 100644
index 0000000..9f22c49
--- /dev/null
+++ b/video/decode_synchronizer.cc
@@ -0,0 +1,186 @@
+/*
+ *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "video/decode_synchronizer.h"
+
+#include <iterator>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "api/sequence_checker.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
+#include "video/frame_decode_scheduler.h"
+#include "video/frame_decode_timing.h"
+
+namespace webrtc {
+
+DecodeSynchronizer::ScheduledFrame::ScheduledFrame(
+    uint32_t rtp_timestamp,
+    FrameDecodeTiming::FrameSchedule schedule,
+    FrameDecodeScheduler::FrameReleaseCallback callback)
+    : rtp_timestamp_(rtp_timestamp),
+      schedule_(std::move(schedule)),
+      callback_(std::move(callback)) {}
+
+void DecodeSynchronizer::ScheduledFrame::RunFrameReleaseCallback() && {
+  // Inspiration from Chromium base::OnceCallback. Move `*this` to a local
+  // before execution to ensure internal state is cleared after callback
+  // execution.
+  auto sf = std::move(*this);
+  sf.callback_(sf.rtp_timestamp_, sf.schedule_.render_time);
+}
+
+Timestamp DecodeSynchronizer::ScheduledFrame::LatestDecodeTime() const {
+  return schedule_.latest_decode_time;
+}
+
+DecodeSynchronizer::SynchronizedFrameDecodeScheduler::
+    SynchronizedFrameDecodeScheduler(DecodeSynchronizer* sync)
+    : sync_(sync) {
+  RTC_DCHECK(sync_);
+}
+
+DecodeSynchronizer::SynchronizedFrameDecodeScheduler::
+    ~SynchronizedFrameDecodeScheduler() {
+  RTC_DCHECK(!next_frame_);
+  RTC_DCHECK(stopped_);
+}
+
+absl::optional<uint32_t>
+DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ScheduledRtpTimestamp() {
+  return next_frame_.has_value()
+             ? absl::make_optional(next_frame_->rtp_timestamp())
+             : absl::nullopt;
+}
+
+DecodeSynchronizer::ScheduledFrame
+DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ReleaseNextFrame() {
+  RTC_DCHECK(next_frame_);
+  auto res = std::move(*next_frame_);
+  next_frame_.reset();
+  return res;
+}
+
+Timestamp
+DecodeSynchronizer::SynchronizedFrameDecodeScheduler::LatestDecodeTime() {
+  RTC_DCHECK(next_frame_);
+  return next_frame_->LatestDecodeTime();
+}
+
+void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ScheduleFrame(
+    uint32_t rtp,
+    FrameDecodeTiming::FrameSchedule schedule,
+    FrameReleaseCallback cb) {
+  RTC_DCHECK(!next_frame_) << "Can not schedule two frames at once.";
+  next_frame_ = ScheduledFrame(rtp, std::move(schedule), std::move(cb));
+  sync_->OnFrameScheduled(this);
+}
+
+void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::CancelOutstanding() {
+  next_frame_.reset();
+}
+
+void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::Stop() {
+  CancelOutstanding();
+  stopped_ = true;
+  sync_->RemoveFrameScheduler(this);
+}
+
+DecodeSynchronizer::DecodeSynchronizer(Clock* clock,
+                                       Metronome* metronome,
+                                       TaskQueueBase* worker_queue)
+    : clock_(clock), worker_queue_(worker_queue), metronome_(metronome) {
+  RTC_DCHECK(metronome_);
+  RTC_DCHECK(worker_queue_);
+}
+
+DecodeSynchronizer::~DecodeSynchronizer() {
+  RTC_DCHECK(schedulers_.empty());
+}
+
+std::unique_ptr<FrameDecodeScheduler>
+DecodeSynchronizer::CreateSynchronizedFrameScheduler() {
+  RTC_DCHECK_RUN_ON(worker_queue_);
+  auto scheduler = std::make_unique<SynchronizedFrameDecodeScheduler>(this);
+  auto [it, inserted] = schedulers_.emplace(scheduler.get());
+  // If this is the first `scheduler` added, start listening to the metronome.
+  if (inserted && schedulers_.size() == 1) {
+    RTC_DLOG(LS_VERBOSE) << "Listening to metronome";
+    metronome_->AddListener(this);
+  }
+
+  return std::move(scheduler);
+}
+
+void DecodeSynchronizer::OnFrameScheduled(
+    SynchronizedFrameDecodeScheduler* scheduler) {
+  RTC_DCHECK_RUN_ON(worker_queue_);
+  RTC_DCHECK(scheduler->ScheduledRtpTimestamp());
+
+  Timestamp now = clock_->CurrentTime();
+  Timestamp next_tick = expected_next_tick_;
+  // If no tick has registered yet assume it will occur in the tick period.
+  if (next_tick.IsInfinite()) {
+    next_tick = now + metronome_->TickPeriod();
+  }
+
+  // Release the frame right away if the decode time is too soon. Otherwise
+  // the stream may fall behind too much.
+  bool decode_before_next_tick =
+      scheduler->LatestDecodeTime() <
+      (next_tick - FrameDecodeTiming::kMaxAllowedFrameDelay);
+  // Decode immediately if the decode time is in the past.
+  bool decode_time_in_past = scheduler->LatestDecodeTime() < now;
+
+  if (decode_before_next_tick || decode_time_in_past) {
+    ScheduledFrame scheduled_frame = scheduler->ReleaseNextFrame();
+    std::move(scheduled_frame).RunFrameReleaseCallback();
+  }
+}
+
+void DecodeSynchronizer::RemoveFrameScheduler(
+    SynchronizedFrameDecodeScheduler* scheduler) {
+  RTC_DCHECK_RUN_ON(worker_queue_);
+  RTC_DCHECK(scheduler);
+  auto it = schedulers_.find(scheduler);
+  if (it == schedulers_.end()) {
+    return;
+  }
+  schedulers_.erase(it);
+  // If there are no more schedulers active, stop listening for metronome ticks.
+  if (schedulers_.empty()) {
+    RTC_DLOG(LS_VERBOSE) << "Not listening to metronome";
+    metronome_->RemoveListener(this);
+    expected_next_tick_ = Timestamp::PlusInfinity();
+  }
+}
+
+void DecodeSynchronizer::OnTick() {
+  RTC_DCHECK_RUN_ON(worker_queue_);
+  expected_next_tick_ = clock_->CurrentTime() + metronome_->TickPeriod();
+
+  for (auto* scheduler : schedulers_) {
+    if (scheduler->ScheduledRtpTimestamp() &&
+        scheduler->LatestDecodeTime() < expected_next_tick_) {
+      auto scheduled_frame = scheduler->ReleaseNextFrame();
+      std::move(scheduled_frame).RunFrameReleaseCallback();
+    }
+  }
+}
+
+TaskQueueBase* DecodeSynchronizer::OnTickTaskQueue() {
+  return worker_queue_;
+}
+
+}  // namespace webrtc
diff --git a/video/decode_synchronizer.h b/video/decode_synchronizer.h
new file mode 100644
index 0000000..bcbde4f
--- /dev/null
+++ b/video/decode_synchronizer.h
@@ -0,0 +1,137 @@
+/*
+ *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef VIDEO_DECODE_SYNCHRONIZER_H_
+#define VIDEO_DECODE_SYNCHRONIZER_H_
+
+#include <stdint.h>
+
+#include <functional>
+#include <memory>
+#include <set>
+#include <utility>
+
+#include "absl/types/optional.h"
+#include "api/metronome/metronome.h"
+#include "api/sequence_checker.h"
+#include "api/task_queue/task_queue_base.h"
+#include "api/units/timestamp.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/thread_annotations.h"
+#include "video/frame_decode_scheduler.h"
+#include "video/frame_decode_timing.h"
+
+namespace webrtc {
+
+// DecodeSynchronizer synchronizes the frame scheduling by coalescing decoding
+// on the metronome.
+//
+// A video receive stream can use the DecodeSynchronizer by receiving a
+// FrameDecodeScheduler instance with `CreateSynchronizedFrameScheduler()`.
+// This instance implements FrameDecodeScheduler and can be used as a normal
+// scheduler. This instance is owned by the receive stream, and is borrowed by
+// the DecodeSynchronizer. The DecodeSynchronizer will stop borrowing the
+// instance when `FrameDecodeScheduler::Stop()` is called, after which the
+// scheduler may be destroyed by the receive stream.
+//
+// When a frame is scheduled for decode by a receive stream using the
+// DecodeSynchronizer, it will instead be executed on the metronome during the
+// tick interval where `max_decode_time` occurs. For example, if a frame is
+// scheduled for decode in 50ms and the tick interval is 20ms, then the frame
+// will be released for decoding in 2 ticks. See below for illustation,
+//
+// In the case where the decode time is in the past, or must occur before the
+// next metronome tick then the frame will be released right away, allowing a
+// delayed stream to catch up quickly.
+//
+// DecodeSynchronizer is single threaded - all method calls must run on the
+// `worker_queue_`.
+class DecodeSynchronizer : private Metronome::TickListener {
+ public:
+  DecodeSynchronizer(Clock* clock,
+                     Metronome* metronome,
+                     TaskQueueBase* worker_queue);
+  ~DecodeSynchronizer() override;
+  DecodeSynchronizer(const DecodeSynchronizer&) = delete;
+  DecodeSynchronizer& operator=(const DecodeSynchronizer&) = delete;
+
+  std::unique_ptr<FrameDecodeScheduler> CreateSynchronizedFrameScheduler();
+
+ private:
+  class ScheduledFrame {
+   public:
+    ScheduledFrame(uint32_t rtp_timestamp,
+                   FrameDecodeTiming::FrameSchedule schedule,
+                   FrameDecodeScheduler::FrameReleaseCallback callback);
+
+    // Disallow copy since `callback` should only be moved.
+    ScheduledFrame(const ScheduledFrame&) = delete;
+    ScheduledFrame& operator=(const ScheduledFrame&) = delete;
+    ScheduledFrame(ScheduledFrame&&) = default;
+    ScheduledFrame& operator=(ScheduledFrame&&) = default;
+
+    // Executes `callback_`.
+    void RunFrameReleaseCallback() &&;
+
+    uint32_t rtp_timestamp() const { return rtp_timestamp_; }
+    Timestamp LatestDecodeTime() const;
+
+   private:
+    uint32_t rtp_timestamp_;
+    FrameDecodeTiming::FrameSchedule schedule_;
+    FrameDecodeScheduler::FrameReleaseCallback callback_;
+  };
+
+  class SynchronizedFrameDecodeScheduler : public FrameDecodeScheduler {
+   public:
+    explicit SynchronizedFrameDecodeScheduler(DecodeSynchronizer* sync);
+    ~SynchronizedFrameDecodeScheduler() override;
+
+    // Releases the outstanding frame for decoding. This invalidates
+    // `next_frame_`. There must be a frame scheduled.
+    ScheduledFrame ReleaseNextFrame();
+
+    // Returns `next_frame_.schedule.max_decode_time`. There must be a frame
+    // scheduled when this is called.
+    Timestamp LatestDecodeTime();
+
+    // FrameDecodeScheduler implementation.
+    absl::optional<uint32_t> ScheduledRtpTimestamp() override;
+    void ScheduleFrame(uint32_t rtp,
+                       FrameDecodeTiming::FrameSchedule schedule,
+                       FrameReleaseCallback cb) override;
+    void CancelOutstanding() override;
+    void Stop() override;
+
+   private:
+    DecodeSynchronizer* sync_;
+    absl::optional<ScheduledFrame> next_frame_;
+    bool stopped_ = false;
+  };
+
+  void OnFrameScheduled(SynchronizedFrameDecodeScheduler* scheduler);
+  void RemoveFrameScheduler(SynchronizedFrameDecodeScheduler* scheduler);
+
+  // Metronome::TickListener implementation.
+  void OnTick() override;
+  TaskQueueBase* OnTickTaskQueue() override;
+
+  Clock* const clock_;
+  TaskQueueBase* const worker_queue_;
+  Metronome* const metronome_;
+
+  Timestamp expected_next_tick_ = Timestamp::PlusInfinity();
+  std::set<SynchronizedFrameDecodeScheduler*> schedulers_
+      RTC_GUARDED_BY(worker_queue_);
+};
+
+}  // namespace webrtc
+
+#endif  // VIDEO_DECODE_SYNCHRONIZER_H_
diff --git a/video/decode_synchronizer_unittest.cc b/video/decode_synchronizer_unittest.cc
new file mode 100644
index 0000000..db9540f
--- /dev/null
+++ b/video/decode_synchronizer_unittest.cc
@@ -0,0 +1,232 @@
+/*
+ *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "video/decode_synchronizer.h"
+
+#include <stddef.h>
+
+#include <memory>
+#include <utility>
+
+#include "api/metronome/test/fake_metronome.h"
+#include "api/units/time_delta.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+#include "test/run_loop.h"
+#include "test/time_controller/simulated_time_controller.h"
+#include "video/frame_decode_scheduler.h"
+#include "video/frame_decode_timing.h"
+
+using ::testing::_;
+using ::testing::Eq;
+
+namespace webrtc {
+
+class DecodeSynchronizerTest : public ::testing::Test {
+ public:
+  static constexpr TimeDelta kTickPeriod = TimeDelta::Millis(33);
+
+  DecodeSynchronizerTest()
+      : time_controller_(Timestamp::Millis(1337)),
+        clock_(time_controller_.GetClock()),
+        metronome_(kTickPeriod),
+        decode_synchronizer_(clock_, &metronome_, run_loop_.task_queue()) {}
+
+ protected:
+  GlobalSimulatedTimeController time_controller_;
+  Clock* clock_;
+  test::RunLoop run_loop_;
+  test::ForcedTickMetronome metronome_;
+  DecodeSynchronizer decode_synchronizer_;
+};
+
+TEST_F(DecodeSynchronizerTest, AllFramesReadyBeforeNextTickDecoded) {
+  ::testing::MockFunction<void(uint32_t, Timestamp)> mock_callback1;
+  auto scheduler1 = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+  testing::MockFunction<void(unsigned int, Timestamp)> mock_callback2;
+  auto scheduler2 = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+  {
+    uint32_t frame_rtp = 90000;
+    FrameDecodeTiming::FrameSchedule frame_sched{
+        .latest_decode_time =
+            clock_->CurrentTime() + kTickPeriod - TimeDelta::Millis(3),
+        .render_time = clock_->CurrentTime() + TimeDelta::Millis(60)};
+    scheduler1->ScheduleFrame(frame_rtp, frame_sched,
+                              mock_callback1.AsStdFunction());
+    EXPECT_CALL(mock_callback1,
+                Call(Eq(frame_rtp), Eq(frame_sched.render_time)));
+  }
+  {
+    uint32_t frame_rtp = 123456;
+    FrameDecodeTiming::FrameSchedule frame_sched{
+        .latest_decode_time =
+            clock_->CurrentTime() + kTickPeriod - TimeDelta::Millis(2),
+        .render_time = clock_->CurrentTime() + TimeDelta::Millis(70)};
+    scheduler2->ScheduleFrame(frame_rtp, frame_sched,
+                              mock_callback2.AsStdFunction());
+    EXPECT_CALL(mock_callback2,
+                Call(Eq(frame_rtp), Eq(frame_sched.render_time)));
+  }
+  metronome_.Tick();
+  run_loop_.Flush();
+
+  // Cleanup
+  scheduler1->Stop();
+  scheduler2->Stop();
+}
+
+TEST_F(DecodeSynchronizerTest, FramesNotDecodedIfDecodeTimeIsInNextInterval) {
+  ::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
+  auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+  uint32_t frame_rtp = 90000;
+  FrameDecodeTiming::FrameSchedule frame_sched{
+      .latest_decode_time =
+          clock_->CurrentTime() + kTickPeriod + TimeDelta::Millis(10),
+      .render_time =
+          clock_->CurrentTime() + kTickPeriod + TimeDelta::Millis(30)};
+  scheduler->ScheduleFrame(frame_rtp, frame_sched,
+                           mock_callback.AsStdFunction());
+
+  metronome_.Tick();
+  run_loop_.Flush();
+  // No decodes should have happened in this tick.
+  ::testing::Mock::VerifyAndClearExpectations(&mock_callback);
+
+  // Decode should happen on next tick.
+  EXPECT_CALL(mock_callback, Call(Eq(frame_rtp), Eq(frame_sched.render_time)));
+  time_controller_.AdvanceTime(kTickPeriod);
+  metronome_.Tick();
+  run_loop_.Flush();
+
+  // Cleanup
+  scheduler->Stop();
+}
+
+TEST_F(DecodeSynchronizerTest, FrameDecodedOnce) {
+  ::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
+  auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+  uint32_t frame_rtp = 90000;
+  FrameDecodeTiming::FrameSchedule frame_sched{
+      .latest_decode_time = clock_->CurrentTime() + TimeDelta::Millis(30),
+      .render_time = clock_->CurrentTime() + TimeDelta::Millis(60)};
+  scheduler->ScheduleFrame(frame_rtp, frame_sched,
+                           mock_callback.AsStdFunction());
+  EXPECT_CALL(mock_callback, Call(_, _)).Times(1);
+  metronome_.Tick();
+  run_loop_.Flush();
+  ::testing::Mock::VerifyAndClearExpectations(&mock_callback);
+
+  // Trigger tick again. No frame should be decoded now.
+  time_controller_.AdvanceTime(kTickPeriod);
+  metronome_.Tick();
+  run_loop_.Flush();
+
+  // Cleanup
+  scheduler->Stop();
+}
+
+TEST_F(DecodeSynchronizerTest, FrameWithDecodeTimeInPastDecodedImmediately) {
+  ::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
+  auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+  uint32_t frame_rtp = 90000;
+  FrameDecodeTiming::FrameSchedule frame_sched{
+      .latest_decode_time = clock_->CurrentTime() - TimeDelta::Millis(5),
+      .render_time = clock_->CurrentTime() + TimeDelta::Millis(30)};
+  EXPECT_CALL(mock_callback, Call(Eq(90000u), _)).Times(1);
+  scheduler->ScheduleFrame(frame_rtp, frame_sched,
+                           mock_callback.AsStdFunction());
+  // Verify the callback was invoked already.
+  ::testing::Mock::VerifyAndClearExpectations(&mock_callback);
+
+  metronome_.Tick();
+  run_loop_.Flush();
+
+  // Cleanup
+  scheduler->Stop();
+}
+
+TEST_F(DecodeSynchronizerTest,
+       FrameWithDecodeTimeFarBeforeNextTickDecodedImmediately) {
+  ::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
+  auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+  // Frame which would be behind by more than kMaxAllowedFrameDelay after
+  // the next tick.
+  FrameDecodeTiming::FrameSchedule frame_sched{
+      .latest_decode_time = clock_->CurrentTime() + kTickPeriod -
+                            FrameDecodeTiming::kMaxAllowedFrameDelay -
+                            TimeDelta::Millis(1),
+      .render_time = clock_->CurrentTime() + TimeDelta::Millis(30)};
+  EXPECT_CALL(mock_callback, Call(Eq(90000u), _)).Times(1);
+  scheduler->ScheduleFrame(90000, frame_sched, mock_callback.AsStdFunction());
+  // Verify the callback was invoked already.
+  ::testing::Mock::VerifyAndClearExpectations(&mock_callback);
+
+  time_controller_.AdvanceTime(kTickPeriod);
+  metronome_.Tick();
+  run_loop_.Flush();
+
+  // A frame that would be behind by exactly kMaxAllowedFrameDelay after next
+  // tick should decode at the next tick.
+  FrameDecodeTiming::FrameSchedule queued_frame{
+      .latest_decode_time = clock_->CurrentTime() + kTickPeriod -
+                            FrameDecodeTiming::kMaxAllowedFrameDelay,
+      .render_time = clock_->CurrentTime() + TimeDelta::Millis(30)};
+  scheduler->ScheduleFrame(180000, queued_frame, mock_callback.AsStdFunction());
+  // Verify the callback was invoked already.
+  ::testing::Mock::VerifyAndClearExpectations(&mock_callback);
+
+  EXPECT_CALL(mock_callback, Call(Eq(180000u), _)).Times(1);
+  time_controller_.AdvanceTime(kTickPeriod);
+  metronome_.Tick();
+  run_loop_.Flush();
+
+  // Cleanup
+  scheduler->Stop();
+}
+
+TEST_F(DecodeSynchronizerTest, FramesNotReleasedAfterStop) {
+  ::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
+  auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+  uint32_t frame_rtp = 90000;
+  FrameDecodeTiming::FrameSchedule frame_sched{
+      .latest_decode_time = clock_->CurrentTime() + TimeDelta::Millis(30),
+      .render_time = clock_->CurrentTime() + TimeDelta::Millis(60)};
+  scheduler->ScheduleFrame(frame_rtp, frame_sched,
+                           mock_callback.AsStdFunction());
+  // Cleanup
+  scheduler->Stop();
+
+  // No callback should occur on this tick since Stop() was called before.
+  metronome_.Tick();
+  run_loop_.Flush();
+}
+
+TEST_F(DecodeSynchronizerTest, MetronomeNotListenedWhenNoStreamsAreActive) {
+  EXPECT_EQ(0u, metronome_.NumListeners());
+
+  auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+  EXPECT_EQ(1u, metronome_.NumListeners());
+  auto scheduler2 = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+  EXPECT_EQ(1u, metronome_.NumListeners());
+
+  scheduler->Stop();
+  EXPECT_EQ(1u, metronome_.NumListeners());
+  scheduler2->Stop();
+  EXPECT_EQ(0u, metronome_.NumListeners());
+}
+
+}  // namespace webrtc
diff --git a/video/frame_buffer_proxy.cc b/video/frame_buffer_proxy.cc
index 3182bf9..d294eeb 100644
--- a/video/frame_buffer_proxy.cc
+++ b/video/frame_buffer_proxy.cc
@@ -14,20 +14,24 @@
 #include <memory>
 #include <utility>
 
+#include "absl/base/attributes.h"
 #include "absl/functional/bind_front.h"
 #include "api/sequence_checker.h"
 #include "modules/video_coding/frame_buffer2.h"
 #include "modules/video_coding/frame_buffer3.h"
 #include "modules/video_coding/frame_helpers.h"
+#include "rtc_base/checks.h"
 #include "rtc_base/logging.h"
 #include "rtc_base/thread_annotations.h"
 #include "system_wrappers/include/field_trial.h"
-#include "video/frame_decode_scheduler.h"
 #include "video/frame_decode_timing.h"
+#include "video/task_queue_frame_decode_scheduler.h"
 #include "video/video_receive_stream_timeout_tracker.h"
 
 namespace webrtc {
 
+namespace {
+
 class FrameBuffer2Proxy : public FrameBufferProxy {
  public:
   FrameBuffer2Proxy(Clock* clock,
@@ -140,14 +144,16 @@
 // accounting are moved into this pro
 class FrameBuffer3Proxy : public FrameBufferProxy {
  public:
-  FrameBuffer3Proxy(Clock* clock,
-                    TaskQueueBase* worker_queue,
-                    VCMTiming* timing,
-                    VCMReceiveStatisticsCallback* stats_proxy,
-                    rtc::TaskQueue* decode_queue,
-                    FrameSchedulingReceiver* receiver,
-                    TimeDelta max_wait_for_keyframe,
-                    TimeDelta max_wait_for_frame)
+  FrameBuffer3Proxy(
+      Clock* clock,
+      TaskQueueBase* worker_queue,
+      VCMTiming* timing,
+      VCMReceiveStatisticsCallback* stats_proxy,
+      rtc::TaskQueue* decode_queue,
+      FrameSchedulingReceiver* receiver,
+      TimeDelta max_wait_for_keyframe,
+      TimeDelta max_wait_for_frame,
+      std::unique_ptr<FrameDecodeScheduler> frame_decode_scheduler)
       : max_wait_for_keyframe_(max_wait_for_keyframe),
         max_wait_for_frame_(max_wait_for_frame),
         clock_(clock),
@@ -156,15 +162,11 @@
         stats_proxy_(stats_proxy),
         receiver_(receiver),
         timing_(timing),
+        frame_decode_scheduler_(std::move(frame_decode_scheduler)),
         jitter_estimator_(clock_),
         inter_frame_delay_(clock_->TimeInMilliseconds()),
         buffer_(std::make_unique<FrameBuffer>(kMaxFramesBuffered,
                                               kMaxFramesHistory)),
-        frame_decode_scheduler_(
-            clock_,
-            worker_queue,
-            absl::bind_front(&FrameBuffer3Proxy::OnFrameReadyForExtraction,
-                             this)),
         decode_timing_(clock_, timing_),
         timeout_tracker_(clock_,
                          worker_queue_,
@@ -181,6 +183,7 @@
     RTC_DCHECK(timing_);
     RTC_DCHECK(worker_queue_);
     RTC_DCHECK(clock_);
+    RTC_DCHECK(frame_decode_scheduler_);
     RTC_LOG(LS_WARNING) << "Using FrameBuffer3";
 
     ParseFieldTrial({&zero_playout_delay_max_decode_queue_size_},
@@ -190,8 +193,8 @@
   // FrameBufferProxy implementation.
   void StopOnWorker() override {
     RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
+    frame_decode_scheduler_->Stop();
     timeout_tracker_.Stop();
-    frame_decode_scheduler_.CancelOutstanding();
     decoder_ready_for_new_frame_ = false;
     decode_queue_->PostTask([this] {
       RTC_DCHECK_RUN_ON(decode_queue_);
@@ -209,7 +212,7 @@
     stats_proxy_->OnDroppedFrames(buffer_->CurrentSize());
     buffer_ =
         std::make_unique<FrameBuffer>(kMaxFramesBuffered, kMaxFramesHistory);
-    frame_decode_scheduler_.CancelOutstanding();
+    frame_decode_scheduler_->CancelOutstanding();
   }
 
   absl::optional<int64_t> InsertFrame(
@@ -342,8 +345,7 @@
   }
 
  private:
-  void OnFrameReadyForExtraction(uint32_t rtp_timestamp,
-                                 Timestamp render_time) {
+  void FrameReadyForDecode(uint32_t rtp_timestamp, Timestamp render_time) {
     RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
     RTC_DCHECK(buffer_->NextDecodableTemporalUnitRtpTimestamp() ==
                rtp_timestamp)
@@ -430,7 +432,7 @@
     auto last_rtp = *buffer_->LastDecodableTemporalUnitRtpTimestamp();
 
     // If already scheduled then abort.
-    if (frame_decode_scheduler_.scheduled_rtp() ==
+    if (frame_decode_scheduler_->ScheduledRtpTimestamp() ==
         buffer_->NextDecodableTemporalUnitRtpTimestamp())
       return;
 
@@ -441,9 +443,11 @@
                                                      IsTooManyFramesQueued());
       if (schedule) {
         // Don't schedule if already waiting for the same frame.
-        if (frame_decode_scheduler_.scheduled_rtp() != next_rtp) {
-          frame_decode_scheduler_.CancelOutstanding();
-          frame_decode_scheduler_.ScheduleFrame(next_rtp, *schedule);
+        if (frame_decode_scheduler_->ScheduledRtpTimestamp() != next_rtp) {
+          frame_decode_scheduler_->CancelOutstanding();
+          frame_decode_scheduler_->ScheduleFrame(
+              next_rtp, *schedule,
+              absl::bind_front(&FrameBuffer3Proxy::FrameReadyForDecode, this));
         }
         return;
       }
@@ -463,6 +467,8 @@
   VCMReceiveStatisticsCallback* const stats_proxy_;
   FrameSchedulingReceiver* const receiver_;
   VCMTiming* const timing_;
+  const std::unique_ptr<FrameDecodeScheduler> frame_decode_scheduler_
+      RTC_GUARDED_BY(&worker_sequence_checker_);
 
   VCMJitterEstimator jitter_estimator_
       RTC_GUARDED_BY(&worker_sequence_checker_);
@@ -471,8 +477,6 @@
   bool keyframe_required_ RTC_GUARDED_BY(&worker_sequence_checker_) = false;
   std::unique_ptr<FrameBuffer> buffer_
       RTC_GUARDED_BY(&worker_sequence_checker_);
-  FrameDecodeScheduler frame_decode_scheduler_
-      RTC_GUARDED_BY(&worker_sequence_checker_);
   FrameDecodeTiming decode_timing_ RTC_GUARDED_BY(&worker_sequence_checker_);
   VideoReceiveStreamTimeoutTracker timeout_tracker_
       RTC_GUARDED_BY(&worker_sequence_checker_);
@@ -500,6 +504,28 @@
   ScopedTaskSafety worker_safety_;
 };
 
+enum class FrameBufferArm {
+  kFrameBuffer2,
+  kFrameBuffer3,
+  kSyncDecode,
+};
+
+constexpr const char* kFrameBufferFieldTrial = "WebRTC-FrameBuffer3";
+
+FrameBufferArm ParseFrameBufferFieldTrial() {
+  webrtc::FieldTrialEnum<FrameBufferArm> arm(
+      "arm", FrameBufferArm::kFrameBuffer2,
+      {
+          {"FrameBuffer2", FrameBufferArm::kFrameBuffer2},
+          {"FrameBuffer3", FrameBufferArm::kFrameBuffer3},
+          {"SyncDecoding", FrameBufferArm::kSyncDecode},
+      });
+  ParseFieldTrial({&arm}, field_trial::FindFullName(kFrameBufferFieldTrial));
+  return arm.Get();
+}
+
+}  // namespace
+
 std::unique_ptr<FrameBufferProxy> FrameBufferProxy::CreateFromFieldTrial(
     Clock* clock,
     TaskQueueBase* worker_queue,
@@ -508,14 +534,39 @@
     rtc::TaskQueue* decode_queue,
     FrameSchedulingReceiver* receiver,
     TimeDelta max_wait_for_keyframe,
-    TimeDelta max_wait_for_frame) {
-  if (field_trial::IsEnabled("WebRTC-FrameBuffer3"))
-    return std::make_unique<FrameBuffer3Proxy>(
-        clock, worker_queue, timing, stats_proxy, decode_queue, receiver,
-        max_wait_for_keyframe, max_wait_for_frame);
-  return std::make_unique<FrameBuffer2Proxy>(
-      clock, timing, stats_proxy, decode_queue, receiver, max_wait_for_keyframe,
-      max_wait_for_frame);
+    TimeDelta max_wait_for_frame,
+    DecodeSynchronizer* decode_sync) {
+  switch (ParseFrameBufferFieldTrial()) {
+    case FrameBufferArm::kFrameBuffer3: {
+      auto scheduler =
+          std::make_unique<TaskQueueFrameDecodeScheduler>(clock, worker_queue);
+      return std::make_unique<FrameBuffer3Proxy>(
+          clock, worker_queue, timing, stats_proxy, decode_queue, receiver,
+          max_wait_for_keyframe, max_wait_for_frame, std::move(scheduler));
+    }
+    case FrameBufferArm::kSyncDecode: {
+      std::unique_ptr<FrameDecodeScheduler> scheduler;
+      if (decode_sync) {
+        scheduler = decode_sync->CreateSynchronizedFrameScheduler();
+      } else {
+        RTC_LOG(LS_ERROR) << "In FrameBuffer with sync decode trial, but "
+                             "no DecodeSynchronizer was present!";
+        // Crash in debug, but in production use the task queue scheduler.
+        RTC_DCHECK_NOTREACHED();
+        scheduler = std::make_unique<TaskQueueFrameDecodeScheduler>(
+            clock, worker_queue);
+      }
+      return std::make_unique<FrameBuffer3Proxy>(
+          clock, worker_queue, timing, stats_proxy, decode_queue, receiver,
+          max_wait_for_keyframe, max_wait_for_frame, std::move(scheduler));
+    }
+    case FrameBufferArm::kFrameBuffer2:
+      ABSL_FALLTHROUGH_INTENDED;
+    default:
+      return std::make_unique<FrameBuffer2Proxy>(
+          clock, timing, stats_proxy, decode_queue, receiver,
+          max_wait_for_keyframe, max_wait_for_frame);
+  }
 }
 
 }  // namespace webrtc
diff --git a/video/frame_buffer_proxy.h b/video/frame_buffer_proxy.h
index cab0bd2..b419aed 100644
--- a/video/frame_buffer_proxy.h
+++ b/video/frame_buffer_proxy.h
@@ -13,12 +13,15 @@
 
 #include <memory>
 
+#include "api/metronome/metronome.h"
 #include "api/task_queue/task_queue_base.h"
 #include "api/video/encoded_frame.h"
 #include "modules/video_coding/include/video_coding_defines.h"
 #include "modules/video_coding/timing.h"
 #include "rtc_base/task_queue.h"
 #include "system_wrappers/include/clock.h"
+#include "video/decode_synchronizer.h"
+
 namespace webrtc {
 
 class FrameSchedulingReceiver {
@@ -43,7 +46,8 @@
       rtc::TaskQueue* decode_queue,
       FrameSchedulingReceiver* receiver,
       TimeDelta max_wait_for_keyframe,
-      TimeDelta max_wait_for_frame);
+      TimeDelta max_wait_for_frame,
+      DecodeSynchronizer* decode_sync);
   virtual ~FrameBufferProxy() = default;
 
   // Run on the worker thread.
diff --git a/video/frame_buffer_proxy_unittest.cc b/video/frame_buffer_proxy_unittest.cc
index 408fac9..fc266d2 100644
--- a/video/frame_buffer_proxy_unittest.cc
+++ b/video/frame_buffer_proxy_unittest.cc
@@ -20,6 +20,7 @@
 
 #include "absl/types/optional.h"
 #include "absl/types/variant.h"
+#include "api/metronome/test/fake_metronome.h"
 #include "api/units/frequency.h"
 #include "api/units/time_delta.h"
 #include "api/units/timestamp.h"
@@ -32,6 +33,7 @@
 #include "test/gtest.h"
 #include "test/run_loop.h"
 #include "test/time_controller/simulated_time_controller.h"
+#include "video/decode_synchronizer.h"
 
 using ::testing::_;
 using ::testing::AllOf;
@@ -209,6 +211,9 @@
         decode_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
             "decode_queue",
             TaskQueueFactory::Priority::NORMAL)),
+        fake_metronome_(time_controller_.GetTaskQueueFactory(),
+                        TimeDelta::Millis(16)),
+        decode_sync_(clock_, &fake_metronome_, run_loop_.task_queue()),
         timing_(clock_),
         proxy_(FrameBufferProxy::CreateFromFieldTrial(clock_,
                                                       run_loop_.task_queue(),
@@ -217,7 +222,8 @@
                                                       &decode_queue_,
                                                       this,
                                                       kMaxWaitForKeyframe,
-                                                      kMaxWaitForFrame)) {
+                                                      kMaxWaitForFrame,
+                                                      &decode_sync_)) {
     // Avoid starting with negative render times.
     timing_.set_min_playout_delay(10);
 
@@ -230,6 +236,8 @@
     if (proxy_) {
       proxy_->StopOnWorker();
     }
+    fake_metronome_.Stop();
+    time_controller_.AdvanceTime(TimeDelta::Zero());
   }
 
   void OnEncodedFrame(std::unique_ptr<EncodedFrame> frame) override {
@@ -291,7 +299,10 @@
   Clock* const clock_;
   test::RunLoop run_loop_;
   rtc::TaskQueue decode_queue_;
+  test::FakeMetronome fake_metronome_;
+  DecodeSynchronizer decode_sync_;
   VCMTiming timing_;
+
   ::testing::NiceMock<VCMReceiveStatisticsCallbackMock> stats_callback_;
   std::unique_ptr<FrameBufferProxy> proxy_;
 
@@ -544,7 +555,7 @@
   proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build());
   EXPECT_THAT(WaitForFrameOrTimeout(TimeDelta::Zero()), Frame(WithId(0)));
 
-  time_controller_.AdvanceTime(kFps30Delay);
+  time_controller_.AdvanceTime(kFps30Delay / 2);
   proxy_->InsertFrame(
       Builder().Id(1).Time(kFps30Rtp).Refs({0}).AsLast().Build());
   StartNextDecode();
@@ -598,6 +609,8 @@
   StartNextDecode();
 
   EXPECT_THAT(WaitForFrameOrTimeout(kFps30Delay), Frame(WithId(33)));
+  StartNextDecode();
+  EXPECT_THAT(WaitForFrameOrTimeout(kMaxWaitForFrame), TimedOut());
   EXPECT_EQ(dropped_frames(), 1);
 }
 
@@ -617,6 +630,10 @@
   time_controller_.AdvanceTime(TimeDelta::Zero());
 }
 
+// Note: This test takes a long time to run if the fake metronome is active.
+// Since the test needs to wait for the timestamp to rollover, it has a fake
+// delay of around 6.5 hours. Even though time is simulated, this will be
+// around 1,500,000 metronome tick invocations.
 TEST_P(FrameBufferProxyTest, NextFrameWithOldTimestamp) {
   // Test inserting 31 frames and pause the stream for a long time before
   // frame 32.
@@ -668,16 +685,19 @@
                           .AsLast()
                           .Build());
   // FrameBuffer2 drops the frame, while FrameBuffer3 will continue the stream.
-  if (field_trial::IsEnabled("WebRTC-FrameBuffer3")) {
+  if (field_trial::FindFullName("WebRTC-FrameBuffer3")
+          .find("arm:FrameBuffer2") == std::string::npos) {
     EXPECT_THAT(WaitForFrameOrTimeout(kFps30Delay), Frame(WithId(2)));
   } else {
     EXPECT_THAT(WaitForFrameOrTimeout(kMaxWaitForFrame), TimedOut());
   }
 }
 
-INSTANTIATE_TEST_SUITE_P(FrameBufferProxy,
-                         FrameBufferProxyTest,
-                         ::testing::Values("WebRTC-FrameBuffer3/Disabled/",
-                                           "WebRTC-FrameBuffer3/Enabled/"));
+INSTANTIATE_TEST_SUITE_P(
+    FrameBufferProxy,
+    FrameBufferProxyTest,
+    ::testing::Values("WebRTC-FrameBuffer3/arm:FrameBuffer2/",
+                      "WebRTC-FrameBuffer3/arm:FrameBuffer3/",
+                      "WebRTC-FrameBuffer3/arm:SyncDecoding/"));
 
 }  // namespace webrtc
diff --git a/video/frame_decode_scheduler.cc b/video/frame_decode_scheduler.cc
deleted file mode 100644
index 5696e10..0000000
--- a/video/frame_decode_scheduler.cc
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
- *
- *  Use of this source code is governed by a BSD-style license
- *  that can be found in the LICENSE file in the root of the source
- *  tree. An additional intellectual property rights grant can be found
- *  in the file PATENTS.  All contributing project authors may
- *  be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "video/frame_decode_scheduler.h"
-
-#include <algorithm>
-#include <utility>
-
-#include "api/sequence_checker.h"
-#include "rtc_base/task_utils/to_queued_task.h"
-
-namespace webrtc {
-
-FrameDecodeScheduler::FrameDecodeScheduler(
-    Clock* clock,
-    TaskQueueBase* const bookkeeping_queue,
-    FrameReleaseCallback callback)
-    : clock_(clock),
-      bookkeeping_queue_(bookkeeping_queue),
-      callback_(std::move(callback)) {
-  RTC_DCHECK(clock_);
-  RTC_DCHECK(bookkeeping_queue_);
-}
-
-FrameDecodeScheduler::~FrameDecodeScheduler() {
-  RTC_DCHECK(!scheduled_rtp_) << "Outstanding scheduled rtp=" << *scheduled_rtp_
-                              << ". Call CancelOutstanding before destruction.";
-}
-
-void FrameDecodeScheduler::ScheduleFrame(
-    uint32_t rtp,
-    FrameDecodeTiming::FrameSchedule schedule) {
-  RTC_DCHECK(!scheduled_rtp_.has_value())
-      << "Can not schedule two frames for release at the same time.";
-  scheduled_rtp_ = rtp;
-
-  TimeDelta wait = std::max(TimeDelta::Zero(),
-                            schedule.max_decode_time - clock_->CurrentTime());
-  bookkeeping_queue_->PostDelayedTask(
-      ToQueuedTask(task_safety_.flag(),
-                   [this, rtp, schedule] {
-                     RTC_DCHECK_RUN_ON(bookkeeping_queue_);
-                     // If the next frame rtp  has changed since this task was
-                     // this scheduled  release should be skipped.
-                     if (scheduled_rtp_ != rtp)
-                       return;
-                     scheduled_rtp_ = absl::nullopt;
-                     callback_(rtp, schedule.render_time);
-                   }),
-      wait.ms());
-}
-
-void FrameDecodeScheduler::CancelOutstanding() {
-  scheduled_rtp_ = absl::nullopt;
-}
-
-}  // namespace webrtc
diff --git a/video/frame_decode_scheduler.h b/video/frame_decode_scheduler.h
index 6e1c1bd..5387e54 100644
--- a/video/frame_decode_scheduler.h
+++ b/video/frame_decode_scheduler.h
@@ -16,10 +16,7 @@
 #include <functional>
 
 #include "absl/types/optional.h"
-#include "api/task_queue/task_queue_base.h"
 #include "api/units/timestamp.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
-#include "system_wrappers/include/clock.h"
 #include "video/frame_decode_timing.h"
 
 namespace webrtc {
@@ -30,25 +27,23 @@
   using FrameReleaseCallback =
       std::function<void(uint32_t rtp_timestamp, Timestamp render_time)>;
 
-  FrameDecodeScheduler(Clock* clock,
-                       TaskQueueBase* const bookkeeping_queue,
-                       FrameReleaseCallback callback);
-  ~FrameDecodeScheduler();
-  FrameDecodeScheduler(const FrameDecodeScheduler&) = delete;
-  FrameDecodeScheduler& operator=(const FrameDecodeScheduler&) = delete;
+  virtual ~FrameDecodeScheduler() = default;
 
-  absl::optional<uint32_t> scheduled_rtp() const { return scheduled_rtp_; }
+  // Returns the rtp timestamp of the next frame scheduled for release, or
+  // `nullopt` if no frame is currently scheduled.
+  virtual absl::optional<uint32_t> ScheduledRtpTimestamp() = 0;
 
-  void ScheduleFrame(uint32_t rtp, FrameDecodeTiming::FrameSchedule schedule);
-  void CancelOutstanding();
+  // Shedules a frame for release based on `schedule`. When released, `callback`
+  // will be invoked with the `rtp` timestamp of the frame and the `render_time`
+  virtual void ScheduleFrame(uint32_t rtp,
+                             FrameDecodeTiming::FrameSchedule schedule,
+                             FrameReleaseCallback callback) = 0;
 
- private:
-  Clock* const clock_;
-  TaskQueueBase* const bookkeeping_queue_;
-  const FrameReleaseCallback callback_;
+  // Cancels all scheduled frames.
+  virtual void CancelOutstanding() = 0;
 
-  absl::optional<uint32_t> scheduled_rtp_;
-  ScopedTaskSafetyDetached task_safety_;
+  // Stop() Must be called before destruction.
+  virtual void Stop() = 0;
 };
 
 }  // namespace webrtc
diff --git a/video/frame_decode_timing.cc b/video/frame_decode_timing.cc
index 7150bbc..02567ba 100644
--- a/video/frame_decode_timing.cc
+++ b/video/frame_decode_timing.cc
@@ -15,12 +15,6 @@
 
 namespace webrtc {
 
-namespace {
-
-constexpr TimeDelta kMaxAllowedFrameDelay = TimeDelta::Millis(5);
-
-}
-
 FrameDecodeTiming::FrameDecodeTiming(Clock* clock,
                                      webrtc::VCMTiming const* timing)
     : clock_(clock), timing_(timing) {
@@ -51,7 +45,7 @@
   RTC_DLOG(LS_VERBOSE) << "Selected frame with rtp " << next_temporal_unit_rtp
                        << " render time " << render_time.ms()
                        << " with a max wait of " << max_wait.ms() << "ms";
-  return FrameSchedule{.max_decode_time = now + max_wait,
+  return FrameSchedule{.latest_decode_time = now + max_wait,
                        .render_time = render_time};
 }
 
diff --git a/video/frame_decode_timing.h b/video/frame_decode_timing.h
index 8c7353e..ff67ace 100644
--- a/video/frame_decode_timing.h
+++ b/video/frame_decode_timing.h
@@ -29,8 +29,12 @@
   FrameDecodeTiming(const FrameDecodeTiming&) = delete;
   FrameDecodeTiming& operator=(const FrameDecodeTiming&) = delete;
 
+  // Any frame that has decode delay more than this in the past can be
+  // fast-forwarded.
+  static constexpr TimeDelta kMaxAllowedFrameDelay = TimeDelta::Millis(5);
+
   struct FrameSchedule {
-    Timestamp max_decode_time;
+    Timestamp latest_decode_time;
     Timestamp render_time;
   };
 
diff --git a/video/frame_decode_timing_unittest.cc b/video/frame_decode_timing_unittest.cc
index ec77ed4..1932e85 100644
--- a/video/frame_decode_timing_unittest.cc
+++ b/video/frame_decode_timing_unittest.cc
@@ -81,10 +81,11 @@
 
   EXPECT_THAT(
       frame_decode_scheduler_.OnFrameBufferUpdated(90000, 180000, false),
-      Optional(AllOf(Field(&FrameDecodeTiming::FrameSchedule::max_decode_time,
-                           Eq(clock_.CurrentTime() + decode_delay)),
-                     Field(&FrameDecodeTiming::FrameSchedule::render_time,
-                           Eq(render_time)))));
+      Optional(
+          AllOf(Field(&FrameDecodeTiming::FrameSchedule::latest_decode_time,
+                      Eq(clock_.CurrentTime() + decode_delay)),
+                Field(&FrameDecodeTiming::FrameSchedule::render_time,
+                      Eq(render_time)))));
 }
 
 TEST_F(FrameDecodeTimingTest, FastForwardsFrameTooFarInThePast) {
@@ -102,12 +103,12 @@
   const Timestamp render_time = clock_.CurrentTime();
   timing_.SetTimes(90000, render_time, decode_delay);
 
-  EXPECT_THAT(
-      frame_decode_scheduler_.OnFrameBufferUpdated(90000, 90000, false),
-      Optional(AllOf(Field(&FrameDecodeTiming::FrameSchedule::max_decode_time,
-                           Eq(clock_.CurrentTime() + decode_delay)),
-                     Field(&FrameDecodeTiming::FrameSchedule::render_time,
-                           Eq(render_time)))));
+  EXPECT_THAT(frame_decode_scheduler_.OnFrameBufferUpdated(90000, 90000, false),
+              Optional(AllOf(
+                  Field(&FrameDecodeTiming::FrameSchedule::latest_decode_time,
+                        Eq(clock_.CurrentTime() + decode_delay)),
+                  Field(&FrameDecodeTiming::FrameSchedule::render_time,
+                        Eq(render_time)))));
 }
 
 }  // namespace webrtc
diff --git a/video/task_queue_frame_decode_scheduler.cc b/video/task_queue_frame_decode_scheduler.cc
new file mode 100644
index 0000000..72de3c3
--- /dev/null
+++ b/video/task_queue_frame_decode_scheduler.cc
@@ -0,0 +1,76 @@
+/*
+ *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "video/task_queue_frame_decode_scheduler.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "api/sequence_checker.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+
+namespace webrtc {
+
+TaskQueueFrameDecodeScheduler::TaskQueueFrameDecodeScheduler(
+    Clock* clock,
+    TaskQueueBase* const bookkeeping_queue)
+    : clock_(clock), bookkeeping_queue_(bookkeeping_queue) {
+  RTC_DCHECK(clock_);
+  RTC_DCHECK(bookkeeping_queue_);
+}
+
+TaskQueueFrameDecodeScheduler::~TaskQueueFrameDecodeScheduler() {
+  RTC_DCHECK(stopped_);
+  RTC_DCHECK(!scheduled_rtp_) << "Outstanding scheduled rtp=" << *scheduled_rtp_
+                              << ". Call CancelOutstanding before destruction.";
+}
+
+void TaskQueueFrameDecodeScheduler::ScheduleFrame(
+    uint32_t rtp,
+    FrameDecodeTiming::FrameSchedule schedule,
+    FrameReleaseCallback cb) {
+  RTC_DCHECK(!stopped_) << "Can not schedule frames after stopped.";
+  RTC_DCHECK(!scheduled_rtp_.has_value())
+      << "Can not schedule two frames for release at the same time.";
+  RTC_DCHECK(cb);
+  scheduled_rtp_ = rtp;
+
+  TimeDelta wait = std::max(
+      TimeDelta::Zero(), schedule.latest_decode_time - clock_->CurrentTime());
+  bookkeeping_queue_->PostDelayedTask(
+      ToQueuedTask(task_safety_.flag(),
+                   [this, rtp, schedule, cb = std::move(cb)] {
+                     RTC_DCHECK_RUN_ON(bookkeeping_queue_);
+                     // If the next frame rtp  has changed since this task was
+                     // this scheduled  release should be skipped.
+                     if (scheduled_rtp_ != rtp)
+                       return;
+                     scheduled_rtp_ = absl::nullopt;
+                     cb(rtp, schedule.render_time);
+                   }),
+      wait.ms());
+}
+
+void TaskQueueFrameDecodeScheduler::CancelOutstanding() {
+  scheduled_rtp_ = absl::nullopt;
+}
+
+absl::optional<uint32_t>
+TaskQueueFrameDecodeScheduler::ScheduledRtpTimestamp() {
+  return scheduled_rtp_;
+}
+
+void TaskQueueFrameDecodeScheduler::Stop() {
+  CancelOutstanding();
+  stopped_ = true;
+}
+
+}  // namespace webrtc
diff --git a/video/task_queue_frame_decode_scheduler.h b/video/task_queue_frame_decode_scheduler.h
new file mode 100644
index 0000000..69c6dae
--- /dev/null
+++ b/video/task_queue_frame_decode_scheduler.h
@@ -0,0 +1,48 @@
+/*
+ *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_
+#define VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_
+
+#include "video/frame_decode_scheduler.h"
+
+namespace webrtc {
+
+// An implementation of FrameDecodeScheduler that is based on TaskQueues. This
+// is the default implementation for general use.
+class TaskQueueFrameDecodeScheduler : public FrameDecodeScheduler {
+ public:
+  TaskQueueFrameDecodeScheduler(Clock* clock,
+                                TaskQueueBase* const bookkeeping_queue);
+  ~TaskQueueFrameDecodeScheduler() override;
+  TaskQueueFrameDecodeScheduler(const TaskQueueFrameDecodeScheduler&) = delete;
+  TaskQueueFrameDecodeScheduler& operator=(
+      const TaskQueueFrameDecodeScheduler&) = delete;
+
+  // FrameDecodeScheduler implementation.
+  absl::optional<uint32_t> ScheduledRtpTimestamp() override;
+  void ScheduleFrame(uint32_t rtp,
+                     FrameDecodeTiming::FrameSchedule schedule,
+                     FrameReleaseCallback cb) override;
+  void CancelOutstanding() override;
+  void Stop() override;
+
+ private:
+  Clock* const clock_;
+  TaskQueueBase* const bookkeeping_queue_;
+
+  absl::optional<uint32_t> scheduled_rtp_;
+  ScopedTaskSafetyDetached task_safety_;
+  bool stopped_ = false;
+};
+
+}  // namespace webrtc
+
+#endif  // VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_
diff --git a/video/frame_decode_scheduler_unittest.cc b/video/task_queue_frame_decode_scheduler_unittest.cc
similarity index 60%
rename from video/frame_decode_scheduler_unittest.cc
rename to video/task_queue_frame_decode_scheduler_unittest.cc
index e30863c..2807e65 100644
--- a/video/frame_decode_scheduler_unittest.cc
+++ b/video/task_queue_frame_decode_scheduler_unittest.cc
@@ -8,13 +8,14 @@
  *  be found in the AUTHORS file in the root of the source tree.
  */
 
-#include "video/frame_decode_scheduler.h"
+#include "video/task_queue_frame_decode_scheduler.h"
 
 #include <stddef.h>
 
 #include <memory>
 #include <utility>
 
+#include "absl/functional/bind_front.h"
 #include "absl/types/optional.h"
 #include "api/units/time_delta.h"
 #include "api/units/timestamp.h"
@@ -28,29 +29,31 @@
 using ::testing::Eq;
 using ::testing::Optional;
 
-class FrameDecodeSchedulerTest : public ::testing::Test {
+class TaskQueueFrameDecodeSchedulerTest : public ::testing::Test {
  public:
-  FrameDecodeSchedulerTest()
+  TaskQueueFrameDecodeSchedulerTest()
       : time_controller_(Timestamp::Millis(2000)),
         task_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
             "scheduler",
             TaskQueueFactory::Priority::NORMAL)),
-        scheduler_(std::make_unique<FrameDecodeScheduler>(
+        scheduler_(std::make_unique<TaskQueueFrameDecodeScheduler>(
             time_controller_.GetClock(),
-            task_queue_.Get(),
-            [this](uint32_t rtp, Timestamp render_time) {
-              OnFrame(rtp, render_time);
-            })) {}
+            task_queue_.Get())) {}
 
-  ~FrameDecodeSchedulerTest() override {
+  ~TaskQueueFrameDecodeSchedulerTest() override {
     if (scheduler_) {
       OnQueue([&] {
-        scheduler_->CancelOutstanding();
+        scheduler_->Stop();
         scheduler_ = nullptr;
       });
     }
   }
 
+  void FrameReadyForDecode(uint32_t timestamp, Timestamp render_time) {
+    last_rtp_ = timestamp;
+    last_render_time_ = render_time;
+  }
+
  protected:
   template <class Task>
   void OnQueue(Task&& t) {
@@ -63,23 +66,21 @@
   std::unique_ptr<FrameDecodeScheduler> scheduler_;
   absl::optional<uint32_t> last_rtp_;
   absl::optional<Timestamp> last_render_time_;
-
- private:
-  void OnFrame(uint32_t timestamp, Timestamp render_time) {
-    last_rtp_ = timestamp;
-    last_render_time_ = render_time;
-  }
 };
 
-TEST_F(FrameDecodeSchedulerTest, FrameYieldedAfterSpecifiedPeriod) {
+TEST_F(TaskQueueFrameDecodeSchedulerTest, FrameYieldedAfterSpecifiedPeriod) {
   constexpr TimeDelta decode_delay = TimeDelta::Millis(5);
   const Timestamp now = time_controller_.GetClock()->CurrentTime();
   const uint32_t rtp = 90000;
   const Timestamp render_time = now + TimeDelta::Millis(15);
+  FrameDecodeTiming::FrameSchedule schedule = {
+      .latest_decode_time = now + decode_delay, .render_time = render_time};
   OnQueue([&] {
-    scheduler_->ScheduleFrame(rtp, {.max_decode_time = now + decode_delay,
-                                    .render_time = render_time});
-    EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
+    scheduler_->ScheduleFrame(
+        rtp, schedule,
+        absl::bind_front(
+            &TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this));
+    EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
   });
   EXPECT_THAT(last_rtp_, Eq(absl::nullopt));
 
@@ -88,35 +89,43 @@
   EXPECT_THAT(last_render_time_, Optional(render_time));
 }
 
-TEST_F(FrameDecodeSchedulerTest, NegativeDecodeDelayIsRoundedToZero) {
+TEST_F(TaskQueueFrameDecodeSchedulerTest, NegativeDecodeDelayIsRoundedToZero) {
   constexpr TimeDelta decode_delay = TimeDelta::Millis(-5);
   const Timestamp now = time_controller_.GetClock()->CurrentTime();
   const uint32_t rtp = 90000;
   const Timestamp render_time = now + TimeDelta::Millis(15);
+  FrameDecodeTiming::FrameSchedule schedule = {
+      .latest_decode_time = now + decode_delay, .render_time = render_time};
   OnQueue([&] {
-    scheduler_->ScheduleFrame(rtp, {.max_decode_time = now + decode_delay,
-                                    .render_time = render_time});
-    EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
+    scheduler_->ScheduleFrame(
+        rtp, schedule,
+        absl::bind_front(
+            &TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this));
+    EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
   });
   EXPECT_THAT(last_rtp_, Optional(rtp));
   EXPECT_THAT(last_render_time_, Optional(render_time));
 }
 
-TEST_F(FrameDecodeSchedulerTest, CancelOutstanding) {
+TEST_F(TaskQueueFrameDecodeSchedulerTest, CancelOutstanding) {
   constexpr TimeDelta decode_delay = TimeDelta::Millis(50);
   const Timestamp now = time_controller_.GetClock()->CurrentTime();
   const uint32_t rtp = 90000;
+  FrameDecodeTiming::FrameSchedule schedule = {
+      .latest_decode_time = now + decode_delay,
+      .render_time = now + TimeDelta::Millis(75)};
   OnQueue([&] {
-    scheduler_->ScheduleFrame(rtp,
-                              {.max_decode_time = now + decode_delay,
-                               .render_time = now + TimeDelta::Millis(75)});
-    EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
+    scheduler_->ScheduleFrame(
+        rtp, schedule,
+        absl::bind_front(
+            &TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this));
+    EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
   });
   time_controller_.AdvanceTime(decode_delay / 2);
   OnQueue([&] {
-    EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
+    EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
     scheduler_->CancelOutstanding();
-    EXPECT_THAT(scheduler_->scheduled_rtp(), Eq(absl::nullopt));
+    EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Eq(absl::nullopt));
   });
   time_controller_.AdvanceTime(decode_delay / 2);
   EXPECT_THAT(last_rtp_, Eq(absl::nullopt));
diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc
index 918413f..ef9692e 100644
--- a/video/video_receive_stream2.cc
+++ b/video/video_receive_stream2.cc
@@ -272,7 +272,7 @@
   frame_buffer_ = FrameBufferProxy::CreateFromFieldTrial(
       clock_, call_->worker_thread(), timing_.get(), &stats_proxy_,
       &decode_queue_, this, TimeDelta::Millis(max_wait_for_keyframe_ms_),
-      TimeDelta::Millis(max_wait_for_frame_ms_));
+      TimeDelta::Millis(max_wait_for_frame_ms_), nullptr);
 
   if (config_.rtp.rtx_ssrc) {
     rtx_receive_stream_ = std::make_unique<RtxReceiveStream>(