VideoStreamEncoder: expect frame entry on the encoder queue.
This change switches the sequence used by the FrameCadenceAdapter
to be the encoder_queue, enabling VideoStreamEncoder::OnFrame to be
invoked directly on the encoder_queue and eliminates the contained
PostTasks.
Bug: chromium:1255737
Change-Id: Ib86fc96ad2be9a38585fef2535855e3f9cc7e57c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/238171
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35380}
diff --git a/video/BUILD.gn b/video/BUILD.gn
index 4821699..84b2d98 100644
--- a/video/BUILD.gn
+++ b/video/BUILD.gn
@@ -275,6 +275,7 @@
"../rtc_base/synchronization:mutex",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:to_queued_task",
+ "../system_wrappers",
"../system_wrappers:field_trial",
"../system_wrappers:metrics",
]
diff --git a/video/frame_cadence_adapter.cc b/video/frame_cadence_adapter.cc
index 030814b..c82ab5a 100644
--- a/video/frame_cadence_adapter.cc
+++ b/video/frame_cadence_adapter.cc
@@ -10,6 +10,7 @@
#include "video/frame_cadence_adapter.h"
+#include <atomic>
#include <memory>
#include <utility>
@@ -20,6 +21,7 @@
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/to_queued_task.h"
+#include "system_wrappers/include/clock.h"
#include "system_wrappers/include/field_trial.h"
#include "system_wrappers/include/metrics.h"
@@ -28,7 +30,7 @@
class FrameCadenceAdapterImpl : public FrameCadenceAdapterInterface {
public:
- explicit FrameCadenceAdapterImpl(TaskQueueBase* worker_queue);
+ FrameCadenceAdapterImpl(Clock* clock, TaskQueueBase* queue);
// FrameCadenceAdapterInterface overrides.
void Initialize(Callback* callback) override;
@@ -42,12 +44,15 @@
private:
// Called from OnFrame in zero-hertz mode.
- void OnFrameOnMainQueue(const VideoFrame& frame) RTC_RUN_ON(worker_queue_);
+ void OnFrameOnMainQueue(Timestamp post_time,
+ int frames_scheduled_for_processing,
+ const VideoFrame& frame) RTC_RUN_ON(queue_);
// Called to report on constraint UMAs.
- void MaybeReportFrameRateConstraintUmas() RTC_RUN_ON(&worker_queue_);
+ void MaybeReportFrameRateConstraintUmas() RTC_RUN_ON(&queue_);
- TaskQueueBase* const worker_queue_;
+ Clock* const clock_;
+ TaskQueueBase* const queue_;
// True if we support frame entry for screenshare with a minimum frequency of
// 0 Hz.
@@ -58,34 +63,36 @@
// The source's constraints.
absl::optional<VideoTrackSourceConstraints> source_constraints_
- RTC_GUARDED_BY(worker_queue_);
+ RTC_GUARDED_BY(queue_);
// Whether zero-hertz and UMA reporting is enabled.
- bool zero_hertz_and_uma_reporting_enabled_ RTC_GUARDED_BY(worker_queue_) =
- false;
+ bool zero_hertz_and_uma_reporting_enabled_ RTC_GUARDED_BY(queue_) = false;
// Race checker for incoming frames. This is the network thread in chromium,
// but may vary from test contexts.
rtc::RaceChecker incoming_frame_race_checker_;
- bool has_reported_screenshare_frame_rate_umas_ RTC_GUARDED_BY(worker_queue_) =
- false;
+ bool has_reported_screenshare_frame_rate_umas_ RTC_GUARDED_BY(queue_) = false;
- ScopedTaskSafety safety_;
+ // Number of frames that are currently scheduled for processing on the
+ // |queue_|.
+ std::atomic<int> frames_scheduled_for_processing_{0};
+
+ ScopedTaskSafetyDetached safety_;
};
-FrameCadenceAdapterImpl::FrameCadenceAdapterImpl(TaskQueueBase* worker_queue)
- : worker_queue_(worker_queue),
+FrameCadenceAdapterImpl::FrameCadenceAdapterImpl(Clock* clock,
+ TaskQueueBase* queue)
+ : clock_(clock),
+ queue_(queue),
zero_hertz_screenshare_enabled_(
- field_trial::IsEnabled("WebRTC-ZeroHertzScreenshare")) {
- RTC_DCHECK_RUN_ON(worker_queue_);
-}
+ field_trial::IsEnabled("WebRTC-ZeroHertzScreenshare")) {}
void FrameCadenceAdapterImpl::Initialize(Callback* callback) {
callback_ = callback;
}
void FrameCadenceAdapterImpl::SetZeroHertzModeEnabled(bool enabled) {
- RTC_DCHECK_RUN_ON(worker_queue_);
+ RTC_DCHECK_RUN_ON(queue_);
if (enabled && !zero_hertz_and_uma_reporting_enabled_)
has_reported_screenshare_frame_rate_umas_ = false;
zero_hertz_and_uma_reporting_enabled_ = enabled;
@@ -95,9 +102,17 @@
// This method is called on the network thread under Chromium, or other
// various contexts in test.
RTC_DCHECK_RUNS_SERIALIZED(&incoming_frame_race_checker_);
- worker_queue_->PostTask(ToQueuedTask(safety_, [this, frame] {
- RTC_DCHECK_RUN_ON(worker_queue_);
- OnFrameOnMainQueue(std::move(frame));
+
+ // Local time in webrtc time base.
+ Timestamp post_time = clock_->CurrentTime();
+ frames_scheduled_for_processing_.fetch_add(1, std::memory_order_relaxed);
+ queue_->PostTask(ToQueuedTask(safety_.flag(), [this, post_time, frame] {
+ RTC_DCHECK_RUN_ON(queue_);
+ const int frames_scheduled_for_processing =
+ frames_scheduled_for_processing_.fetch_sub(1,
+ std::memory_order_relaxed);
+ OnFrameOnMainQueue(post_time, frames_scheduled_for_processing,
+ std::move(frame));
MaybeReportFrameRateConstraintUmas();
}));
}
@@ -107,18 +122,21 @@
RTC_LOG(LS_INFO) << __func__ << " min_fps "
<< constraints.min_fps.value_or(-1) << " max_fps "
<< constraints.max_fps.value_or(-1);
- worker_queue_->PostTask(ToQueuedTask(safety_, [this, constraints] {
- RTC_DCHECK_RUN_ON(worker_queue_);
+ queue_->PostTask(ToQueuedTask(safety_.flag(), [this, constraints] {
+ RTC_DCHECK_RUN_ON(queue_);
source_constraints_ = constraints;
}));
}
-// RTC_RUN_ON(worker_queue_)
-void FrameCadenceAdapterImpl::OnFrameOnMainQueue(const VideoFrame& frame) {
- callback_->OnFrame(frame);
+// RTC_RUN_ON(queue_)
+void FrameCadenceAdapterImpl::OnFrameOnMainQueue(
+ Timestamp post_time,
+ int frames_scheduled_for_processing,
+ const VideoFrame& frame) {
+ callback_->OnFrame(post_time, frames_scheduled_for_processing, frame);
}
-// RTC_RUN_ON(worker_queue_)
+// RTC_RUN_ON(queue_)
void FrameCadenceAdapterImpl::MaybeReportFrameRateConstraintUmas() {
if (has_reported_screenshare_frame_rate_umas_)
return;
@@ -175,8 +193,8 @@
} // namespace
std::unique_ptr<FrameCadenceAdapterInterface>
-FrameCadenceAdapterInterface::Create(TaskQueueBase* worker_queue) {
- return std::make_unique<FrameCadenceAdapterImpl>(worker_queue);
+FrameCadenceAdapterInterface::Create(Clock* clock, TaskQueueBase* queue) {
+ return std::make_unique<FrameCadenceAdapterImpl>(clock, queue);
}
} // namespace webrtc
diff --git a/video/frame_cadence_adapter.h b/video/frame_cadence_adapter.h
index ca702cf..beb7396 100644
--- a/video/frame_cadence_adapter.h
+++ b/video/frame_cadence_adapter.h
@@ -18,13 +18,14 @@
#include "api/video/video_sink_interface.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
+#include "system_wrappers/include/clock.h"
namespace webrtc {
// A sink adapter implementing mutations to the received frame cadence.
-// With the exception of construction & destruction which has to happen on the
-// same sequence, this class is thread-safe because three different execution
-// contexts call into it.
+// With the exception of the constructor and the methods overridden in
+// VideoSinkInterface, the rest of the interface to this class (including dtor)
+// needs to happen on the queue passed in Create.
class FrameCadenceAdapterInterface
: public rtc::VideoSinkInterface<VideoFrame> {
public:
@@ -33,8 +34,20 @@
public:
virtual ~Callback() = default;
- // Called when a frame arrives.
- virtual void OnFrame(const VideoFrame& frame) = 0;
+ // Called when a frame arrives on the |queue| specified in Create.
+ //
+ // The |post_time| parameter indicates the current time sampled when
+ // FrameCadenceAdapterInterface::OnFrame was called.
+ //
+ // |frames_scheduled_for_processing| indicates how many frames that have
+ // been scheduled for processing. During sequential conditions where
+ // FrameCadenceAdapterInterface::OnFrame is invoked and subsequently ending
+ // up in this callback, this value will read 1. Otherwise if the
+ // |queue| gets stalled for some reason, the value will increase
+ // beyond 1.
+ virtual void OnFrame(Timestamp post_time,
+ int frames_scheduled_for_processing,
+ const VideoFrame& frame) = 0;
// Called when the source has discarded a frame.
virtual void OnDiscardedFrame() = 0;
@@ -42,8 +55,11 @@
// Factory function creating a production instance. Deletion of the returned
// instance needs to happen on the same sequence that Create() was called on.
+ // Frames arriving in FrameCadenceAdapterInterface::OnFrame are posted to
+ // Callback::OnFrame on the |queue|.
static std::unique_ptr<FrameCadenceAdapterInterface> Create(
- TaskQueueBase* worker_queue);
+ Clock* clock,
+ TaskQueueBase* queue);
// Call before using the rest of the API.
virtual void Initialize(Callback* callback) = 0;
diff --git a/video/frame_cadence_adapter_unittest.cc b/video/frame_cadence_adapter_unittest.cc
index 56fa220..a6b6a87 100644
--- a/video/frame_cadence_adapter_unittest.cc
+++ b/video/frame_cadence_adapter_unittest.cc
@@ -26,6 +26,7 @@
namespace webrtc {
namespace {
+using ::testing::_;
using ::testing::ElementsAre;
using ::testing::Mock;
using ::testing::Pair;
@@ -39,13 +40,13 @@
.build();
}
-std::unique_ptr<FrameCadenceAdapterInterface> CreateAdapter() {
- return FrameCadenceAdapterInterface::Create(TaskQueueBase::Current());
+std::unique_ptr<FrameCadenceAdapterInterface> CreateAdapter(Clock* clock) {
+ return FrameCadenceAdapterInterface::Create(clock, TaskQueueBase::Current());
}
class MockCallback : public FrameCadenceAdapterInterface::Callback {
public:
- MOCK_METHOD(void, OnFrame, (const VideoFrame&), (override));
+ MOCK_METHOD(void, OnFrame, (Timestamp, int, const VideoFrame&), (override));
MOCK_METHOD(void, OnDiscardedFrame, (), (override));
};
@@ -61,7 +62,7 @@
auto disabler = std::make_unique<ZeroHertzFieldTrialDisabler>();
for (int i = 0; i != 2; i++) {
MockCallback callback;
- auto adapter = CreateAdapter();
+ auto adapter = CreateAdapter(time_controller.GetClock());
adapter->Initialize(&callback);
VideoFrame frame = CreateFrame();
EXPECT_CALL(callback, OnFrame).Times(1);
@@ -76,6 +77,22 @@
}
}
+TEST(FrameCadenceAdapterTest, CountsOutstandingFramesToProcess) {
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1));
+ MockCallback callback;
+ auto adapter = CreateAdapter(time_controller.GetClock());
+ adapter->Initialize(&callback);
+ EXPECT_CALL(callback, OnFrame(_, 2, _)).Times(1);
+ EXPECT_CALL(callback, OnFrame(_, 1, _)).Times(1);
+ auto frame = CreateFrame();
+ adapter->OnFrame(frame);
+ adapter->OnFrame(frame);
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ EXPECT_CALL(callback, OnFrame(_, 1, _)).Times(1);
+ adapter->OnFrame(frame);
+ time_controller.AdvanceTime(TimeDelta::Zero());
+}
+
class FrameCadenceAdapterMetricsTest : public ::testing::Test {
public:
FrameCadenceAdapterMetricsTest() : time_controller_(Timestamp::Millis(1)) {
@@ -83,13 +100,13 @@
}
void DepleteTaskQueues() { time_controller_.AdvanceTime(TimeDelta::Zero()); }
- private:
+ protected:
GlobalSimulatedTimeController time_controller_;
};
TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoUmasWithNoFrameTransfer) {
MockCallback callback;
- auto adapter = CreateAdapter();
+ auto adapter = CreateAdapter(nullptr);
adapter->Initialize(&callback);
adapter->OnConstraintsChanged(
VideoTrackSourceConstraints{absl::nullopt, absl::nullopt});
@@ -129,7 +146,7 @@
TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoUmasWithoutEnabledContentType) {
MockCallback callback;
- auto adapter = CreateAdapter();
+ auto adapter = CreateAdapter(time_controller_.GetClock());
adapter->Initialize(&callback);
adapter->OnFrame(CreateFrame());
adapter->OnConstraintsChanged(
@@ -170,7 +187,7 @@
TEST_F(FrameCadenceAdapterMetricsTest, RecordsNoConstraintsIfUnsetOnFrame) {
MockCallback callback;
- auto adapter = CreateAdapter();
+ auto adapter = CreateAdapter(time_controller_.GetClock());
adapter->Initialize(&callback);
adapter->SetZeroHertzModeEnabled(true);
adapter->OnFrame(CreateFrame());
@@ -182,7 +199,7 @@
TEST_F(FrameCadenceAdapterMetricsTest, RecordsEmptyConstraintsIfSetOnFrame) {
MockCallback callback;
- auto adapter = CreateAdapter();
+ auto adapter = CreateAdapter(time_controller_.GetClock());
adapter->Initialize(&callback);
adapter->SetZeroHertzModeEnabled(true);
adapter->OnConstraintsChanged(
@@ -221,7 +238,7 @@
TEST_F(FrameCadenceAdapterMetricsTest, RecordsMaxConstraintIfSetOnFrame) {
MockCallback callback;
- auto adapter = CreateAdapter();
+ auto adapter = CreateAdapter(time_controller_.GetClock());
adapter->Initialize(&callback);
adapter->SetZeroHertzModeEnabled(true);
adapter->OnConstraintsChanged(
@@ -257,7 +274,7 @@
TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinConstraintIfSetOnFrame) {
MockCallback callback;
- auto adapter = CreateAdapter();
+ auto adapter = CreateAdapter(time_controller_.GetClock());
adapter->Initialize(&callback);
adapter->SetZeroHertzModeEnabled(true);
adapter->OnConstraintsChanged(
@@ -293,7 +310,7 @@
TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinGtMaxConstraintIfSetOnFrame) {
MockCallback callback;
- auto adapter = CreateAdapter();
+ auto adapter = CreateAdapter(time_controller_.GetClock());
adapter->Initialize(&callback);
adapter->SetZeroHertzModeEnabled(true);
adapter->OnConstraintsChanged(VideoTrackSourceConstraints{5.0, 4.0});
@@ -328,7 +345,7 @@
TEST_F(FrameCadenceAdapterMetricsTest, RecordsMinLtMaxConstraintIfSetOnFrame) {
MockCallback callback;
- auto adapter = CreateAdapter();
+ auto adapter = CreateAdapter(time_controller_.GetClock());
adapter->Initialize(&callback);
adapter->SetZeroHertzModeEnabled(true);
adapter->OnConstraintsChanged(VideoTrackSourceConstraints{4.0, 5.0});
diff --git a/video/video_send_stream.cc b/video/video_send_stream.cc
index 05708d0..e78211b 100644
--- a/video/video_send_stream.cc
+++ b/video/video_send_stream.cc
@@ -12,6 +12,7 @@
#include <utility>
#include "api/array_view.h"
+#include "api/task_queue/task_queue_base.h"
#include "api/video/video_stream_encoder_settings.h"
#include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
#include "modules/rtp_rtcp/source/rtp_header_extension_size.h"
@@ -106,6 +107,25 @@
return observers;
}
+std::unique_ptr<VideoStreamEncoder> CreateVideoStreamEncoder(
+ Clock* clock,
+ int num_cpu_cores,
+ TaskQueueFactory* task_queue_factory,
+ SendStatisticsProxy* stats_proxy,
+ const VideoStreamEncoderSettings& encoder_settings,
+ VideoStreamEncoder::BitrateAllocationCallbackType
+ bitrate_allocation_callback_type) {
+ std::unique_ptr<TaskQueueBase, TaskQueueDeleter> encoder_queue =
+ task_queue_factory->CreateTaskQueue("EncoderQueue",
+ TaskQueueFactory::Priority::NORMAL);
+ TaskQueueBase* encoder_queue_ptr = encoder_queue.get();
+ return std::make_unique<VideoStreamEncoder>(
+ clock, num_cpu_cores, stats_proxy, encoder_settings,
+ std::make_unique<OveruseFrameDetector>(stats_proxy),
+ FrameCadenceAdapterInterface::Create(clock, encoder_queue_ptr),
+ std::move(encoder_queue), bitrate_allocation_callback_type);
+}
+
} // namespace
namespace internal {
@@ -130,17 +150,13 @@
stats_proxy_(clock, config, encoder_config.content_type),
config_(std::move(config)),
content_type_(encoder_config.content_type),
- video_stream_encoder_(std::make_unique<VideoStreamEncoder>(
- clock,
- num_cpu_cores,
- &stats_proxy_,
- config_.encoder_settings,
- std::make_unique<OveruseFrameDetector>(&stats_proxy_),
- FrameCadenceAdapterInterface::Create(
- /*worker_queue=*/TaskQueueBase::Current()),
- task_queue_factory,
- /*worker_queue=*/TaskQueueBase::Current(),
- GetBitrateAllocationCallbackType(config_))),
+ video_stream_encoder_(
+ CreateVideoStreamEncoder(clock,
+ num_cpu_cores,
+ task_queue_factory,
+ &stats_proxy_,
+ config_.encoder_settings,
+ GetBitrateAllocationCallbackType(config_))),
encoder_feedback_(
clock,
config_.rtp.ssrcs,
diff --git a/video/video_stream_encoder.cc b/video/video_stream_encoder.cc
index c6870a0..1c0de4b 100644
--- a/video/video_stream_encoder.cc
+++ b/video/video_stream_encoder.cc
@@ -593,8 +593,8 @@
const VideoStreamEncoderSettings& settings,
std::unique_ptr<OveruseFrameDetector> overuse_detector,
std::unique_ptr<FrameCadenceAdapterInterface> frame_cadence_adapter,
- TaskQueueFactory* task_queue_factory,
- TaskQueueBase* worker_queue,
+ std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
+ encoder_queue,
BitrateAllocationCallbackType allocation_cb_type)
: worker_queue_(TaskQueueBase::Current()),
number_of_cores_(number_of_cores),
@@ -618,7 +618,6 @@
was_encode_called_since_last_initialization_(false),
encoder_failed_(false),
clock_(clock),
- posted_frames_waiting_for_encode_(0),
last_captured_timestamp_(0),
delta_ntp_internal_ms_(clock_->CurrentNtpInMilliseconds() -
clock_->TimeInMilliseconds()),
@@ -665,9 +664,7 @@
!field_trial::IsEnabled("WebRTC-DefaultBitrateLimitsKillSwitch")),
qp_parsing_allowed_(
!field_trial::IsEnabled("WebRTC-QpParsingKillSwitch")),
- encoder_queue_(task_queue_factory->CreateTaskQueue(
- "EncoderQueue",
- TaskQueueFactory::Priority::NORMAL)) {
+ encoder_queue_(std::move(encoder_queue)) {
TRACE_EVENT0("webrtc", "VideoStreamEncoder::VideoStreamEncoder");
RTC_DCHECK_RUN_ON(worker_queue_);
RTC_DCHECK(encoder_stats_observer);
@@ -732,6 +729,7 @@
rate_allocator_ = nullptr;
ReleaseEncoder();
encoder_ = nullptr;
+ frame_cadence_adapter_ = nullptr;
shutdown_event.Set();
});
shutdown_event.Wait(rtc::Event::kForever);
@@ -824,14 +822,14 @@
void VideoStreamEncoder::ConfigureEncoder(VideoEncoderConfig config,
size_t max_data_payload_length) {
RTC_DCHECK_RUN_ON(worker_queue_);
- frame_cadence_adapter_->SetZeroHertzModeEnabled(
- config.content_type == VideoEncoderConfig::ContentType::kScreen);
encoder_queue_.PostTask(
[this, config = std::move(config), max_data_payload_length]() mutable {
RTC_DCHECK_RUN_ON(&encoder_queue_);
RTC_DCHECK(sink_);
RTC_LOG(LS_INFO) << "ConfigureEncoder requested.";
+ frame_cadence_adapter_->SetZeroHertzModeEnabled(
+ config.content_type == VideoEncoderConfig::ContentType::kScreen);
pending_encoder_creation_ =
(!encoder_ || encoder_config_.video_format != config.video_format ||
max_data_payload_length_ != max_data_payload_length);
@@ -1261,19 +1259,18 @@
degradation_preference_manager_->SetIsScreenshare(is_screenshare);
}
-void VideoStreamEncoder::OnFrame(const VideoFrame& video_frame) {
- RTC_DCHECK_RUN_ON(worker_queue_);
+void VideoStreamEncoder::OnFrame(Timestamp post_time,
+ int frames_scheduled_for_processing,
+ const VideoFrame& video_frame) {
+ RTC_DCHECK_RUN_ON(&encoder_queue_);
VideoFrame incoming_frame = video_frame;
- // Local time in webrtc time base.
- Timestamp now = clock_->CurrentTime();
-
// In some cases, e.g., when the frame from decoder is fed to encoder,
// the timestamp may be set to the future. As the encoding pipeline assumes
// capture time to be less than present time, we should reset the capture
// timestamps here. Otherwise there may be issues with RTP send stream.
- if (incoming_frame.timestamp_us() > now.us())
- incoming_frame.set_timestamp_us(now.us());
+ if (incoming_frame.timestamp_us() > post_time.us())
+ incoming_frame.set_timestamp_us(post_time.us());
// Capture time may come from clock with an offset and drift from clock_.
int64_t capture_ntp_time_ms;
@@ -1282,7 +1279,7 @@
} else if (video_frame.render_time_ms() != 0) {
capture_ntp_time_ms = video_frame.render_time_ms() + delta_ntp_internal_ms_;
} else {
- capture_ntp_time_ms = now.ms() + delta_ntp_internal_ms_;
+ capture_ntp_time_ms = post_time.ms() + delta_ntp_internal_ms_;
}
incoming_frame.set_ntp_time_ms(capture_ntp_time_ms);
@@ -1306,62 +1303,51 @@
}
bool log_stats = false;
- if (now.ms() - last_frame_log_ms_ > kFrameLogIntervalMs) {
- last_frame_log_ms_ = now.ms();
+ if (post_time.ms() - last_frame_log_ms_ > kFrameLogIntervalMs) {
+ last_frame_log_ms_ = post_time.ms();
log_stats = true;
}
last_captured_timestamp_ = incoming_frame.ntp_time_ms();
- int64_t post_time_us = clock_->CurrentTime().us();
- ++posted_frames_waiting_for_encode_;
-
- encoder_queue_.PostTask(
- [this, incoming_frame, post_time_us, log_stats]() {
- RTC_DCHECK_RUN_ON(&encoder_queue_);
- encoder_stats_observer_->OnIncomingFrame(incoming_frame.width(),
- incoming_frame.height());
- ++captured_frame_count_;
- const int posted_frames_waiting_for_encode =
- posted_frames_waiting_for_encode_.fetch_sub(1);
- RTC_DCHECK_GT(posted_frames_waiting_for_encode, 0);
- CheckForAnimatedContent(incoming_frame, post_time_us);
- bool cwnd_frame_drop =
- cwnd_frame_drop_interval_ &&
- (cwnd_frame_counter_++ % cwnd_frame_drop_interval_.value() == 0);
- if (posted_frames_waiting_for_encode == 1 && !cwnd_frame_drop) {
- MaybeEncodeVideoFrame(incoming_frame, post_time_us);
- } else {
- if (cwnd_frame_drop) {
- // Frame drop by congestion window pushback. Do not encode this
- // frame.
- ++dropped_frame_cwnd_pushback_count_;
- encoder_stats_observer_->OnFrameDropped(
- VideoStreamEncoderObserver::DropReason::kCongestionWindow);
- } else {
- // There is a newer frame in flight. Do not encode this frame.
- RTC_LOG(LS_VERBOSE)
- << "Incoming frame dropped due to that the encoder is blocked.";
- ++dropped_frame_encoder_block_count_;
- encoder_stats_observer_->OnFrameDropped(
- VideoStreamEncoderObserver::DropReason::kEncoderQueue);
- }
- accumulated_update_rect_.Union(incoming_frame.update_rect());
- accumulated_update_rect_is_valid_ &= incoming_frame.has_update_rect();
- }
- if (log_stats) {
- RTC_LOG(LS_INFO) << "Number of frames: captured "
- << captured_frame_count_
- << ", dropped (due to congestion window pushback) "
- << dropped_frame_cwnd_pushback_count_
- << ", dropped (due to encoder blocked) "
- << dropped_frame_encoder_block_count_
- << ", interval_ms " << kFrameLogIntervalMs;
- captured_frame_count_ = 0;
- dropped_frame_cwnd_pushback_count_ = 0;
- dropped_frame_encoder_block_count_ = 0;
- }
- });
+ encoder_stats_observer_->OnIncomingFrame(incoming_frame.width(),
+ incoming_frame.height());
+ ++captured_frame_count_;
+ CheckForAnimatedContent(incoming_frame, post_time.us());
+ bool cwnd_frame_drop =
+ cwnd_frame_drop_interval_ &&
+ (cwnd_frame_counter_++ % cwnd_frame_drop_interval_.value() == 0);
+ if (frames_scheduled_for_processing == 1 && !cwnd_frame_drop) {
+ MaybeEncodeVideoFrame(incoming_frame, post_time.us());
+ } else {
+ if (cwnd_frame_drop) {
+ // Frame drop by congestion window pushback. Do not encode this
+ // frame.
+ ++dropped_frame_cwnd_pushback_count_;
+ encoder_stats_observer_->OnFrameDropped(
+ VideoStreamEncoderObserver::DropReason::kCongestionWindow);
+ } else {
+ // There is a newer frame in flight. Do not encode this frame.
+ RTC_LOG(LS_VERBOSE)
+ << "Incoming frame dropped due to that the encoder is blocked.";
+ ++dropped_frame_encoder_block_count_;
+ encoder_stats_observer_->OnFrameDropped(
+ VideoStreamEncoderObserver::DropReason::kEncoderQueue);
+ }
+ accumulated_update_rect_.Union(incoming_frame.update_rect());
+ accumulated_update_rect_is_valid_ &= incoming_frame.has_update_rect();
+ }
+ if (log_stats) {
+ RTC_LOG(LS_INFO) << "Number of frames: captured " << captured_frame_count_
+ << ", dropped (due to congestion window pushback) "
+ << dropped_frame_cwnd_pushback_count_
+ << ", dropped (due to encoder blocked) "
+ << dropped_frame_encoder_block_count_ << ", interval_ms "
+ << kFrameLogIntervalMs;
+ captured_frame_count_ = 0;
+ dropped_frame_cwnd_pushback_count_ = 0;
+ dropped_frame_encoder_block_count_ = 0;
+ }
}
void VideoStreamEncoder::OnDiscardedFrame() {
diff --git a/video/video_stream_encoder.h b/video/video_stream_encoder.h
index e0eda70..4231f1b 100644
--- a/video/video_stream_encoder.h
+++ b/video/video_stream_encoder.h
@@ -77,8 +77,8 @@
const VideoStreamEncoderSettings& settings,
std::unique_ptr<OveruseFrameDetector> overuse_detector,
std::unique_ptr<FrameCadenceAdapterInterface> frame_cadence_adapter,
- TaskQueueFactory* task_queue_factory,
- TaskQueueBase* worker_queue,
+ std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
+ encoder_queue,
BitrateAllocationCallbackType allocation_cb_type);
~VideoStreamEncoder() override;
@@ -146,8 +146,11 @@
explicit CadenceCallback(VideoStreamEncoder& video_stream_encoder)
: video_stream_encoder_(video_stream_encoder) {}
// FrameCadenceAdapterInterface::Callback overrides.
- void OnFrame(const VideoFrame& frame) override {
- video_stream_encoder_.OnFrame(frame);
+ void OnFrame(Timestamp post_time,
+ int frames_scheduled_for_processing,
+ const VideoFrame& frame) override {
+ video_stream_encoder_.OnFrame(post_time, frames_scheduled_for_processing,
+ frame);
}
void OnDiscardedFrame() override {
video_stream_encoder_.OnDiscardedFrame();
@@ -192,7 +195,9 @@
void ReconfigureEncoder() RTC_RUN_ON(&encoder_queue_);
void OnEncoderSettingsChanged() RTC_RUN_ON(&encoder_queue_);
- void OnFrame(const VideoFrame& video_frame);
+ void OnFrame(Timestamp post_time,
+ int frames_scheduled_for_processing,
+ const VideoFrame& video_frame);
void OnDiscardedFrame();
void MaybeEncodeVideoFrame(const VideoFrame& frame,
@@ -260,7 +265,8 @@
CadenceCallback cadence_callback_;
// Frame cadence encoder adapter. Frames enter this adapter first, and it then
// forwards them to our OnFrame method.
- const std::unique_ptr<FrameCadenceAdapterInterface> frame_cadence_adapter_;
+ std::unique_ptr<FrameCadenceAdapterInterface> frame_cadence_adapter_
+ RTC_GUARDED_BY(&encoder_queue_) RTC_PT_GUARDED_BY(&encoder_queue_);
VideoEncoderConfig encoder_config_ RTC_GUARDED_BY(&encoder_queue_);
std::unique_ptr<VideoEncoder> encoder_ RTC_GUARDED_BY(&encoder_queue_)
@@ -296,13 +302,12 @@
bool encoder_failed_ RTC_GUARDED_BY(&encoder_queue_);
Clock* const clock_;
- std::atomic<int> posted_frames_waiting_for_encode_;
// Used to make sure incoming time stamp is increasing for every frame.
- int64_t last_captured_timestamp_ RTC_GUARDED_BY(worker_queue_);
+ int64_t last_captured_timestamp_ RTC_GUARDED_BY(&encoder_queue_);
// Delta used for translating between NTP and internal timestamps.
- const int64_t delta_ntp_internal_ms_ RTC_GUARDED_BY(worker_queue_);
+ const int64_t delta_ntp_internal_ms_ RTC_GUARDED_BY(&encoder_queue_);
- int64_t last_frame_log_ms_ RTC_GUARDED_BY(worker_queue_);
+ int64_t last_frame_log_ms_ RTC_GUARDED_BY(&encoder_queue_);
int captured_frame_count_ RTC_GUARDED_BY(&encoder_queue_);
int dropped_frame_cwnd_pushback_count_ RTC_GUARDED_BY(&encoder_queue_);
int dropped_frame_encoder_block_count_ RTC_GUARDED_BY(&encoder_queue_);
diff --git a/video/video_stream_encoder_unittest.cc b/video/video_stream_encoder_unittest.cc
index ba29d0b..32d4f94 100644
--- a/video/video_stream_encoder_unittest.cc
+++ b/video/video_stream_encoder_unittest.cc
@@ -17,6 +17,7 @@
#include <utility>
#include "absl/memory/memory.h"
+#include "api/rtp_parameters.h"
#include "api/task_queue/default_task_queue_factory.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/test/mock_fec_controller_override.h"
@@ -346,24 +347,25 @@
class VideoStreamEncoderUnderTest : public VideoStreamEncoder {
public:
- VideoStreamEncoderUnderTest(TimeController* time_controller,
- TaskQueueFactory* task_queue_factory,
- SendStatisticsProxy* stats_proxy,
- const VideoStreamEncoderSettings& settings,
- VideoStreamEncoder::BitrateAllocationCallbackType
- allocation_callback_type)
- : VideoStreamEncoder(
- time_controller->GetClock(),
- 1 /* number_of_cores */,
- stats_proxy,
- settings,
- std::unique_ptr<OveruseFrameDetector>(
- overuse_detector_proxy_ =
- new CpuOveruseDetectorProxy(stats_proxy)),
- FrameCadenceAdapterInterface::Create(TaskQueueBase::Current()),
- task_queue_factory,
- TaskQueueBase::Current(),
- allocation_callback_type),
+ VideoStreamEncoderUnderTest(
+ TimeController* time_controller,
+ std::unique_ptr<FrameCadenceAdapterInterface> cadence_adapter,
+ std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
+ encoder_queue,
+ SendStatisticsProxy* stats_proxy,
+ const VideoStreamEncoderSettings& settings,
+ VideoStreamEncoder::BitrateAllocationCallbackType
+ allocation_callback_type)
+ : VideoStreamEncoder(time_controller->GetClock(),
+ 1 /* number_of_cores */,
+ stats_proxy,
+ settings,
+ std::unique_ptr<OveruseFrameDetector>(
+ overuse_detector_proxy_ =
+ new CpuOveruseDetectorProxy(stats_proxy)),
+ std::move(cadence_adapter),
+ std::move(encoder_queue),
+ allocation_callback_type),
time_controller_(time_controller),
fake_cpu_resource_(FakeResource::Create("FakeResource[CPU]")),
fake_quality_resource_(FakeResource::Create("FakeResource[QP]")),
@@ -659,14 +661,17 @@
/*number_of_cores=*/1,
/*stats_proxy=*/stats_proxy_.get(), encoder_settings_,
std::make_unique<CpuOveruseDetectorProxy>(/*stats_proxy=*/nullptr),
- std::move(zero_hertz_adapter), task_queue_factory_.get(),
- TaskQueueBase::Current(),
+ std::move(zero_hertz_adapter),
+ time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
+ "EncoderQueue", TaskQueueFactory::Priority::NORMAL),
VideoStreamEncoder::BitrateAllocationCallbackType::
kVideoBitrateAllocation);
result->SetSink(&sink_, /*rotation_applied=*/false);
return result;
}
+ void DepleteTaskQueues() { time_controller_.AdvanceTime(TimeDelta::Zero()); }
+
private:
class NullEncoderSink : public VideoStreamEncoderInterface::EncoderSink {
public:
@@ -763,9 +768,17 @@
kVideoBitrateAllocationWhenScreenSharing) {
if (video_stream_encoder_)
video_stream_encoder_->Stop();
- video_stream_encoder_.reset(new VideoStreamEncoderUnderTest(
- &time_controller_, GetTaskQueueFactory(), stats_proxy_.get(),
- video_send_config_.encoder_settings, allocation_callback_type));
+
+ auto encoder_queue = GetTaskQueueFactory()->CreateTaskQueue(
+ "EncoderQueue", TaskQueueFactory::Priority::NORMAL);
+ TaskQueueBase* encoder_queue_ptr = encoder_queue.get();
+ std::unique_ptr<FrameCadenceAdapterInterface> cadence_adapter =
+ FrameCadenceAdapterInterface::Create(time_controller_.GetClock(),
+ encoder_queue_ptr);
+ video_stream_encoder_ = std::make_unique<VideoStreamEncoderUnderTest>(
+ &time_controller_, std::move(cadence_adapter), std::move(encoder_queue),
+ stats_proxy_.get(), video_send_config_.encoder_settings,
+ allocation_callback_type);
video_stream_encoder_->SetSink(&sink_, /*rotation_applied=*/false);
video_stream_encoder_->SetSource(
&video_source_, webrtc::DegradationPreference::MAINTAIN_FRAMERATE);
@@ -930,11 +943,6 @@
RTC_DCHECK(time_controller_);
}
- void BlockNextEncode() {
- MutexLock lock(&local_mutex_);
- block_next_encode_ = true;
- }
-
VideoEncoder::EncoderInfo GetEncoderInfo() const override {
MutexLock lock(&local_mutex_);
EncoderInfo info = FakeEncoder::GetEncoderInfo();
@@ -1108,7 +1116,6 @@
private:
int32_t Encode(const VideoFrame& input_image,
const std::vector<VideoFrameType>* frame_types) override {
- bool block_encode;
{
MutexLock lock(&local_mutex_);
if (expect_null_frame_) {
@@ -1126,16 +1133,11 @@
ntp_time_ms_ = input_image.ntp_time_ms();
last_input_width_ = input_image.width();
last_input_height_ = input_image.height();
- block_encode = block_next_encode_;
- block_next_encode_ = false;
last_update_rect_ = input_image.update_rect();
last_frame_types_ = *frame_types;
last_input_pixel_format_ = input_image.video_frame_buffer()->type();
}
int32_t result = FakeEncoder::Encode(input_image, frame_types);
- if (block_encode)
- EXPECT_TRUE(continue_encode_event_.Wait(kDefaultTimeoutMs));
-
return result;
}
@@ -1212,7 +1214,6 @@
kInitializationFailed,
kInitialized
} initialized_ RTC_GUARDED_BY(local_mutex_) = EncoderState::kUninitialized;
- bool block_next_encode_ RTC_GUARDED_BY(local_mutex_) = false;
rtc::Event continue_encode_event_;
uint32_t timestamp_ RTC_GUARDED_BY(local_mutex_) = 0;
int64_t ntp_time_ms_ RTC_GUARDED_BY(local_mutex_) = 0;
@@ -1592,20 +1593,10 @@
EXPECT_TRUE(frame_destroyed_event.Wait(kDefaultTimeoutMs));
}
-class VideoStreamEncoderBlockedTest : public VideoStreamEncoderTest {
- public:
- VideoStreamEncoderBlockedTest() {}
-
- TaskQueueFactory* GetTaskQueueFactory() override {
- return task_queue_factory_.get();
- }
-
- private:
- std::unique_ptr<TaskQueueFactory> task_queue_factory_ =
- CreateDefaultTaskQueueFactory();
-};
-
-TEST_F(VideoStreamEncoderBlockedTest, DropsPendingFramesOnSlowEncode) {
+TEST_F(VideoStreamEncoderTest, DropsPendingFramesOnSlowEncode) {
+ test::FrameForwarder source;
+ video_stream_encoder_->SetSource(&source,
+ DegradationPreference::MAINTAIN_FRAMERATE);
video_stream_encoder_->OnBitrateUpdatedAndWaitForManagedResources(
kTargetBitrate, kTargetBitrate, kTargetBitrate, 0, 0, 0);
@@ -1615,18 +1606,10 @@
++dropped_count;
});
- fake_encoder_.BlockNextEncode();
- video_source_.IncomingCapturedFrame(CreateFrame(1, nullptr));
- WaitForEncodedFrame(1);
- // Here, the encoder thread will be blocked in the TestEncoder waiting for a
- // call to ContinueEncode.
- video_source_.IncomingCapturedFrame(CreateFrame(2, nullptr));
- video_source_.IncomingCapturedFrame(CreateFrame(3, nullptr));
- fake_encoder_.ContinueEncode();
- WaitForEncodedFrame(3);
-
+ source.IncomingCapturedFrame(CreateFrame(1, nullptr));
+ source.IncomingCapturedFrame(CreateFrame(2, nullptr));
+ WaitForEncodedFrame(2);
video_stream_encoder_->Stop();
-
EXPECT_EQ(1, dropped_count);
}
@@ -7125,14 +7108,15 @@
video_stream_encoder_->Stop();
}
-TEST_F(VideoStreamEncoderBlockedTest, AccumulatesUpdateRectOnDroppedFrames) {
+TEST_F(VideoStreamEncoderTest, AccumulatesUpdateRectOnDroppedFrames) {
VideoFrame::UpdateRect rect;
+ test::FrameForwarder source;
+ video_stream_encoder_->SetSource(&source,
+ DegradationPreference::MAINTAIN_FRAMERATE);
video_stream_encoder_->OnBitrateUpdatedAndWaitForManagedResources(
kTargetBitrate, kTargetBitrate, kTargetBitrate, 0, 0, 0);
- fake_encoder_.BlockNextEncode();
- video_source_.IncomingCapturedFrame(
- CreateFrameWithUpdatedPixel(1, nullptr, 0));
+ source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(1, nullptr, 0));
WaitForEncodedFrame(1);
// On the very first frame full update should be forced.
rect = fake_encoder_.GetLastUpdateRect();
@@ -7140,15 +7124,10 @@
EXPECT_EQ(rect.offset_y, 0);
EXPECT_EQ(rect.height, codec_height_);
EXPECT_EQ(rect.width, codec_width_);
- // Here, the encoder thread will be blocked in the TestEncoder waiting for a
- // call to ContinueEncode.
- video_source_.IncomingCapturedFrame(
- CreateFrameWithUpdatedPixel(2, nullptr, 1));
- ExpectDroppedFrame();
- video_source_.IncomingCapturedFrame(
- CreateFrameWithUpdatedPixel(3, nullptr, 10));
- ExpectDroppedFrame();
- fake_encoder_.ContinueEncode();
+ // Frame with NTP timestamp 2 will be dropped due to outstanding frames
+ // scheduled for processing during encoder queue processing of frame 2.
+ source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(2, nullptr, 1));
+ source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(3, nullptr, 10));
WaitForEncodedFrame(3);
// Updates to pixels 1 and 10 should be accumulated to one 10x1 rect.
rect = fake_encoder_.GetLastUpdateRect();
@@ -7157,8 +7136,7 @@
EXPECT_EQ(rect.width, 10);
EXPECT_EQ(rect.height, 1);
- video_source_.IncomingCapturedFrame(
- CreateFrameWithUpdatedPixel(4, nullptr, 0));
+ source.IncomingCapturedFrame(CreateFrameWithUpdatedPixel(4, nullptr, 0));
WaitForEncodedFrame(4);
// Previous frame was encoded, so no accumulation should happen.
rect = fake_encoder_.GetLastUpdateRect();
@@ -8737,12 +8715,14 @@
VideoEncoderConfig config;
config.content_type = VideoEncoderConfig::ContentType::kScreen;
video_stream_encoder->ConfigureEncoder(std::move(config), 0);
+ factory.DepleteTaskQueues();
Mock::VerifyAndClearExpectations(adapter_ptr);
EXPECT_CALL(*adapter_ptr, SetZeroHertzModeEnabled(false));
VideoEncoderConfig config2;
config2.content_type = VideoEncoderConfig::ContentType::kRealtimeVideo;
video_stream_encoder->ConfigureEncoder(std::move(config2), 0);
+ factory.DepleteTaskQueues();
}
TEST(VideoStreamEncoderFrameCadenceTest,