Use SingleThreadedTaskQueue in DirectTransport
DirectTransport has so far used its own thread, which led to a different threading-model in the unit-tests than the one used in actual WebRTC. Because of that, some critical-sections that weren't truly necessary in WebRTC could not be replaced with thread-checks, because those checks failed in unit-tests.
This CL introduces SingleThreadedTaskQueue - a TaskQueue which guarantees to run all of its tasks on the same thread (rtc::TaskQueue doesn't guarantee that on Mac) - and uses that for DirectTransport. CLs based on top of this will uncomment thread-checks which had to be commented out before, and remove unnecessary critical-sections.
Future work would probably replace the thread-checkers with more sophisticated serialized-access checks, allowing us to move from the SingleThreadedTaskQueue to a normal TaskQueue.
Related implementation notes:
* This CL has made DirectTransport::StopSending() superfluous, and so it was deleted.
BUG=webrtc:8113, webrtc:7405, webrtc:8056, webrtc:8116
Review-Url: https://codereview.webrtc.org/2998923002
Cr-Commit-Position: refs/heads/master@{#19445}
diff --git a/webrtc/video/video_quality_test.cc b/webrtc/video/video_quality_test.cc
index 6ad64bc..16d7356 100644
--- a/webrtc/video/video_quality_test.cc
+++ b/webrtc/video/video_quality_test.cc
@@ -37,6 +37,7 @@
#include "webrtc/rtc_base/memory_usage.h"
#include "webrtc/rtc_base/optional.h"
#include "webrtc/rtc_base/platform_file.h"
+#include "webrtc/rtc_base/ptr_util.h"
#include "webrtc/rtc_base/timeutils.h"
#include "webrtc/system_wrappers/include/cpu_info.h"
#include "webrtc/system_wrappers/include/field_trial.h"
@@ -1651,6 +1652,10 @@
sender_call_->DestroyVideoReceiveStream(thumbnail_receive_stream);
thumbnail_send_streams_.clear();
thumbnail_receive_streams_.clear();
+ for (std::unique_ptr<test::VideoCapturer>& video_caputurer :
+ thumbnail_capturers_) {
+ video_caputurer.reset();
+ }
}
void VideoQualityTest::SetupScreenshareOrSVC() {
@@ -1769,6 +1774,11 @@
}
void VideoQualityTest::RunWithAnalyzer(const Params& params) {
+ std::unique_ptr<test::LayerFilteringTransport> send_transport;
+ std::unique_ptr<test::DirectTransport> recv_transport;
+ FILE* graph_data_output_file = nullptr;
+ std::unique_ptr<VideoAnalyzer> analyzer;
+
params_ = params;
RTC_CHECK(!params_.audio.enabled);
@@ -1776,7 +1786,6 @@
// differentiate between the analyzer and the renderer case.
CheckParams();
- FILE* graph_data_output_file = nullptr;
if (!params_.analyzer.graph_data_output_filename.empty()) {
graph_data_output_file =
fopen(params_.analyzer.graph_data_output_filename.c_str(), "w");
@@ -1794,22 +1803,26 @@
Call::Config call_config(event_log_.get());
call_config.bitrate_config = params.call.call_bitrate_config;
- CreateCalls(call_config, call_config);
- test::LayerFilteringTransport send_transport(
- params_.pipe, sender_call_.get(), kPayloadTypeVP8, kPayloadTypeVP9,
- params_.video.selected_tl, params_.ss.selected_sl, payload_type_map_);
+ task_queue_.SendTask([this, &call_config, &send_transport,
+ &recv_transport]() {
+ CreateCalls(call_config, call_config);
- test::DirectTransport recv_transport(params_.pipe, receiver_call_.get(),
- payload_type_map_);
+ send_transport = rtc::MakeUnique<test::LayerFilteringTransport>(
+ &task_queue_, params_.pipe, sender_call_.get(), kPayloadTypeVP8,
+ kPayloadTypeVP9, params_.video.selected_tl, params_.ss.selected_sl,
+ payload_type_map_);
+
+ recv_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, params_.pipe, receiver_call_.get(), payload_type_map_);
+ });
std::string graph_title = params_.analyzer.graph_title;
if (graph_title.empty())
graph_title = VideoQualityTest::GenerateGraphTitle();
-
bool is_quick_test_enabled = field_trial::IsEnabled("WebRTC-QuickPerfTest");
- VideoAnalyzer analyzer(
- &send_transport, params_.analyzer.test_label,
+ analyzer = rtc::MakeUnique<VideoAnalyzer>(
+ send_transport.get(), params_.analyzer.test_label,
params_.analyzer.avg_psnr_threshold, params_.analyzer.avg_ssim_threshold,
is_quick_test_enabled
? kFramesSentInQuickTest
@@ -1820,82 +1833,91 @@
static_cast<size_t>(params_.ss.selected_stream), params.ss.selected_sl,
params_.video.selected_tl, is_quick_test_enabled, clock_,
params_.logging.rtp_dump_name);
- analyzer.SetCall(sender_call_.get());
- analyzer.SetReceiver(receiver_call_->Receiver());
- send_transport.SetReceiver(&analyzer);
- recv_transport.SetReceiver(sender_call_->Receiver());
- SetupVideo(&analyzer, &recv_transport);
- SetupThumbnails(&analyzer, &recv_transport);
- video_receive_configs_[params_.ss.selected_stream].renderer = &analyzer;
- video_send_config_.pre_encode_callback = analyzer.pre_encode_proxy();
- RTC_DCHECK(!video_send_config_.post_encode_callback);
- video_send_config_.post_encode_callback = analyzer.encode_timing_proxy();
+ task_queue_.SendTask([&]() {
+ analyzer->SetCall(sender_call_.get());
+ analyzer->SetReceiver(receiver_call_->Receiver());
+ send_transport->SetReceiver(analyzer.get());
+ recv_transport->SetReceiver(sender_call_->Receiver());
- SetupScreenshareOrSVC();
+ SetupVideo(analyzer.get(), recv_transport.get());
+ SetupThumbnails(analyzer.get(), recv_transport.get());
+ video_receive_configs_[params_.ss.selected_stream].renderer =
+ analyzer.get();
+ video_send_config_.pre_encode_callback = analyzer->pre_encode_proxy();
+ RTC_DCHECK(!video_send_config_.post_encode_callback);
+ video_send_config_.post_encode_callback = analyzer->encode_timing_proxy();
- CreateFlexfecStreams();
- CreateVideoStreams();
- analyzer.SetSendStream(video_send_stream_);
- if (video_receive_streams_.size() == 1)
- analyzer.SetReceiveStream(video_receive_streams_[0]);
+ SetupScreenshareOrSVC();
- video_send_stream_->SetSource(analyzer.OutputInterface(),
- degradation_preference_);
+ CreateFlexfecStreams();
+ CreateVideoStreams();
+ analyzer->SetSendStream(video_send_stream_);
+ if (video_receive_streams_.size() == 1)
+ analyzer->SetReceiveStream(video_receive_streams_[0]);
- SetupThumbnailCapturers(params_.call.num_thumbnails);
- for (size_t i = 0; i < thumbnail_send_streams_.size(); ++i) {
- thumbnail_send_streams_[i]->SetSource(thumbnail_capturers_[i].get(),
- degradation_preference_);
- }
+ video_send_stream_->SetSource(analyzer->OutputInterface(),
+ degradation_preference_);
- CreateCapturer();
+ SetupThumbnailCapturers(params_.call.num_thumbnails);
+ for (size_t i = 0; i < thumbnail_send_streams_.size(); ++i) {
+ thumbnail_send_streams_[i]->SetSource(thumbnail_capturers_[i].get(),
+ degradation_preference_);
+ }
- analyzer.SetSource(video_capturer_.get(), params_.ss.infer_streams);
+ CreateCapturer();
- StartEncodedFrameLogs(video_send_stream_);
- StartEncodedFrameLogs(video_receive_streams_[params_.ss.selected_stream]);
- video_send_stream_->Start();
- for (VideoSendStream* thumbnail_send_stream : thumbnail_send_streams_)
- thumbnail_send_stream->Start();
- for (VideoReceiveStream* receive_stream : video_receive_streams_)
- receive_stream->Start();
- for (VideoReceiveStream* thumbnail_receive_stream :
- thumbnail_receive_streams_)
- thumbnail_receive_stream->Start();
+ analyzer->SetSource(video_capturer_.get(), params_.ss.infer_streams);
- analyzer.StartMeasuringCpuProcessTime();
+ StartEncodedFrameLogs(video_send_stream_);
+ StartEncodedFrameLogs(video_receive_streams_[params_.ss.selected_stream]);
+ video_send_stream_->Start();
+ for (VideoSendStream* thumbnail_send_stream : thumbnail_send_streams_)
+ thumbnail_send_stream->Start();
+ for (VideoReceiveStream* receive_stream : video_receive_streams_)
+ receive_stream->Start();
+ for (VideoReceiveStream* thumbnail_receive_stream :
+ thumbnail_receive_streams_)
+ thumbnail_receive_stream->Start();
- video_capturer_->Start();
- for (std::unique_ptr<test::VideoCapturer>& video_caputurer :
- thumbnail_capturers_) {
- video_caputurer->Start();
- }
+ analyzer->StartMeasuringCpuProcessTime();
- analyzer.Wait();
+ video_capturer_->Start();
+ for (std::unique_ptr<test::VideoCapturer>& video_caputurer :
+ thumbnail_capturers_) {
+ video_caputurer->Start();
+ }
+ });
- send_transport.StopSending();
- recv_transport.StopSending();
+ analyzer->Wait();
- for (std::unique_ptr<test::VideoCapturer>& video_caputurer :
- thumbnail_capturers_)
- video_caputurer->Stop();
- video_capturer_->Stop();
- for (VideoReceiveStream* thumbnail_receive_stream :
- thumbnail_receive_streams_)
- thumbnail_receive_stream->Stop();
- for (VideoReceiveStream* receive_stream : video_receive_streams_)
- receive_stream->Stop();
- for (VideoSendStream* thumbnail_send_stream : thumbnail_send_streams_)
- thumbnail_send_stream->Stop();
- video_send_stream_->Stop();
+ task_queue_.SendTask([&]() {
+ for (std::unique_ptr<test::VideoCapturer>& video_caputurer :
+ thumbnail_capturers_)
+ video_caputurer->Stop();
+ video_capturer_->Stop();
+ for (VideoReceiveStream* thumbnail_receive_stream :
+ thumbnail_receive_streams_)
+ thumbnail_receive_stream->Stop();
+ for (VideoReceiveStream* receive_stream : video_receive_streams_)
+ receive_stream->Stop();
+ for (VideoSendStream* thumbnail_send_stream : thumbnail_send_streams_)
+ thumbnail_send_stream->Stop();
+ video_send_stream_->Stop();
- DestroyStreams();
- DestroyThumbnailStreams();
+ DestroyStreams();
+ DestroyThumbnailStreams();
- event_log_->StopLogging();
- if (graph_data_output_file)
- fclose(graph_data_output_file);
+ event_log_->StopLogging();
+ if (graph_data_output_file)
+ fclose(graph_data_output_file);
+
+ video_capturer_.reset();
+ send_transport.reset();
+ recv_transport.reset();
+
+ DestroyCalls();
+ });
}
void VideoQualityTest::SetupAudio(int send_channel_id,
@@ -1942,162 +1964,177 @@
}
void VideoQualityTest::RunWithRenderers(const Params& params) {
- params_ = params;
- CheckParams();
-
- // TODO(ivica): Remove bitrate_config and use the default Call::Config(), to
- // match the full stack tests.
- Call::Config call_config(event_log_.get());
- call_config.bitrate_config = params_.call.call_bitrate_config;
-
+ std::unique_ptr<test::LayerFilteringTransport> send_transport;
+ std::unique_ptr<test::DirectTransport> recv_transport;
::VoiceEngineState voe;
- rtc::scoped_refptr<webrtc::AudioProcessing> audio_processing(
- webrtc::AudioProcessing::Create());
-
- if (params_.audio.enabled) {
- CreateVoiceEngine(&voe, audio_processing.get(), decoder_factory_);
- AudioState::Config audio_state_config;
- audio_state_config.voice_engine = voe.voice_engine;
- audio_state_config.audio_mixer = AudioMixerImpl::Create();
- audio_state_config.audio_processing = audio_processing;
- call_config.audio_state = AudioState::Create(audio_state_config);
- }
-
- CreateCalls(call_config, call_config);
-
- // TODO(minyue): consider if this is a good transport even for audio only
- // calls.
- test::LayerFilteringTransport send_transport(
- params.pipe, sender_call_.get(), kPayloadTypeVP8, kPayloadTypeVP9,
- params.video.selected_tl, params_.ss.selected_sl, payload_type_map_);
-
- test::DirectTransport recv_transport(params_.pipe, receiver_call_.get(),
- payload_type_map_);
-
- // TODO(ivica): Use two calls to be able to merge with RunWithAnalyzer or at
- // least share as much code as possible. That way this test would also match
- // the full stack tests better.
- send_transport.SetReceiver(receiver_call_->Receiver());
- recv_transport.SetReceiver(sender_call_->Receiver());
-
std::unique_ptr<test::VideoRenderer> local_preview;
- std::vector<std::unique_ptr<test::VideoRenderer>> loopback_renderers_;
- if (params_.video.enabled) {
- // Create video renderers.
- local_preview.reset(test::VideoRenderer::Create(
- "Local Preview", params_.video.width, params_.video.height));
-
- const size_t selected_stream_id = params_.ss.selected_stream;
- const size_t num_streams = params_.ss.streams.size();
-
- if (selected_stream_id == num_streams) {
- for (size_t stream_id = 0; stream_id < num_streams; ++stream_id) {
- std::ostringstream oss;
- oss << "Loopback Video - Stream #" << static_cast<int>(stream_id);
- loopback_renderers_.emplace_back(test::VideoRenderer::Create(
- oss.str().c_str(), params_.ss.streams[stream_id].width,
- params_.ss.streams[stream_id].height));
- }
- } else {
- loopback_renderers_.emplace_back(test::VideoRenderer::Create(
- "Loopback Video", params_.ss.streams[selected_stream_id].width,
- params_.ss.streams[selected_stream_id].height));
- }
-
- SetupVideo(&send_transport, &recv_transport);
-
- video_send_config_.pre_encode_callback = local_preview.get();
- if (selected_stream_id == num_streams) {
- for (size_t stream_id = 0; stream_id < num_streams; ++stream_id) {
- video_receive_configs_[stream_id].renderer =
- loopback_renderers_[stream_id].get();
- if (params_.audio.enabled && params_.audio.sync_video)
- video_receive_configs_[stream_id].sync_group = kSyncGroup;
- }
- } else {
- video_receive_configs_[selected_stream_id].renderer =
- loopback_renderers_.back().get();
- if (params_.audio.enabled && params_.audio.sync_video)
- video_receive_configs_[selected_stream_id].sync_group = kSyncGroup;
- }
-
- if (params_.screenshare.enabled)
- SetupScreenshareOrSVC();
-
- CreateFlexfecStreams();
- CreateVideoStreams();
-
- CreateCapturer();
- video_send_stream_->SetSource(video_capturer_.get(),
- degradation_preference_);
- }
-
+ std::vector<std::unique_ptr<test::VideoRenderer>> loopback_renderers;
AudioReceiveStream* audio_receive_stream = nullptr;
- if (params_.audio.enabled) {
- SetupAudio(voe.send_channel_id, voe.receive_channel_id, &send_transport,
- &audio_receive_stream);
- }
- for (VideoReceiveStream* receive_stream : video_receive_streams_)
- StartEncodedFrameLogs(receive_stream);
- StartEncodedFrameLogs(video_send_stream_);
+ task_queue_.SendTask([&]() {
+ params_ = params;
+ CheckParams();
- // Start sending and receiving video.
- if (params_.video.enabled) {
- for (VideoReceiveStream* video_receive_stream : video_receive_streams_)
- video_receive_stream->Start();
+ // TODO(ivica): Remove bitrate_config and use the default Call::Config(), to
+ // match the full stack tests.
+ Call::Config call_config(event_log_.get());
+ call_config.bitrate_config = params_.call.call_bitrate_config;
- video_send_stream_->Start();
- video_capturer_->Start();
- }
+ rtc::scoped_refptr<webrtc::AudioProcessing> audio_processing(
+ webrtc::AudioProcessing::Create());
- if (params_.audio.enabled) {
- // Start receiving audio.
- audio_receive_stream->Start();
- EXPECT_EQ(0, voe.base->StartPlayout(voe.receive_channel_id));
+ if (params_.audio.enabled) {
+ CreateVoiceEngine(&voe, audio_processing.get(), decoder_factory_);
+ AudioState::Config audio_state_config;
+ audio_state_config.voice_engine = voe.voice_engine;
+ audio_state_config.audio_mixer = AudioMixerImpl::Create();
+ audio_state_config.audio_processing = audio_processing;
+ call_config.audio_state = AudioState::Create(audio_state_config);
+ }
- // Start sending audio.
- audio_send_stream_->Start();
- EXPECT_EQ(0, voe.base->StartSend(voe.send_channel_id));
- }
+ CreateCalls(call_config, call_config);
+
+ // TODO(minyue): consider if this is a good transport even for audio only
+ // calls.
+ send_transport = rtc::MakeUnique<test::LayerFilteringTransport>(
+ &task_queue_, params.pipe, sender_call_.get(), kPayloadTypeVP8,
+ kPayloadTypeVP9, params.video.selected_tl, params_.ss.selected_sl,
+ payload_type_map_);
+
+ recv_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, params_.pipe, receiver_call_.get(), payload_type_map_);
+
+ // TODO(ivica): Use two calls to be able to merge with RunWithAnalyzer or at
+ // least share as much code as possible. That way this test would also match
+ // the full stack tests better.
+ send_transport->SetReceiver(receiver_call_->Receiver());
+ recv_transport->SetReceiver(sender_call_->Receiver());
+
+ if (params_.video.enabled) {
+ // Create video renderers.
+ local_preview.reset(test::VideoRenderer::Create(
+ "Local Preview", params_.video.width, params_.video.height));
+
+ const size_t selected_stream_id = params_.ss.selected_stream;
+ const size_t num_streams = params_.ss.streams.size();
+
+ if (selected_stream_id == num_streams) {
+ for (size_t stream_id = 0; stream_id < num_streams; ++stream_id) {
+ std::ostringstream oss;
+ oss << "Loopback Video - Stream #" << static_cast<int>(stream_id);
+ loopback_renderers.emplace_back(test::VideoRenderer::Create(
+ oss.str().c_str(), params_.ss.streams[stream_id].width,
+ params_.ss.streams[stream_id].height));
+ }
+ } else {
+ loopback_renderers.emplace_back(test::VideoRenderer::Create(
+ "Loopback Video", params_.ss.streams[selected_stream_id].width,
+ params_.ss.streams[selected_stream_id].height));
+ }
+
+ SetupVideo(send_transport.get(), recv_transport.get());
+
+ video_send_config_.pre_encode_callback = local_preview.get();
+ if (selected_stream_id == num_streams) {
+ for (size_t stream_id = 0; stream_id < num_streams; ++stream_id) {
+ video_receive_configs_[stream_id].renderer =
+ loopback_renderers[stream_id].get();
+ if (params_.audio.enabled && params_.audio.sync_video)
+ video_receive_configs_[stream_id].sync_group = kSyncGroup;
+ }
+ } else {
+ video_receive_configs_[selected_stream_id].renderer =
+ loopback_renderers.back().get();
+ if (params_.audio.enabled && params_.audio.sync_video)
+ video_receive_configs_[selected_stream_id].sync_group = kSyncGroup;
+ }
+
+ if (params_.screenshare.enabled)
+ SetupScreenshareOrSVC();
+
+ CreateFlexfecStreams();
+ CreateVideoStreams();
+
+ CreateCapturer();
+ video_send_stream_->SetSource(video_capturer_.get(),
+ degradation_preference_);
+ }
+
+ if (params_.audio.enabled) {
+ SetupAudio(voe.send_channel_id, voe.receive_channel_id,
+ send_transport.get(), &audio_receive_stream);
+ }
+
+ for (VideoReceiveStream* receive_stream : video_receive_streams_)
+ StartEncodedFrameLogs(receive_stream);
+ StartEncodedFrameLogs(video_send_stream_);
+
+ // Start sending and receiving video.
+ if (params_.video.enabled) {
+ for (VideoReceiveStream* video_receive_stream : video_receive_streams_)
+ video_receive_stream->Start();
+
+ video_send_stream_->Start();
+ video_capturer_->Start();
+ }
+
+ if (params_.audio.enabled) {
+ // Start receiving audio.
+ audio_receive_stream->Start();
+ EXPECT_EQ(0, voe.base->StartPlayout(voe.receive_channel_id));
+
+ // Start sending audio.
+ audio_send_stream_->Start();
+ EXPECT_EQ(0, voe.base->StartSend(voe.send_channel_id));
+ }
+ });
test::PressEnterToContinue();
- if (params_.audio.enabled) {
- // Stop sending audio.
- EXPECT_EQ(0, voe.base->StopSend(voe.send_channel_id));
- audio_send_stream_->Stop();
+ task_queue_.SendTask([&]() {
+ if (params_.audio.enabled) {
+ // Stop sending audio.
+ EXPECT_EQ(0, voe.base->StopSend(voe.send_channel_id));
+ audio_send_stream_->Stop();
- // Stop receiving audio.
- EXPECT_EQ(0, voe.base->StopPlayout(voe.receive_channel_id));
- audio_receive_stream->Stop();
- sender_call_->DestroyAudioSendStream(audio_send_stream_);
- receiver_call_->DestroyAudioReceiveStream(audio_receive_stream);
- }
+ // Stop receiving audio.
+ EXPECT_EQ(0, voe.base->StopPlayout(voe.receive_channel_id));
+ audio_receive_stream->Stop();
+ sender_call_->DestroyAudioSendStream(audio_send_stream_);
+ receiver_call_->DestroyAudioReceiveStream(audio_receive_stream);
+ }
- // Stop receiving and sending video.
- if (params_.video.enabled) {
- video_capturer_->Stop();
- video_send_stream_->Stop();
- for (FlexfecReceiveStream* flexfec_receive_stream :
- flexfec_receive_streams_) {
- for (VideoReceiveStream* video_receive_stream : video_receive_streams_) {
- video_receive_stream->RemoveSecondarySink(flexfec_receive_stream);
+ // Stop receiving and sending video.
+ if (params_.video.enabled) {
+ video_capturer_->Stop();
+ video_send_stream_->Stop();
+ for (FlexfecReceiveStream* flexfec_receive_stream :
+ flexfec_receive_streams_) {
+ for (VideoReceiveStream* video_receive_stream :
+ video_receive_streams_) {
+ video_receive_stream->RemoveSecondarySink(flexfec_receive_stream);
+ }
+ receiver_call_->DestroyFlexfecReceiveStream(flexfec_receive_stream);
}
- receiver_call_->DestroyFlexfecReceiveStream(flexfec_receive_stream);
+ for (VideoReceiveStream* receive_stream : video_receive_streams_) {
+ receive_stream->Stop();
+ receiver_call_->DestroyVideoReceiveStream(receive_stream);
+ }
+ sender_call_->DestroyVideoSendStream(video_send_stream_);
}
- for (VideoReceiveStream* receive_stream : video_receive_streams_) {
- receive_stream->Stop();
- receiver_call_->DestroyVideoReceiveStream(receive_stream);
- }
- sender_call_->DestroyVideoSendStream(video_send_stream_);
- }
- send_transport.StopSending();
- recv_transport.StopSending();
+ video_capturer_.reset();
+ send_transport.reset();
+ recv_transport.reset();
- if (params_.audio.enabled)
- DestroyVoiceEngine(&voe);
+ if (params_.audio.enabled)
+ DestroyVoiceEngine(&voe);
+
+ local_preview.reset();
+ loopback_renderers.clear();
+
+ DestroyCalls();
+ });
}
void VideoQualityTest::StartEncodedFrameLogs(VideoSendStream* stream) {