Introduce Sync-Decoding based on Metronome
Adds new class DecodeSynchronizer that will coalesce the decoding
of received streams on the metronome. This feature is experimental and
is backed by a field trial WebRTC-FrameBuffer3.
This experiment now has 3 arms to it,
"WebRTC-FrameBuffer3/arm:FrameBuffer2/": Default, uses old frame buffer.
"WebRTC-FrameBuffer3/arm:FrameBuffer3/": Uses new frame buffer.
"WebRTC-FrameBuffer3/arm:SyncDecoding/": Uses new frame buffer with
frame scheduled on the metronome.
The SyncDecoding arm will not work until it is wired up in the follow-up
CL.
This change also makes the following modifications,
* Adds FakeMetronome utilities for tests using a metronome.
* Makes FrameDecodeScheduler an interface. The default implementation is
TaskQueueFrameDecodeScheduler.
* FrameDecodeScheduler now has a Stop() method, which must be called
before destruction.
TBR=philipel@webrtc.org
Change-Id: I58a306bb883604b0be3eb2a04b3d07dbdf185c71
Bug: webrtc:13658
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/250665
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Reviewed-by: Stefan Holmer <holmer@google.com>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Commit-Queue: Evan Shrubsole <eshr@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35988}
diff --git a/api/DEPS b/api/DEPS
index ce88d34..ff493bf 100644
--- a/api/DEPS
+++ b/api/DEPS
@@ -255,6 +255,13 @@
"+rtc_base/ref_counted_object.h",
],
+ "fake_metronome\.h": [
+ "+rtc_base/synchronization/mutex.h",
+ "+rtc_base/task_queue.h",
+ "+rtc_base/task_utils/repeating_task.h",
+ "+rtc_base/thread_annotations.h",
+ ],
+
"mock.*\.h": [
"+test/gmock.h",
],
diff --git a/api/metronome/test/BUILD.gn b/api/metronome/test/BUILD.gn
new file mode 100644
index 0000000..d25d5a8
--- /dev/null
+++ b/api/metronome/test/BUILD.gn
@@ -0,0 +1,30 @@
+# Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+#
+# Use of this source code is governed by a BSD-style license
+# that can be found in the LICENSE file in the root of the source
+# tree. An additional intellectual property rights grant can be found
+# in the file PATENTS. All contributing project authors may
+# be found in the AUTHORS file in the root of the source tree.
+
+import("../../../webrtc.gni")
+
+rtc_library("fake_metronome") {
+ testonly = true
+ sources = [
+ "fake_metronome.cc",
+ "fake_metronome.h",
+ ]
+ deps = [
+ "..:metronome",
+ "../..:priority",
+ "../..:sequence_checker",
+ "../../../rtc_base:macromagic",
+ "../../../rtc_base:rtc_event",
+ "../../../rtc_base:rtc_task_queue",
+ "../../../rtc_base/synchronization:mutex",
+ "../../../rtc_base/task_utils:repeating_task",
+ "../../../rtc_base/task_utils:to_queued_task",
+ "../../task_queue",
+ "../../units:time_delta",
+ ]
+}
diff --git a/api/metronome/test/fake_metronome.cc b/api/metronome/test/fake_metronome.cc
new file mode 100644
index 0000000..83b5ea7
--- /dev/null
+++ b/api/metronome/test/fake_metronome.cc
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "api/metronome/test/fake_metronome.h"
+
+#include "api/priority.h"
+#include "api/sequence_checker.h"
+#include "api/task_queue/task_queue_factory.h"
+#include "api/units/time_delta.h"
+#include "rtc_base/event.h"
+#include "rtc_base/task_utils/repeating_task.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+
+namespace webrtc::test {
+
+ForcedTickMetronome::ForcedTickMetronome(TimeDelta tick_period)
+ : tick_period_(tick_period) {}
+
+void ForcedTickMetronome::AddListener(TickListener* listener) {
+ listeners_.insert(listener);
+}
+
+void ForcedTickMetronome::RemoveListener(TickListener* listener) {
+ listeners_.erase(listener);
+}
+
+TimeDelta ForcedTickMetronome::TickPeriod() const {
+ return tick_period_;
+}
+
+size_t ForcedTickMetronome::NumListeners() {
+ return listeners_.size();
+}
+
+void ForcedTickMetronome::Tick() {
+ for (auto* listener : listeners_) {
+ listener->OnTickTaskQueue()->PostTask(
+ ToQueuedTask([listener] { listener->OnTick(); }));
+ }
+}
+
+FakeMetronome::FakeMetronome(TaskQueueFactory* factory, TimeDelta tick_period)
+ : tick_period_(tick_period),
+ queue_(factory->CreateTaskQueue("MetronomeQueue",
+ TaskQueueFactory::Priority::HIGH)) {}
+
+FakeMetronome::~FakeMetronome() {
+ RTC_DCHECK(listeners_.empty());
+}
+
+void FakeMetronome::AddListener(TickListener* listener) {
+ MutexLock lock(&mutex_);
+ listeners_.insert(listener);
+ if (!started_) {
+ tick_task_ = RepeatingTaskHandle::Start(queue_.Get(), [this] {
+ MutexLock lock(&mutex_);
+ // Stop if empty.
+ if (listeners_.empty())
+ return TimeDelta::PlusInfinity();
+ for (auto* listener : listeners_) {
+ listener->OnTickTaskQueue()->PostTask(
+ ToQueuedTask([listener] { listener->OnTick(); }));
+ }
+ return tick_period_;
+ });
+ started_ = true;
+ }
+}
+
+void FakeMetronome::RemoveListener(TickListener* listener) {
+ MutexLock lock(&mutex_);
+ listeners_.erase(listener);
+}
+
+void FakeMetronome::Stop() {
+ MutexLock lock(&mutex_);
+ RTC_DCHECK(listeners_.empty());
+ if (started_)
+ queue_.PostTask([this] { tick_task_.Stop(); });
+}
+
+TimeDelta FakeMetronome::TickPeriod() const {
+ return tick_period_;
+}
+
+} // namespace webrtc::test
diff --git a/api/metronome/test/fake_metronome.h b/api/metronome/test/fake_metronome.h
new file mode 100644
index 0000000..28a79e0
--- /dev/null
+++ b/api/metronome/test/fake_metronome.h
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef API_METRONOME_TEST_FAKE_METRONOME_H_
+#define API_METRONOME_TEST_FAKE_METRONOME_H_
+
+#include <memory>
+#include <set>
+
+#include "api/metronome/metronome.h"
+#include "api/task_queue/task_queue_base.h"
+#include "api/task_queue/task_queue_factory.h"
+#include "api/units/time_delta.h"
+#include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/task_queue.h"
+#include "rtc_base/task_utils/repeating_task.h"
+#include "rtc_base/thread_annotations.h"
+
+namespace webrtc::test {
+
+// ForcedTickMetronome is a Metronome that ticks when `Tick()` is invoked.
+// The constructor argument `tick_period` returned in `TickPeriod()`.
+class ForcedTickMetronome : public Metronome {
+ public:
+ explicit ForcedTickMetronome(TimeDelta tick_period);
+
+ // Forces all TickListeners to run `OnTick`.
+ void Tick();
+ size_t NumListeners();
+
+ // Metronome implementation.
+ void AddListener(TickListener* listener) override;
+ void RemoveListener(TickListener* listener) override;
+ TimeDelta TickPeriod() const override;
+
+ private:
+ const TimeDelta tick_period_;
+ std::set<TickListener*> listeners_;
+};
+
+// FakeMetronome is a metronome that ticks based on a repeating task at the
+// `tick_period` provided in the constructor. It is designed for use with
+// simulated task queues for unit tests.
+//
+// `Stop()` must be called before destruction, as it cancels the metronome tick
+// on the proper task queue.
+class FakeMetronome : public Metronome {
+ public:
+ FakeMetronome(TaskQueueFactory* factory, TimeDelta tick_period);
+ ~FakeMetronome() override;
+
+ // Metronome implementation.
+ void AddListener(TickListener* listener) override;
+ void RemoveListener(TickListener* listener) override;
+ TimeDelta TickPeriod() const override;
+
+ void Stop();
+
+ private:
+ const TimeDelta tick_period_;
+ RepeatingTaskHandle tick_task_;
+ bool started_ RTC_GUARDED_BY(mutex_) = false;
+ std::set<TickListener*> listeners_ RTC_GUARDED_BY(mutex_);
+ Mutex mutex_;
+ rtc::TaskQueue queue_;
+};
+
+} // namespace webrtc::test
+
+#endif // API_METRONOME_TEST_FAKE_METRONOME_H_
diff --git a/video/BUILD.gn b/video/BUILD.gn
index b89e5e4..1a247f4 100644
--- a/video/BUILD.gn
+++ b/video/BUILD.gn
@@ -301,10 +301,12 @@
"frame_buffer_proxy.h",
]
deps = [
- ":frame_decode_scheduler",
+ ":decode_synchronizer",
":frame_decode_timing",
+ ":task_queue_frame_decode_scheduler",
":video_receive_stream_timeout_tracker",
"../api:sequence_checker",
+ "../api/metronome",
"../api/task_queue",
"../api/video:encoded_frame",
"../modules/video_coding",
@@ -312,6 +314,7 @@
"../modules/video_coding:frame_helpers",
"../modules/video_coding:timing",
"../modules/video_coding:video_codec_interface",
+ "../rtc_base:checks",
"../rtc_base:logging",
"../rtc_base:macromagic",
"../rtc_base:rtc_task_queue",
@@ -324,16 +327,27 @@
]
}
-rtc_library("frame_decode_scheduler") {
+rtc_source_set("frame_decode_scheduler") {
+ sources = [ "frame_decode_scheduler.h" ]
+ deps = [
+ ":frame_decode_timing",
+ "../api/units:timestamp",
+ ]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+}
+
+rtc_library("task_queue_frame_decode_scheduler") {
sources = [
- "frame_decode_scheduler.cc",
- "frame_decode_scheduler.h",
+ "task_queue_frame_decode_scheduler.cc",
+ "task_queue_frame_decode_scheduler.h",
]
deps = [
+ ":frame_decode_scheduler",
":frame_decode_timing",
"../api:sequence_checker",
"../api/task_queue",
"../api/units:timestamp",
+ "../rtc_base:checks",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:to_queued_task",
"../system_wrappers",
@@ -369,6 +383,26 @@
]
}
+rtc_library("decode_synchronizer") {
+ sources = [
+ "decode_synchronizer.cc",
+ "decode_synchronizer.h",
+ ]
+ deps = [
+ ":frame_decode_scheduler",
+ ":frame_decode_timing",
+ "../api:sequence_checker",
+ "../api/metronome",
+ "../api/task_queue",
+ "../api/units:time_delta",
+ "../api/units:timestamp",
+ "../rtc_base:checks",
+ "../rtc_base:logging",
+ "../rtc_base:macromagic",
+ ]
+ absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+}
+
rtc_library("video_stream_encoder_impl") {
visibility = [ "*" ]
@@ -702,6 +736,7 @@
"buffered_frame_decryptor_unittest.cc",
"call_stats2_unittest.cc",
"cpu_scaling_tests.cc",
+ "decode_synchronizer_unittest.cc",
"encoder_bitrate_adjuster_unittest.cc",
"encoder_overshoot_detector_unittest.cc",
"encoder_rtcp_feedback_unittest.cc",
@@ -726,7 +761,6 @@
"end_to_end_tests/transport_feedback_tests.cc",
"frame_buffer_proxy_unittest.cc",
"frame_cadence_adapter_unittest.cc",
- "frame_decode_scheduler_unittest.cc",
"frame_decode_timing_unittest.cc",
"frame_encode_metadata_writer_unittest.cc",
"picture_id_tests.cc",
@@ -741,6 +775,7 @@
"send_statistics_proxy_unittest.cc",
"stats_counter_unittest.cc",
"stream_synchronization_unittest.cc",
+ "task_queue_frame_decode_scheduler_unittest.cc",
"video_receive_stream2_unittest.cc",
"video_receive_stream_timeout_tracker_unittest.cc",
"video_send_stream_impl_unittest.cc",
@@ -750,10 +785,12 @@
"video_stream_encoder_unittest.cc",
]
deps = [
+ ":decode_synchronizer",
":frame_buffer_proxy",
":frame_cadence_adapter",
":frame_decode_scheduler",
":frame_decode_timing",
+ ":task_queue_frame_decode_scheduler",
":video",
":video_mocks",
":video_receive_stream_timeout_tracker",
@@ -777,6 +814,7 @@
"../api:transport_api",
"../api/adaptation:resource_adaptation_api",
"../api/crypto:options",
+ "../api/metronome/test:fake_metronome",
"../api/rtc_event_log",
"../api/task_queue",
"../api/task_queue:default_task_queue_factory",
@@ -870,6 +908,7 @@
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
+ "//third_party/abseil-cpp/absl/functional:bind_front",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/types:optional",
"//third_party/abseil-cpp/absl/types:variant",
diff --git a/video/decode_synchronizer.cc b/video/decode_synchronizer.cc
new file mode 100644
index 0000000..9f22c49
--- /dev/null
+++ b/video/decode_synchronizer.cc
@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "video/decode_synchronizer.h"
+
+#include <iterator>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "api/sequence_checker.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
+#include "video/frame_decode_scheduler.h"
+#include "video/frame_decode_timing.h"
+
+namespace webrtc {
+
+DecodeSynchronizer::ScheduledFrame::ScheduledFrame(
+ uint32_t rtp_timestamp,
+ FrameDecodeTiming::FrameSchedule schedule,
+ FrameDecodeScheduler::FrameReleaseCallback callback)
+ : rtp_timestamp_(rtp_timestamp),
+ schedule_(std::move(schedule)),
+ callback_(std::move(callback)) {}
+
+void DecodeSynchronizer::ScheduledFrame::RunFrameReleaseCallback() && {
+ // Inspiration from Chromium base::OnceCallback. Move `*this` to a local
+ // before execution to ensure internal state is cleared after callback
+ // execution.
+ auto sf = std::move(*this);
+ sf.callback_(sf.rtp_timestamp_, sf.schedule_.render_time);
+}
+
+Timestamp DecodeSynchronizer::ScheduledFrame::LatestDecodeTime() const {
+ return schedule_.latest_decode_time;
+}
+
+DecodeSynchronizer::SynchronizedFrameDecodeScheduler::
+ SynchronizedFrameDecodeScheduler(DecodeSynchronizer* sync)
+ : sync_(sync) {
+ RTC_DCHECK(sync_);
+}
+
+DecodeSynchronizer::SynchronizedFrameDecodeScheduler::
+ ~SynchronizedFrameDecodeScheduler() {
+ RTC_DCHECK(!next_frame_);
+ RTC_DCHECK(stopped_);
+}
+
+absl::optional<uint32_t>
+DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ScheduledRtpTimestamp() {
+ return next_frame_.has_value()
+ ? absl::make_optional(next_frame_->rtp_timestamp())
+ : absl::nullopt;
+}
+
+DecodeSynchronizer::ScheduledFrame
+DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ReleaseNextFrame() {
+ RTC_DCHECK(next_frame_);
+ auto res = std::move(*next_frame_);
+ next_frame_.reset();
+ return res;
+}
+
+Timestamp
+DecodeSynchronizer::SynchronizedFrameDecodeScheduler::LatestDecodeTime() {
+ RTC_DCHECK(next_frame_);
+ return next_frame_->LatestDecodeTime();
+}
+
+void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::ScheduleFrame(
+ uint32_t rtp,
+ FrameDecodeTiming::FrameSchedule schedule,
+ FrameReleaseCallback cb) {
+ RTC_DCHECK(!next_frame_) << "Can not schedule two frames at once.";
+ next_frame_ = ScheduledFrame(rtp, std::move(schedule), std::move(cb));
+ sync_->OnFrameScheduled(this);
+}
+
+void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::CancelOutstanding() {
+ next_frame_.reset();
+}
+
+void DecodeSynchronizer::SynchronizedFrameDecodeScheduler::Stop() {
+ CancelOutstanding();
+ stopped_ = true;
+ sync_->RemoveFrameScheduler(this);
+}
+
+DecodeSynchronizer::DecodeSynchronizer(Clock* clock,
+ Metronome* metronome,
+ TaskQueueBase* worker_queue)
+ : clock_(clock), worker_queue_(worker_queue), metronome_(metronome) {
+ RTC_DCHECK(metronome_);
+ RTC_DCHECK(worker_queue_);
+}
+
+DecodeSynchronizer::~DecodeSynchronizer() {
+ RTC_DCHECK(schedulers_.empty());
+}
+
+std::unique_ptr<FrameDecodeScheduler>
+DecodeSynchronizer::CreateSynchronizedFrameScheduler() {
+ RTC_DCHECK_RUN_ON(worker_queue_);
+ auto scheduler = std::make_unique<SynchronizedFrameDecodeScheduler>(this);
+ auto [it, inserted] = schedulers_.emplace(scheduler.get());
+ // If this is the first `scheduler` added, start listening to the metronome.
+ if (inserted && schedulers_.size() == 1) {
+ RTC_DLOG(LS_VERBOSE) << "Listening to metronome";
+ metronome_->AddListener(this);
+ }
+
+ return std::move(scheduler);
+}
+
+void DecodeSynchronizer::OnFrameScheduled(
+ SynchronizedFrameDecodeScheduler* scheduler) {
+ RTC_DCHECK_RUN_ON(worker_queue_);
+ RTC_DCHECK(scheduler->ScheduledRtpTimestamp());
+
+ Timestamp now = clock_->CurrentTime();
+ Timestamp next_tick = expected_next_tick_;
+ // If no tick has registered yet assume it will occur in the tick period.
+ if (next_tick.IsInfinite()) {
+ next_tick = now + metronome_->TickPeriod();
+ }
+
+ // Release the frame right away if the decode time is too soon. Otherwise
+ // the stream may fall behind too much.
+ bool decode_before_next_tick =
+ scheduler->LatestDecodeTime() <
+ (next_tick - FrameDecodeTiming::kMaxAllowedFrameDelay);
+ // Decode immediately if the decode time is in the past.
+ bool decode_time_in_past = scheduler->LatestDecodeTime() < now;
+
+ if (decode_before_next_tick || decode_time_in_past) {
+ ScheduledFrame scheduled_frame = scheduler->ReleaseNextFrame();
+ std::move(scheduled_frame).RunFrameReleaseCallback();
+ }
+}
+
+void DecodeSynchronizer::RemoveFrameScheduler(
+ SynchronizedFrameDecodeScheduler* scheduler) {
+ RTC_DCHECK_RUN_ON(worker_queue_);
+ RTC_DCHECK(scheduler);
+ auto it = schedulers_.find(scheduler);
+ if (it == schedulers_.end()) {
+ return;
+ }
+ schedulers_.erase(it);
+ // If there are no more schedulers active, stop listening for metronome ticks.
+ if (schedulers_.empty()) {
+ RTC_DLOG(LS_VERBOSE) << "Not listening to metronome";
+ metronome_->RemoveListener(this);
+ expected_next_tick_ = Timestamp::PlusInfinity();
+ }
+}
+
+void DecodeSynchronizer::OnTick() {
+ RTC_DCHECK_RUN_ON(worker_queue_);
+ expected_next_tick_ = clock_->CurrentTime() + metronome_->TickPeriod();
+
+ for (auto* scheduler : schedulers_) {
+ if (scheduler->ScheduledRtpTimestamp() &&
+ scheduler->LatestDecodeTime() < expected_next_tick_) {
+ auto scheduled_frame = scheduler->ReleaseNextFrame();
+ std::move(scheduled_frame).RunFrameReleaseCallback();
+ }
+ }
+}
+
+TaskQueueBase* DecodeSynchronizer::OnTickTaskQueue() {
+ return worker_queue_;
+}
+
+} // namespace webrtc
diff --git a/video/decode_synchronizer.h b/video/decode_synchronizer.h
new file mode 100644
index 0000000..bcbde4f
--- /dev/null
+++ b/video/decode_synchronizer.h
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef VIDEO_DECODE_SYNCHRONIZER_H_
+#define VIDEO_DECODE_SYNCHRONIZER_H_
+
+#include <stdint.h>
+
+#include <functional>
+#include <memory>
+#include <set>
+#include <utility>
+
+#include "absl/types/optional.h"
+#include "api/metronome/metronome.h"
+#include "api/sequence_checker.h"
+#include "api/task_queue/task_queue_base.h"
+#include "api/units/timestamp.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/thread_annotations.h"
+#include "video/frame_decode_scheduler.h"
+#include "video/frame_decode_timing.h"
+
+namespace webrtc {
+
+// DecodeSynchronizer synchronizes the frame scheduling by coalescing decoding
+// on the metronome.
+//
+// A video receive stream can use the DecodeSynchronizer by receiving a
+// FrameDecodeScheduler instance with `CreateSynchronizedFrameScheduler()`.
+// This instance implements FrameDecodeScheduler and can be used as a normal
+// scheduler. This instance is owned by the receive stream, and is borrowed by
+// the DecodeSynchronizer. The DecodeSynchronizer will stop borrowing the
+// instance when `FrameDecodeScheduler::Stop()` is called, after which the
+// scheduler may be destroyed by the receive stream.
+//
+// When a frame is scheduled for decode by a receive stream using the
+// DecodeSynchronizer, it will instead be executed on the metronome during the
+// tick interval where `max_decode_time` occurs. For example, if a frame is
+// scheduled for decode in 50ms and the tick interval is 20ms, then the frame
+// will be released for decoding in 2 ticks. See below for illustation,
+//
+// In the case where the decode time is in the past, or must occur before the
+// next metronome tick then the frame will be released right away, allowing a
+// delayed stream to catch up quickly.
+//
+// DecodeSynchronizer is single threaded - all method calls must run on the
+// `worker_queue_`.
+class DecodeSynchronizer : private Metronome::TickListener {
+ public:
+ DecodeSynchronizer(Clock* clock,
+ Metronome* metronome,
+ TaskQueueBase* worker_queue);
+ ~DecodeSynchronizer() override;
+ DecodeSynchronizer(const DecodeSynchronizer&) = delete;
+ DecodeSynchronizer& operator=(const DecodeSynchronizer&) = delete;
+
+ std::unique_ptr<FrameDecodeScheduler> CreateSynchronizedFrameScheduler();
+
+ private:
+ class ScheduledFrame {
+ public:
+ ScheduledFrame(uint32_t rtp_timestamp,
+ FrameDecodeTiming::FrameSchedule schedule,
+ FrameDecodeScheduler::FrameReleaseCallback callback);
+
+ // Disallow copy since `callback` should only be moved.
+ ScheduledFrame(const ScheduledFrame&) = delete;
+ ScheduledFrame& operator=(const ScheduledFrame&) = delete;
+ ScheduledFrame(ScheduledFrame&&) = default;
+ ScheduledFrame& operator=(ScheduledFrame&&) = default;
+
+ // Executes `callback_`.
+ void RunFrameReleaseCallback() &&;
+
+ uint32_t rtp_timestamp() const { return rtp_timestamp_; }
+ Timestamp LatestDecodeTime() const;
+
+ private:
+ uint32_t rtp_timestamp_;
+ FrameDecodeTiming::FrameSchedule schedule_;
+ FrameDecodeScheduler::FrameReleaseCallback callback_;
+ };
+
+ class SynchronizedFrameDecodeScheduler : public FrameDecodeScheduler {
+ public:
+ explicit SynchronizedFrameDecodeScheduler(DecodeSynchronizer* sync);
+ ~SynchronizedFrameDecodeScheduler() override;
+
+ // Releases the outstanding frame for decoding. This invalidates
+ // `next_frame_`. There must be a frame scheduled.
+ ScheduledFrame ReleaseNextFrame();
+
+ // Returns `next_frame_.schedule.max_decode_time`. There must be a frame
+ // scheduled when this is called.
+ Timestamp LatestDecodeTime();
+
+ // FrameDecodeScheduler implementation.
+ absl::optional<uint32_t> ScheduledRtpTimestamp() override;
+ void ScheduleFrame(uint32_t rtp,
+ FrameDecodeTiming::FrameSchedule schedule,
+ FrameReleaseCallback cb) override;
+ void CancelOutstanding() override;
+ void Stop() override;
+
+ private:
+ DecodeSynchronizer* sync_;
+ absl::optional<ScheduledFrame> next_frame_;
+ bool stopped_ = false;
+ };
+
+ void OnFrameScheduled(SynchronizedFrameDecodeScheduler* scheduler);
+ void RemoveFrameScheduler(SynchronizedFrameDecodeScheduler* scheduler);
+
+ // Metronome::TickListener implementation.
+ void OnTick() override;
+ TaskQueueBase* OnTickTaskQueue() override;
+
+ Clock* const clock_;
+ TaskQueueBase* const worker_queue_;
+ Metronome* const metronome_;
+
+ Timestamp expected_next_tick_ = Timestamp::PlusInfinity();
+ std::set<SynchronizedFrameDecodeScheduler*> schedulers_
+ RTC_GUARDED_BY(worker_queue_);
+};
+
+} // namespace webrtc
+
+#endif // VIDEO_DECODE_SYNCHRONIZER_H_
diff --git a/video/decode_synchronizer_unittest.cc b/video/decode_synchronizer_unittest.cc
new file mode 100644
index 0000000..db9540f
--- /dev/null
+++ b/video/decode_synchronizer_unittest.cc
@@ -0,0 +1,232 @@
+/*
+ * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "video/decode_synchronizer.h"
+
+#include <stddef.h>
+
+#include <memory>
+#include <utility>
+
+#include "api/metronome/test/fake_metronome.h"
+#include "api/units/time_delta.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+#include "test/run_loop.h"
+#include "test/time_controller/simulated_time_controller.h"
+#include "video/frame_decode_scheduler.h"
+#include "video/frame_decode_timing.h"
+
+using ::testing::_;
+using ::testing::Eq;
+
+namespace webrtc {
+
+class DecodeSynchronizerTest : public ::testing::Test {
+ public:
+ static constexpr TimeDelta kTickPeriod = TimeDelta::Millis(33);
+
+ DecodeSynchronizerTest()
+ : time_controller_(Timestamp::Millis(1337)),
+ clock_(time_controller_.GetClock()),
+ metronome_(kTickPeriod),
+ decode_synchronizer_(clock_, &metronome_, run_loop_.task_queue()) {}
+
+ protected:
+ GlobalSimulatedTimeController time_controller_;
+ Clock* clock_;
+ test::RunLoop run_loop_;
+ test::ForcedTickMetronome metronome_;
+ DecodeSynchronizer decode_synchronizer_;
+};
+
+TEST_F(DecodeSynchronizerTest, AllFramesReadyBeforeNextTickDecoded) {
+ ::testing::MockFunction<void(uint32_t, Timestamp)> mock_callback1;
+ auto scheduler1 = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+ testing::MockFunction<void(unsigned int, Timestamp)> mock_callback2;
+ auto scheduler2 = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+ {
+ uint32_t frame_rtp = 90000;
+ FrameDecodeTiming::FrameSchedule frame_sched{
+ .latest_decode_time =
+ clock_->CurrentTime() + kTickPeriod - TimeDelta::Millis(3),
+ .render_time = clock_->CurrentTime() + TimeDelta::Millis(60)};
+ scheduler1->ScheduleFrame(frame_rtp, frame_sched,
+ mock_callback1.AsStdFunction());
+ EXPECT_CALL(mock_callback1,
+ Call(Eq(frame_rtp), Eq(frame_sched.render_time)));
+ }
+ {
+ uint32_t frame_rtp = 123456;
+ FrameDecodeTiming::FrameSchedule frame_sched{
+ .latest_decode_time =
+ clock_->CurrentTime() + kTickPeriod - TimeDelta::Millis(2),
+ .render_time = clock_->CurrentTime() + TimeDelta::Millis(70)};
+ scheduler2->ScheduleFrame(frame_rtp, frame_sched,
+ mock_callback2.AsStdFunction());
+ EXPECT_CALL(mock_callback2,
+ Call(Eq(frame_rtp), Eq(frame_sched.render_time)));
+ }
+ metronome_.Tick();
+ run_loop_.Flush();
+
+ // Cleanup
+ scheduler1->Stop();
+ scheduler2->Stop();
+}
+
+TEST_F(DecodeSynchronizerTest, FramesNotDecodedIfDecodeTimeIsInNextInterval) {
+ ::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
+ auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+ uint32_t frame_rtp = 90000;
+ FrameDecodeTiming::FrameSchedule frame_sched{
+ .latest_decode_time =
+ clock_->CurrentTime() + kTickPeriod + TimeDelta::Millis(10),
+ .render_time =
+ clock_->CurrentTime() + kTickPeriod + TimeDelta::Millis(30)};
+ scheduler->ScheduleFrame(frame_rtp, frame_sched,
+ mock_callback.AsStdFunction());
+
+ metronome_.Tick();
+ run_loop_.Flush();
+ // No decodes should have happened in this tick.
+ ::testing::Mock::VerifyAndClearExpectations(&mock_callback);
+
+ // Decode should happen on next tick.
+ EXPECT_CALL(mock_callback, Call(Eq(frame_rtp), Eq(frame_sched.render_time)));
+ time_controller_.AdvanceTime(kTickPeriod);
+ metronome_.Tick();
+ run_loop_.Flush();
+
+ // Cleanup
+ scheduler->Stop();
+}
+
+TEST_F(DecodeSynchronizerTest, FrameDecodedOnce) {
+ ::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
+ auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+ uint32_t frame_rtp = 90000;
+ FrameDecodeTiming::FrameSchedule frame_sched{
+ .latest_decode_time = clock_->CurrentTime() + TimeDelta::Millis(30),
+ .render_time = clock_->CurrentTime() + TimeDelta::Millis(60)};
+ scheduler->ScheduleFrame(frame_rtp, frame_sched,
+ mock_callback.AsStdFunction());
+ EXPECT_CALL(mock_callback, Call(_, _)).Times(1);
+ metronome_.Tick();
+ run_loop_.Flush();
+ ::testing::Mock::VerifyAndClearExpectations(&mock_callback);
+
+ // Trigger tick again. No frame should be decoded now.
+ time_controller_.AdvanceTime(kTickPeriod);
+ metronome_.Tick();
+ run_loop_.Flush();
+
+ // Cleanup
+ scheduler->Stop();
+}
+
+TEST_F(DecodeSynchronizerTest, FrameWithDecodeTimeInPastDecodedImmediately) {
+ ::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
+ auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+ uint32_t frame_rtp = 90000;
+ FrameDecodeTiming::FrameSchedule frame_sched{
+ .latest_decode_time = clock_->CurrentTime() - TimeDelta::Millis(5),
+ .render_time = clock_->CurrentTime() + TimeDelta::Millis(30)};
+ EXPECT_CALL(mock_callback, Call(Eq(90000u), _)).Times(1);
+ scheduler->ScheduleFrame(frame_rtp, frame_sched,
+ mock_callback.AsStdFunction());
+ // Verify the callback was invoked already.
+ ::testing::Mock::VerifyAndClearExpectations(&mock_callback);
+
+ metronome_.Tick();
+ run_loop_.Flush();
+
+ // Cleanup
+ scheduler->Stop();
+}
+
+TEST_F(DecodeSynchronizerTest,
+ FrameWithDecodeTimeFarBeforeNextTickDecodedImmediately) {
+ ::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
+ auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+ // Frame which would be behind by more than kMaxAllowedFrameDelay after
+ // the next tick.
+ FrameDecodeTiming::FrameSchedule frame_sched{
+ .latest_decode_time = clock_->CurrentTime() + kTickPeriod -
+ FrameDecodeTiming::kMaxAllowedFrameDelay -
+ TimeDelta::Millis(1),
+ .render_time = clock_->CurrentTime() + TimeDelta::Millis(30)};
+ EXPECT_CALL(mock_callback, Call(Eq(90000u), _)).Times(1);
+ scheduler->ScheduleFrame(90000, frame_sched, mock_callback.AsStdFunction());
+ // Verify the callback was invoked already.
+ ::testing::Mock::VerifyAndClearExpectations(&mock_callback);
+
+ time_controller_.AdvanceTime(kTickPeriod);
+ metronome_.Tick();
+ run_loop_.Flush();
+
+ // A frame that would be behind by exactly kMaxAllowedFrameDelay after next
+ // tick should decode at the next tick.
+ FrameDecodeTiming::FrameSchedule queued_frame{
+ .latest_decode_time = clock_->CurrentTime() + kTickPeriod -
+ FrameDecodeTiming::kMaxAllowedFrameDelay,
+ .render_time = clock_->CurrentTime() + TimeDelta::Millis(30)};
+ scheduler->ScheduleFrame(180000, queued_frame, mock_callback.AsStdFunction());
+ // Verify the callback was invoked already.
+ ::testing::Mock::VerifyAndClearExpectations(&mock_callback);
+
+ EXPECT_CALL(mock_callback, Call(Eq(180000u), _)).Times(1);
+ time_controller_.AdvanceTime(kTickPeriod);
+ metronome_.Tick();
+ run_loop_.Flush();
+
+ // Cleanup
+ scheduler->Stop();
+}
+
+TEST_F(DecodeSynchronizerTest, FramesNotReleasedAfterStop) {
+ ::testing::MockFunction<void(unsigned int, Timestamp)> mock_callback;
+ auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+
+ uint32_t frame_rtp = 90000;
+ FrameDecodeTiming::FrameSchedule frame_sched{
+ .latest_decode_time = clock_->CurrentTime() + TimeDelta::Millis(30),
+ .render_time = clock_->CurrentTime() + TimeDelta::Millis(60)};
+ scheduler->ScheduleFrame(frame_rtp, frame_sched,
+ mock_callback.AsStdFunction());
+ // Cleanup
+ scheduler->Stop();
+
+ // No callback should occur on this tick since Stop() was called before.
+ metronome_.Tick();
+ run_loop_.Flush();
+}
+
+TEST_F(DecodeSynchronizerTest, MetronomeNotListenedWhenNoStreamsAreActive) {
+ EXPECT_EQ(0u, metronome_.NumListeners());
+
+ auto scheduler = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+ EXPECT_EQ(1u, metronome_.NumListeners());
+ auto scheduler2 = decode_synchronizer_.CreateSynchronizedFrameScheduler();
+ EXPECT_EQ(1u, metronome_.NumListeners());
+
+ scheduler->Stop();
+ EXPECT_EQ(1u, metronome_.NumListeners());
+ scheduler2->Stop();
+ EXPECT_EQ(0u, metronome_.NumListeners());
+}
+
+} // namespace webrtc
diff --git a/video/frame_buffer_proxy.cc b/video/frame_buffer_proxy.cc
index 3182bf9..d294eeb 100644
--- a/video/frame_buffer_proxy.cc
+++ b/video/frame_buffer_proxy.cc
@@ -14,20 +14,24 @@
#include <memory>
#include <utility>
+#include "absl/base/attributes.h"
#include "absl/functional/bind_front.h"
#include "api/sequence_checker.h"
#include "modules/video_coding/frame_buffer2.h"
#include "modules/video_coding/frame_buffer3.h"
#include "modules/video_coding/frame_helpers.h"
+#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/thread_annotations.h"
#include "system_wrappers/include/field_trial.h"
-#include "video/frame_decode_scheduler.h"
#include "video/frame_decode_timing.h"
+#include "video/task_queue_frame_decode_scheduler.h"
#include "video/video_receive_stream_timeout_tracker.h"
namespace webrtc {
+namespace {
+
class FrameBuffer2Proxy : public FrameBufferProxy {
public:
FrameBuffer2Proxy(Clock* clock,
@@ -140,14 +144,16 @@
// accounting are moved into this pro
class FrameBuffer3Proxy : public FrameBufferProxy {
public:
- FrameBuffer3Proxy(Clock* clock,
- TaskQueueBase* worker_queue,
- VCMTiming* timing,
- VCMReceiveStatisticsCallback* stats_proxy,
- rtc::TaskQueue* decode_queue,
- FrameSchedulingReceiver* receiver,
- TimeDelta max_wait_for_keyframe,
- TimeDelta max_wait_for_frame)
+ FrameBuffer3Proxy(
+ Clock* clock,
+ TaskQueueBase* worker_queue,
+ VCMTiming* timing,
+ VCMReceiveStatisticsCallback* stats_proxy,
+ rtc::TaskQueue* decode_queue,
+ FrameSchedulingReceiver* receiver,
+ TimeDelta max_wait_for_keyframe,
+ TimeDelta max_wait_for_frame,
+ std::unique_ptr<FrameDecodeScheduler> frame_decode_scheduler)
: max_wait_for_keyframe_(max_wait_for_keyframe),
max_wait_for_frame_(max_wait_for_frame),
clock_(clock),
@@ -156,15 +162,11 @@
stats_proxy_(stats_proxy),
receiver_(receiver),
timing_(timing),
+ frame_decode_scheduler_(std::move(frame_decode_scheduler)),
jitter_estimator_(clock_),
inter_frame_delay_(clock_->TimeInMilliseconds()),
buffer_(std::make_unique<FrameBuffer>(kMaxFramesBuffered,
kMaxFramesHistory)),
- frame_decode_scheduler_(
- clock_,
- worker_queue,
- absl::bind_front(&FrameBuffer3Proxy::OnFrameReadyForExtraction,
- this)),
decode_timing_(clock_, timing_),
timeout_tracker_(clock_,
worker_queue_,
@@ -181,6 +183,7 @@
RTC_DCHECK(timing_);
RTC_DCHECK(worker_queue_);
RTC_DCHECK(clock_);
+ RTC_DCHECK(frame_decode_scheduler_);
RTC_LOG(LS_WARNING) << "Using FrameBuffer3";
ParseFieldTrial({&zero_playout_delay_max_decode_queue_size_},
@@ -190,8 +193,8 @@
// FrameBufferProxy implementation.
void StopOnWorker() override {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
+ frame_decode_scheduler_->Stop();
timeout_tracker_.Stop();
- frame_decode_scheduler_.CancelOutstanding();
decoder_ready_for_new_frame_ = false;
decode_queue_->PostTask([this] {
RTC_DCHECK_RUN_ON(decode_queue_);
@@ -209,7 +212,7 @@
stats_proxy_->OnDroppedFrames(buffer_->CurrentSize());
buffer_ =
std::make_unique<FrameBuffer>(kMaxFramesBuffered, kMaxFramesHistory);
- frame_decode_scheduler_.CancelOutstanding();
+ frame_decode_scheduler_->CancelOutstanding();
}
absl::optional<int64_t> InsertFrame(
@@ -342,8 +345,7 @@
}
private:
- void OnFrameReadyForExtraction(uint32_t rtp_timestamp,
- Timestamp render_time) {
+ void FrameReadyForDecode(uint32_t rtp_timestamp, Timestamp render_time) {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
RTC_DCHECK(buffer_->NextDecodableTemporalUnitRtpTimestamp() ==
rtp_timestamp)
@@ -430,7 +432,7 @@
auto last_rtp = *buffer_->LastDecodableTemporalUnitRtpTimestamp();
// If already scheduled then abort.
- if (frame_decode_scheduler_.scheduled_rtp() ==
+ if (frame_decode_scheduler_->ScheduledRtpTimestamp() ==
buffer_->NextDecodableTemporalUnitRtpTimestamp())
return;
@@ -441,9 +443,11 @@
IsTooManyFramesQueued());
if (schedule) {
// Don't schedule if already waiting for the same frame.
- if (frame_decode_scheduler_.scheduled_rtp() != next_rtp) {
- frame_decode_scheduler_.CancelOutstanding();
- frame_decode_scheduler_.ScheduleFrame(next_rtp, *schedule);
+ if (frame_decode_scheduler_->ScheduledRtpTimestamp() != next_rtp) {
+ frame_decode_scheduler_->CancelOutstanding();
+ frame_decode_scheduler_->ScheduleFrame(
+ next_rtp, *schedule,
+ absl::bind_front(&FrameBuffer3Proxy::FrameReadyForDecode, this));
}
return;
}
@@ -463,6 +467,8 @@
VCMReceiveStatisticsCallback* const stats_proxy_;
FrameSchedulingReceiver* const receiver_;
VCMTiming* const timing_;
+ const std::unique_ptr<FrameDecodeScheduler> frame_decode_scheduler_
+ RTC_GUARDED_BY(&worker_sequence_checker_);
VCMJitterEstimator jitter_estimator_
RTC_GUARDED_BY(&worker_sequence_checker_);
@@ -471,8 +477,6 @@
bool keyframe_required_ RTC_GUARDED_BY(&worker_sequence_checker_) = false;
std::unique_ptr<FrameBuffer> buffer_
RTC_GUARDED_BY(&worker_sequence_checker_);
- FrameDecodeScheduler frame_decode_scheduler_
- RTC_GUARDED_BY(&worker_sequence_checker_);
FrameDecodeTiming decode_timing_ RTC_GUARDED_BY(&worker_sequence_checker_);
VideoReceiveStreamTimeoutTracker timeout_tracker_
RTC_GUARDED_BY(&worker_sequence_checker_);
@@ -500,6 +504,28 @@
ScopedTaskSafety worker_safety_;
};
+enum class FrameBufferArm {
+ kFrameBuffer2,
+ kFrameBuffer3,
+ kSyncDecode,
+};
+
+constexpr const char* kFrameBufferFieldTrial = "WebRTC-FrameBuffer3";
+
+FrameBufferArm ParseFrameBufferFieldTrial() {
+ webrtc::FieldTrialEnum<FrameBufferArm> arm(
+ "arm", FrameBufferArm::kFrameBuffer2,
+ {
+ {"FrameBuffer2", FrameBufferArm::kFrameBuffer2},
+ {"FrameBuffer3", FrameBufferArm::kFrameBuffer3},
+ {"SyncDecoding", FrameBufferArm::kSyncDecode},
+ });
+ ParseFieldTrial({&arm}, field_trial::FindFullName(kFrameBufferFieldTrial));
+ return arm.Get();
+}
+
+} // namespace
+
std::unique_ptr<FrameBufferProxy> FrameBufferProxy::CreateFromFieldTrial(
Clock* clock,
TaskQueueBase* worker_queue,
@@ -508,14 +534,39 @@
rtc::TaskQueue* decode_queue,
FrameSchedulingReceiver* receiver,
TimeDelta max_wait_for_keyframe,
- TimeDelta max_wait_for_frame) {
- if (field_trial::IsEnabled("WebRTC-FrameBuffer3"))
- return std::make_unique<FrameBuffer3Proxy>(
- clock, worker_queue, timing, stats_proxy, decode_queue, receiver,
- max_wait_for_keyframe, max_wait_for_frame);
- return std::make_unique<FrameBuffer2Proxy>(
- clock, timing, stats_proxy, decode_queue, receiver, max_wait_for_keyframe,
- max_wait_for_frame);
+ TimeDelta max_wait_for_frame,
+ DecodeSynchronizer* decode_sync) {
+ switch (ParseFrameBufferFieldTrial()) {
+ case FrameBufferArm::kFrameBuffer3: {
+ auto scheduler =
+ std::make_unique<TaskQueueFrameDecodeScheduler>(clock, worker_queue);
+ return std::make_unique<FrameBuffer3Proxy>(
+ clock, worker_queue, timing, stats_proxy, decode_queue, receiver,
+ max_wait_for_keyframe, max_wait_for_frame, std::move(scheduler));
+ }
+ case FrameBufferArm::kSyncDecode: {
+ std::unique_ptr<FrameDecodeScheduler> scheduler;
+ if (decode_sync) {
+ scheduler = decode_sync->CreateSynchronizedFrameScheduler();
+ } else {
+ RTC_LOG(LS_ERROR) << "In FrameBuffer with sync decode trial, but "
+ "no DecodeSynchronizer was present!";
+ // Crash in debug, but in production use the task queue scheduler.
+ RTC_DCHECK_NOTREACHED();
+ scheduler = std::make_unique<TaskQueueFrameDecodeScheduler>(
+ clock, worker_queue);
+ }
+ return std::make_unique<FrameBuffer3Proxy>(
+ clock, worker_queue, timing, stats_proxy, decode_queue, receiver,
+ max_wait_for_keyframe, max_wait_for_frame, std::move(scheduler));
+ }
+ case FrameBufferArm::kFrameBuffer2:
+ ABSL_FALLTHROUGH_INTENDED;
+ default:
+ return std::make_unique<FrameBuffer2Proxy>(
+ clock, timing, stats_proxy, decode_queue, receiver,
+ max_wait_for_keyframe, max_wait_for_frame);
+ }
}
} // namespace webrtc
diff --git a/video/frame_buffer_proxy.h b/video/frame_buffer_proxy.h
index cab0bd2..b419aed 100644
--- a/video/frame_buffer_proxy.h
+++ b/video/frame_buffer_proxy.h
@@ -13,12 +13,15 @@
#include <memory>
+#include "api/metronome/metronome.h"
#include "api/task_queue/task_queue_base.h"
#include "api/video/encoded_frame.h"
#include "modules/video_coding/include/video_coding_defines.h"
#include "modules/video_coding/timing.h"
#include "rtc_base/task_queue.h"
#include "system_wrappers/include/clock.h"
+#include "video/decode_synchronizer.h"
+
namespace webrtc {
class FrameSchedulingReceiver {
@@ -43,7 +46,8 @@
rtc::TaskQueue* decode_queue,
FrameSchedulingReceiver* receiver,
TimeDelta max_wait_for_keyframe,
- TimeDelta max_wait_for_frame);
+ TimeDelta max_wait_for_frame,
+ DecodeSynchronizer* decode_sync);
virtual ~FrameBufferProxy() = default;
// Run on the worker thread.
diff --git a/video/frame_buffer_proxy_unittest.cc b/video/frame_buffer_proxy_unittest.cc
index 408fac9..fc266d2 100644
--- a/video/frame_buffer_proxy_unittest.cc
+++ b/video/frame_buffer_proxy_unittest.cc
@@ -20,6 +20,7 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"
+#include "api/metronome/test/fake_metronome.h"
#include "api/units/frequency.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
@@ -32,6 +33,7 @@
#include "test/gtest.h"
#include "test/run_loop.h"
#include "test/time_controller/simulated_time_controller.h"
+#include "video/decode_synchronizer.h"
using ::testing::_;
using ::testing::AllOf;
@@ -209,6 +211,9 @@
decode_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
"decode_queue",
TaskQueueFactory::Priority::NORMAL)),
+ fake_metronome_(time_controller_.GetTaskQueueFactory(),
+ TimeDelta::Millis(16)),
+ decode_sync_(clock_, &fake_metronome_, run_loop_.task_queue()),
timing_(clock_),
proxy_(FrameBufferProxy::CreateFromFieldTrial(clock_,
run_loop_.task_queue(),
@@ -217,7 +222,8 @@
&decode_queue_,
this,
kMaxWaitForKeyframe,
- kMaxWaitForFrame)) {
+ kMaxWaitForFrame,
+ &decode_sync_)) {
// Avoid starting with negative render times.
timing_.set_min_playout_delay(10);
@@ -230,6 +236,8 @@
if (proxy_) {
proxy_->StopOnWorker();
}
+ fake_metronome_.Stop();
+ time_controller_.AdvanceTime(TimeDelta::Zero());
}
void OnEncodedFrame(std::unique_ptr<EncodedFrame> frame) override {
@@ -291,7 +299,10 @@
Clock* const clock_;
test::RunLoop run_loop_;
rtc::TaskQueue decode_queue_;
+ test::FakeMetronome fake_metronome_;
+ DecodeSynchronizer decode_sync_;
VCMTiming timing_;
+
::testing::NiceMock<VCMReceiveStatisticsCallbackMock> stats_callback_;
std::unique_ptr<FrameBufferProxy> proxy_;
@@ -544,7 +555,7 @@
proxy_->InsertFrame(Builder().Id(0).Time(0).AsLast().Build());
EXPECT_THAT(WaitForFrameOrTimeout(TimeDelta::Zero()), Frame(WithId(0)));
- time_controller_.AdvanceTime(kFps30Delay);
+ time_controller_.AdvanceTime(kFps30Delay / 2);
proxy_->InsertFrame(
Builder().Id(1).Time(kFps30Rtp).Refs({0}).AsLast().Build());
StartNextDecode();
@@ -598,6 +609,8 @@
StartNextDecode();
EXPECT_THAT(WaitForFrameOrTimeout(kFps30Delay), Frame(WithId(33)));
+ StartNextDecode();
+ EXPECT_THAT(WaitForFrameOrTimeout(kMaxWaitForFrame), TimedOut());
EXPECT_EQ(dropped_frames(), 1);
}
@@ -617,6 +630,10 @@
time_controller_.AdvanceTime(TimeDelta::Zero());
}
+// Note: This test takes a long time to run if the fake metronome is active.
+// Since the test needs to wait for the timestamp to rollover, it has a fake
+// delay of around 6.5 hours. Even though time is simulated, this will be
+// around 1,500,000 metronome tick invocations.
TEST_P(FrameBufferProxyTest, NextFrameWithOldTimestamp) {
// Test inserting 31 frames and pause the stream for a long time before
// frame 32.
@@ -668,16 +685,19 @@
.AsLast()
.Build());
// FrameBuffer2 drops the frame, while FrameBuffer3 will continue the stream.
- if (field_trial::IsEnabled("WebRTC-FrameBuffer3")) {
+ if (field_trial::FindFullName("WebRTC-FrameBuffer3")
+ .find("arm:FrameBuffer2") == std::string::npos) {
EXPECT_THAT(WaitForFrameOrTimeout(kFps30Delay), Frame(WithId(2)));
} else {
EXPECT_THAT(WaitForFrameOrTimeout(kMaxWaitForFrame), TimedOut());
}
}
-INSTANTIATE_TEST_SUITE_P(FrameBufferProxy,
- FrameBufferProxyTest,
- ::testing::Values("WebRTC-FrameBuffer3/Disabled/",
- "WebRTC-FrameBuffer3/Enabled/"));
+INSTANTIATE_TEST_SUITE_P(
+ FrameBufferProxy,
+ FrameBufferProxyTest,
+ ::testing::Values("WebRTC-FrameBuffer3/arm:FrameBuffer2/",
+ "WebRTC-FrameBuffer3/arm:FrameBuffer3/",
+ "WebRTC-FrameBuffer3/arm:SyncDecoding/"));
} // namespace webrtc
diff --git a/video/frame_decode_scheduler.cc b/video/frame_decode_scheduler.cc
deleted file mode 100644
index 5696e10..0000000
--- a/video/frame_decode_scheduler.cc
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "video/frame_decode_scheduler.h"
-
-#include <algorithm>
-#include <utility>
-
-#include "api/sequence_checker.h"
-#include "rtc_base/task_utils/to_queued_task.h"
-
-namespace webrtc {
-
-FrameDecodeScheduler::FrameDecodeScheduler(
- Clock* clock,
- TaskQueueBase* const bookkeeping_queue,
- FrameReleaseCallback callback)
- : clock_(clock),
- bookkeeping_queue_(bookkeeping_queue),
- callback_(std::move(callback)) {
- RTC_DCHECK(clock_);
- RTC_DCHECK(bookkeeping_queue_);
-}
-
-FrameDecodeScheduler::~FrameDecodeScheduler() {
- RTC_DCHECK(!scheduled_rtp_) << "Outstanding scheduled rtp=" << *scheduled_rtp_
- << ". Call CancelOutstanding before destruction.";
-}
-
-void FrameDecodeScheduler::ScheduleFrame(
- uint32_t rtp,
- FrameDecodeTiming::FrameSchedule schedule) {
- RTC_DCHECK(!scheduled_rtp_.has_value())
- << "Can not schedule two frames for release at the same time.";
- scheduled_rtp_ = rtp;
-
- TimeDelta wait = std::max(TimeDelta::Zero(),
- schedule.max_decode_time - clock_->CurrentTime());
- bookkeeping_queue_->PostDelayedTask(
- ToQueuedTask(task_safety_.flag(),
- [this, rtp, schedule] {
- RTC_DCHECK_RUN_ON(bookkeeping_queue_);
- // If the next frame rtp has changed since this task was
- // this scheduled release should be skipped.
- if (scheduled_rtp_ != rtp)
- return;
- scheduled_rtp_ = absl::nullopt;
- callback_(rtp, schedule.render_time);
- }),
- wait.ms());
-}
-
-void FrameDecodeScheduler::CancelOutstanding() {
- scheduled_rtp_ = absl::nullopt;
-}
-
-} // namespace webrtc
diff --git a/video/frame_decode_scheduler.h b/video/frame_decode_scheduler.h
index 6e1c1bd..5387e54 100644
--- a/video/frame_decode_scheduler.h
+++ b/video/frame_decode_scheduler.h
@@ -16,10 +16,7 @@
#include <functional>
#include "absl/types/optional.h"
-#include "api/task_queue/task_queue_base.h"
#include "api/units/timestamp.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
-#include "system_wrappers/include/clock.h"
#include "video/frame_decode_timing.h"
namespace webrtc {
@@ -30,25 +27,23 @@
using FrameReleaseCallback =
std::function<void(uint32_t rtp_timestamp, Timestamp render_time)>;
- FrameDecodeScheduler(Clock* clock,
- TaskQueueBase* const bookkeeping_queue,
- FrameReleaseCallback callback);
- ~FrameDecodeScheduler();
- FrameDecodeScheduler(const FrameDecodeScheduler&) = delete;
- FrameDecodeScheduler& operator=(const FrameDecodeScheduler&) = delete;
+ virtual ~FrameDecodeScheduler() = default;
- absl::optional<uint32_t> scheduled_rtp() const { return scheduled_rtp_; }
+ // Returns the rtp timestamp of the next frame scheduled for release, or
+ // `nullopt` if no frame is currently scheduled.
+ virtual absl::optional<uint32_t> ScheduledRtpTimestamp() = 0;
- void ScheduleFrame(uint32_t rtp, FrameDecodeTiming::FrameSchedule schedule);
- void CancelOutstanding();
+ // Shedules a frame for release based on `schedule`. When released, `callback`
+ // will be invoked with the `rtp` timestamp of the frame and the `render_time`
+ virtual void ScheduleFrame(uint32_t rtp,
+ FrameDecodeTiming::FrameSchedule schedule,
+ FrameReleaseCallback callback) = 0;
- private:
- Clock* const clock_;
- TaskQueueBase* const bookkeeping_queue_;
- const FrameReleaseCallback callback_;
+ // Cancels all scheduled frames.
+ virtual void CancelOutstanding() = 0;
- absl::optional<uint32_t> scheduled_rtp_;
- ScopedTaskSafetyDetached task_safety_;
+ // Stop() Must be called before destruction.
+ virtual void Stop() = 0;
};
} // namespace webrtc
diff --git a/video/frame_decode_timing.cc b/video/frame_decode_timing.cc
index 7150bbc..02567ba 100644
--- a/video/frame_decode_timing.cc
+++ b/video/frame_decode_timing.cc
@@ -15,12 +15,6 @@
namespace webrtc {
-namespace {
-
-constexpr TimeDelta kMaxAllowedFrameDelay = TimeDelta::Millis(5);
-
-}
-
FrameDecodeTiming::FrameDecodeTiming(Clock* clock,
webrtc::VCMTiming const* timing)
: clock_(clock), timing_(timing) {
@@ -51,7 +45,7 @@
RTC_DLOG(LS_VERBOSE) << "Selected frame with rtp " << next_temporal_unit_rtp
<< " render time " << render_time.ms()
<< " with a max wait of " << max_wait.ms() << "ms";
- return FrameSchedule{.max_decode_time = now + max_wait,
+ return FrameSchedule{.latest_decode_time = now + max_wait,
.render_time = render_time};
}
diff --git a/video/frame_decode_timing.h b/video/frame_decode_timing.h
index 8c7353e..ff67ace 100644
--- a/video/frame_decode_timing.h
+++ b/video/frame_decode_timing.h
@@ -29,8 +29,12 @@
FrameDecodeTiming(const FrameDecodeTiming&) = delete;
FrameDecodeTiming& operator=(const FrameDecodeTiming&) = delete;
+ // Any frame that has decode delay more than this in the past can be
+ // fast-forwarded.
+ static constexpr TimeDelta kMaxAllowedFrameDelay = TimeDelta::Millis(5);
+
struct FrameSchedule {
- Timestamp max_decode_time;
+ Timestamp latest_decode_time;
Timestamp render_time;
};
diff --git a/video/frame_decode_timing_unittest.cc b/video/frame_decode_timing_unittest.cc
index ec77ed4..1932e85 100644
--- a/video/frame_decode_timing_unittest.cc
+++ b/video/frame_decode_timing_unittest.cc
@@ -81,10 +81,11 @@
EXPECT_THAT(
frame_decode_scheduler_.OnFrameBufferUpdated(90000, 180000, false),
- Optional(AllOf(Field(&FrameDecodeTiming::FrameSchedule::max_decode_time,
- Eq(clock_.CurrentTime() + decode_delay)),
- Field(&FrameDecodeTiming::FrameSchedule::render_time,
- Eq(render_time)))));
+ Optional(
+ AllOf(Field(&FrameDecodeTiming::FrameSchedule::latest_decode_time,
+ Eq(clock_.CurrentTime() + decode_delay)),
+ Field(&FrameDecodeTiming::FrameSchedule::render_time,
+ Eq(render_time)))));
}
TEST_F(FrameDecodeTimingTest, FastForwardsFrameTooFarInThePast) {
@@ -102,12 +103,12 @@
const Timestamp render_time = clock_.CurrentTime();
timing_.SetTimes(90000, render_time, decode_delay);
- EXPECT_THAT(
- frame_decode_scheduler_.OnFrameBufferUpdated(90000, 90000, false),
- Optional(AllOf(Field(&FrameDecodeTiming::FrameSchedule::max_decode_time,
- Eq(clock_.CurrentTime() + decode_delay)),
- Field(&FrameDecodeTiming::FrameSchedule::render_time,
- Eq(render_time)))));
+ EXPECT_THAT(frame_decode_scheduler_.OnFrameBufferUpdated(90000, 90000, false),
+ Optional(AllOf(
+ Field(&FrameDecodeTiming::FrameSchedule::latest_decode_time,
+ Eq(clock_.CurrentTime() + decode_delay)),
+ Field(&FrameDecodeTiming::FrameSchedule::render_time,
+ Eq(render_time)))));
}
} // namespace webrtc
diff --git a/video/task_queue_frame_decode_scheduler.cc b/video/task_queue_frame_decode_scheduler.cc
new file mode 100644
index 0000000..72de3c3
--- /dev/null
+++ b/video/task_queue_frame_decode_scheduler.cc
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "video/task_queue_frame_decode_scheduler.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "api/sequence_checker.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+
+namespace webrtc {
+
+TaskQueueFrameDecodeScheduler::TaskQueueFrameDecodeScheduler(
+ Clock* clock,
+ TaskQueueBase* const bookkeeping_queue)
+ : clock_(clock), bookkeeping_queue_(bookkeeping_queue) {
+ RTC_DCHECK(clock_);
+ RTC_DCHECK(bookkeeping_queue_);
+}
+
+TaskQueueFrameDecodeScheduler::~TaskQueueFrameDecodeScheduler() {
+ RTC_DCHECK(stopped_);
+ RTC_DCHECK(!scheduled_rtp_) << "Outstanding scheduled rtp=" << *scheduled_rtp_
+ << ". Call CancelOutstanding before destruction.";
+}
+
+void TaskQueueFrameDecodeScheduler::ScheduleFrame(
+ uint32_t rtp,
+ FrameDecodeTiming::FrameSchedule schedule,
+ FrameReleaseCallback cb) {
+ RTC_DCHECK(!stopped_) << "Can not schedule frames after stopped.";
+ RTC_DCHECK(!scheduled_rtp_.has_value())
+ << "Can not schedule two frames for release at the same time.";
+ RTC_DCHECK(cb);
+ scheduled_rtp_ = rtp;
+
+ TimeDelta wait = std::max(
+ TimeDelta::Zero(), schedule.latest_decode_time - clock_->CurrentTime());
+ bookkeeping_queue_->PostDelayedTask(
+ ToQueuedTask(task_safety_.flag(),
+ [this, rtp, schedule, cb = std::move(cb)] {
+ RTC_DCHECK_RUN_ON(bookkeeping_queue_);
+ // If the next frame rtp has changed since this task was
+ // this scheduled release should be skipped.
+ if (scheduled_rtp_ != rtp)
+ return;
+ scheduled_rtp_ = absl::nullopt;
+ cb(rtp, schedule.render_time);
+ }),
+ wait.ms());
+}
+
+void TaskQueueFrameDecodeScheduler::CancelOutstanding() {
+ scheduled_rtp_ = absl::nullopt;
+}
+
+absl::optional<uint32_t>
+TaskQueueFrameDecodeScheduler::ScheduledRtpTimestamp() {
+ return scheduled_rtp_;
+}
+
+void TaskQueueFrameDecodeScheduler::Stop() {
+ CancelOutstanding();
+ stopped_ = true;
+}
+
+} // namespace webrtc
diff --git a/video/task_queue_frame_decode_scheduler.h b/video/task_queue_frame_decode_scheduler.h
new file mode 100644
index 0000000..69c6dae
--- /dev/null
+++ b/video/task_queue_frame_decode_scheduler.h
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_
+#define VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_
+
+#include "video/frame_decode_scheduler.h"
+
+namespace webrtc {
+
+// An implementation of FrameDecodeScheduler that is based on TaskQueues. This
+// is the default implementation for general use.
+class TaskQueueFrameDecodeScheduler : public FrameDecodeScheduler {
+ public:
+ TaskQueueFrameDecodeScheduler(Clock* clock,
+ TaskQueueBase* const bookkeeping_queue);
+ ~TaskQueueFrameDecodeScheduler() override;
+ TaskQueueFrameDecodeScheduler(const TaskQueueFrameDecodeScheduler&) = delete;
+ TaskQueueFrameDecodeScheduler& operator=(
+ const TaskQueueFrameDecodeScheduler&) = delete;
+
+ // FrameDecodeScheduler implementation.
+ absl::optional<uint32_t> ScheduledRtpTimestamp() override;
+ void ScheduleFrame(uint32_t rtp,
+ FrameDecodeTiming::FrameSchedule schedule,
+ FrameReleaseCallback cb) override;
+ void CancelOutstanding() override;
+ void Stop() override;
+
+ private:
+ Clock* const clock_;
+ TaskQueueBase* const bookkeeping_queue_;
+
+ absl::optional<uint32_t> scheduled_rtp_;
+ ScopedTaskSafetyDetached task_safety_;
+ bool stopped_ = false;
+};
+
+} // namespace webrtc
+
+#endif // VIDEO_TASK_QUEUE_FRAME_DECODE_SCHEDULER_H_
diff --git a/video/frame_decode_scheduler_unittest.cc b/video/task_queue_frame_decode_scheduler_unittest.cc
similarity index 60%
rename from video/frame_decode_scheduler_unittest.cc
rename to video/task_queue_frame_decode_scheduler_unittest.cc
index e30863c..2807e65 100644
--- a/video/frame_decode_scheduler_unittest.cc
+++ b/video/task_queue_frame_decode_scheduler_unittest.cc
@@ -8,13 +8,14 @@
* be found in the AUTHORS file in the root of the source tree.
*/
-#include "video/frame_decode_scheduler.h"
+#include "video/task_queue_frame_decode_scheduler.h"
#include <stddef.h>
#include <memory>
#include <utility>
+#include "absl/functional/bind_front.h"
#include "absl/types/optional.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
@@ -28,29 +29,31 @@
using ::testing::Eq;
using ::testing::Optional;
-class FrameDecodeSchedulerTest : public ::testing::Test {
+class TaskQueueFrameDecodeSchedulerTest : public ::testing::Test {
public:
- FrameDecodeSchedulerTest()
+ TaskQueueFrameDecodeSchedulerTest()
: time_controller_(Timestamp::Millis(2000)),
task_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
"scheduler",
TaskQueueFactory::Priority::NORMAL)),
- scheduler_(std::make_unique<FrameDecodeScheduler>(
+ scheduler_(std::make_unique<TaskQueueFrameDecodeScheduler>(
time_controller_.GetClock(),
- task_queue_.Get(),
- [this](uint32_t rtp, Timestamp render_time) {
- OnFrame(rtp, render_time);
- })) {}
+ task_queue_.Get())) {}
- ~FrameDecodeSchedulerTest() override {
+ ~TaskQueueFrameDecodeSchedulerTest() override {
if (scheduler_) {
OnQueue([&] {
- scheduler_->CancelOutstanding();
+ scheduler_->Stop();
scheduler_ = nullptr;
});
}
}
+ void FrameReadyForDecode(uint32_t timestamp, Timestamp render_time) {
+ last_rtp_ = timestamp;
+ last_render_time_ = render_time;
+ }
+
protected:
template <class Task>
void OnQueue(Task&& t) {
@@ -63,23 +66,21 @@
std::unique_ptr<FrameDecodeScheduler> scheduler_;
absl::optional<uint32_t> last_rtp_;
absl::optional<Timestamp> last_render_time_;
-
- private:
- void OnFrame(uint32_t timestamp, Timestamp render_time) {
- last_rtp_ = timestamp;
- last_render_time_ = render_time;
- }
};
-TEST_F(FrameDecodeSchedulerTest, FrameYieldedAfterSpecifiedPeriod) {
+TEST_F(TaskQueueFrameDecodeSchedulerTest, FrameYieldedAfterSpecifiedPeriod) {
constexpr TimeDelta decode_delay = TimeDelta::Millis(5);
const Timestamp now = time_controller_.GetClock()->CurrentTime();
const uint32_t rtp = 90000;
const Timestamp render_time = now + TimeDelta::Millis(15);
+ FrameDecodeTiming::FrameSchedule schedule = {
+ .latest_decode_time = now + decode_delay, .render_time = render_time};
OnQueue([&] {
- scheduler_->ScheduleFrame(rtp, {.max_decode_time = now + decode_delay,
- .render_time = render_time});
- EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
+ scheduler_->ScheduleFrame(
+ rtp, schedule,
+ absl::bind_front(
+ &TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this));
+ EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
});
EXPECT_THAT(last_rtp_, Eq(absl::nullopt));
@@ -88,35 +89,43 @@
EXPECT_THAT(last_render_time_, Optional(render_time));
}
-TEST_F(FrameDecodeSchedulerTest, NegativeDecodeDelayIsRoundedToZero) {
+TEST_F(TaskQueueFrameDecodeSchedulerTest, NegativeDecodeDelayIsRoundedToZero) {
constexpr TimeDelta decode_delay = TimeDelta::Millis(-5);
const Timestamp now = time_controller_.GetClock()->CurrentTime();
const uint32_t rtp = 90000;
const Timestamp render_time = now + TimeDelta::Millis(15);
+ FrameDecodeTiming::FrameSchedule schedule = {
+ .latest_decode_time = now + decode_delay, .render_time = render_time};
OnQueue([&] {
- scheduler_->ScheduleFrame(rtp, {.max_decode_time = now + decode_delay,
- .render_time = render_time});
- EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
+ scheduler_->ScheduleFrame(
+ rtp, schedule,
+ absl::bind_front(
+ &TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this));
+ EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
});
EXPECT_THAT(last_rtp_, Optional(rtp));
EXPECT_THAT(last_render_time_, Optional(render_time));
}
-TEST_F(FrameDecodeSchedulerTest, CancelOutstanding) {
+TEST_F(TaskQueueFrameDecodeSchedulerTest, CancelOutstanding) {
constexpr TimeDelta decode_delay = TimeDelta::Millis(50);
const Timestamp now = time_controller_.GetClock()->CurrentTime();
const uint32_t rtp = 90000;
+ FrameDecodeTiming::FrameSchedule schedule = {
+ .latest_decode_time = now + decode_delay,
+ .render_time = now + TimeDelta::Millis(75)};
OnQueue([&] {
- scheduler_->ScheduleFrame(rtp,
- {.max_decode_time = now + decode_delay,
- .render_time = now + TimeDelta::Millis(75)});
- EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
+ scheduler_->ScheduleFrame(
+ rtp, schedule,
+ absl::bind_front(
+ &TaskQueueFrameDecodeSchedulerTest::FrameReadyForDecode, this));
+ EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
});
time_controller_.AdvanceTime(decode_delay / 2);
OnQueue([&] {
- EXPECT_THAT(scheduler_->scheduled_rtp(), Optional(rtp));
+ EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Optional(rtp));
scheduler_->CancelOutstanding();
- EXPECT_THAT(scheduler_->scheduled_rtp(), Eq(absl::nullopt));
+ EXPECT_THAT(scheduler_->ScheduledRtpTimestamp(), Eq(absl::nullopt));
});
time_controller_.AdvanceTime(decode_delay / 2);
EXPECT_THAT(last_rtp_, Eq(absl::nullopt));
diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc
index 918413f..ef9692e 100644
--- a/video/video_receive_stream2.cc
+++ b/video/video_receive_stream2.cc
@@ -272,7 +272,7 @@
frame_buffer_ = FrameBufferProxy::CreateFromFieldTrial(
clock_, call_->worker_thread(), timing_.get(), &stats_proxy_,
&decode_queue_, this, TimeDelta::Millis(max_wait_for_keyframe_ms_),
- TimeDelta::Millis(max_wait_for_frame_ms_));
+ TimeDelta::Millis(max_wait_for_frame_ms_), nullptr);
if (config_.rtp.rtx_ssrc) {
rtx_receive_stream_ = std::make_unique<RtxReceiveStream>(