Use SingleThreadedTaskQueue in DirectTransport
DirectTransport has so far used its own thread, which led to a different threading-model in the unit-tests than the one used in actual WebRTC. Because of that, some critical-sections that weren't truly necessary in WebRTC could not be replaced with thread-checks, because those checks failed in unit-tests.
This CL introduces SingleThreadedTaskQueue - a TaskQueue which guarantees to run all of its tasks on the same thread (rtc::TaskQueue doesn't guarantee that on Mac) - and uses that for DirectTransport. CLs based on top of this will uncomment thread-checks which had to be commented out before, and remove unnecessary critical-sections.
Future work will probably replace the thread-checkers with more sophisticated serialized-access checks, allowing us to move from the SingleThreadedTaskQueue to a normal TaskQueue.
Related implementation notes:
* This CL has made DirectTransport::StopSending() superfluous, and so it was deleted.
BUG=webrtc:8113, webrtc:7405, webrtc:8056, webrtc:8116
Review-Url: https://codereview.webrtc.org/2998923002
Cr-Commit-Position: refs/heads/master@{#19445}
diff --git a/webrtc/audio/BUILD.gn b/webrtc/audio/BUILD.gn
index 50a2485..8d8db4b 100644
--- a/webrtc/audio/BUILD.gn
+++ b/webrtc/audio/BUILD.gn
@@ -156,6 +156,7 @@
"../system_wrappers",
"../test:fake_audio_device",
"../test:field_trial",
+ "../test:single_threaded_task_queue",
"../test:test_common",
"../test:test_main",
"//testing/gmock",
diff --git a/webrtc/audio/test/audio_bwe_integration_test.cc b/webrtc/audio/test/audio_bwe_integration_test.cc
index c4f72de..265738c 100644
--- a/webrtc/audio/test/audio_bwe_integration_test.cc
+++ b/webrtc/audio/test/audio_bwe_integration_test.cc
@@ -48,15 +48,18 @@
send_audio_device_ = send_audio_device;
}
-test::PacketTransport* AudioBweTest::CreateSendTransport(Call* sender_call) {
+test::PacketTransport* AudioBweTest::CreateSendTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) {
return new test::PacketTransport(
- sender_call, this, test::PacketTransport::kSender,
+ task_queue, sender_call, this, test::PacketTransport::kSender,
test::CallTest::payload_type_map_, GetNetworkPipeConfig());
}
-test::PacketTransport* AudioBweTest::CreateReceiveTransport() {
+test::PacketTransport* AudioBweTest::CreateReceiveTransport(
+ SingleThreadedTaskQueueForTesting* task_queue) {
return new test::PacketTransport(
- nullptr, this, test::PacketTransport::kReceiver,
+ task_queue, nullptr, this, test::PacketTransport::kReceiver,
test::CallTest::payload_type_map_, GetNetworkPipeConfig());
}
diff --git a/webrtc/audio/test/audio_bwe_integration_test.h b/webrtc/audio/test/audio_bwe_integration_test.h
index 769603a..729b18f 100644
--- a/webrtc/audio/test/audio_bwe_integration_test.h
+++ b/webrtc/audio/test/audio_bwe_integration_test.h
@@ -15,6 +15,7 @@
#include "webrtc/test/call_test.h"
#include "webrtc/test/fake_audio_device.h"
+#include "webrtc/test/single_threaded_task_queue.h"
namespace webrtc {
namespace test {
@@ -38,8 +39,11 @@
test::FakeAudioDevice* send_audio_device,
test::FakeAudioDevice* recv_audio_device) override;
- test::PacketTransport* CreateSendTransport(Call* sender_call) override;
- test::PacketTransport* CreateReceiveTransport() override;
+ test::PacketTransport* CreateSendTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override;
+ test::PacketTransport* CreateReceiveTransport(
+ SingleThreadedTaskQueueForTesting* task_queue) override;
void PerformTest() override;
diff --git a/webrtc/audio/test/low_bandwidth_audio_test.cc b/webrtc/audio/test/low_bandwidth_audio_test.cc
index 070253e..55f8621 100644
--- a/webrtc/audio/test/low_bandwidth_audio_test.cc
+++ b/webrtc/audio/test/low_bandwidth_audio_test.cc
@@ -86,15 +86,17 @@
}
test::PacketTransport* AudioQualityTest::CreateSendTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
Call* sender_call) {
return new test::PacketTransport(
- sender_call, this, test::PacketTransport::kSender,
+ task_queue, sender_call, this, test::PacketTransport::kSender,
test::CallTest::payload_type_map_, GetNetworkPipeConfig());
}
-test::PacketTransport* AudioQualityTest::CreateReceiveTransport() {
+test::PacketTransport* AudioQualityTest::CreateReceiveTransport(
+ SingleThreadedTaskQueueForTesting* task_queue) {
return new test::PacketTransport(
- nullptr, this, test::PacketTransport::kReceiver,
+ task_queue, nullptr, this, test::PacketTransport::kReceiver,
test::CallTest::payload_type_map_, GetNetworkPipeConfig());
}
diff --git a/webrtc/audio/test/low_bandwidth_audio_test.h b/webrtc/audio/test/low_bandwidth_audio_test.h
index eae650a..ae75707 100644
--- a/webrtc/audio/test/low_bandwidth_audio_test.h
+++ b/webrtc/audio/test/low_bandwidth_audio_test.h
@@ -41,8 +41,11 @@
test::FakeAudioDevice* send_audio_device,
test::FakeAudioDevice* recv_audio_device) override;
- test::PacketTransport* CreateSendTransport(Call* sender_call) override;
- test::PacketTransport* CreateReceiveTransport() override;
+ test::PacketTransport* CreateSendTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override;
+ test::PacketTransport* CreateReceiveTransport(
+ SingleThreadedTaskQueueForTesting* task_queue) override;
void ModifyAudioConfigs(
AudioSendStream::Config* send_config,
diff --git a/webrtc/call/bitrate_estimator_tests.cc b/webrtc/call/bitrate_estimator_tests.cc
index de12ef7..5d4acbb 100644
--- a/webrtc/call/bitrate_estimator_tests.cc
+++ b/webrtc/call/bitrate_estimator_tests.cc
@@ -102,51 +102,55 @@
virtual ~BitrateEstimatorTest() { EXPECT_TRUE(streams_.empty()); }
virtual void SetUp() {
- Call::Config config(event_log_.get());
- receiver_call_.reset(Call::Create(config));
- sender_call_.reset(Call::Create(config));
+ task_queue_.SendTask([this]() {
+ Call::Config config(event_log_.get());
+ receiver_call_.reset(Call::Create(config));
+ sender_call_.reset(Call::Create(config));
- send_transport_.reset(
- new test::DirectTransport(sender_call_.get(), payload_type_map_));
- send_transport_->SetReceiver(receiver_call_->Receiver());
- receive_transport_.reset(
- new test::DirectTransport(receiver_call_.get(), payload_type_map_));
- receive_transport_->SetReceiver(sender_call_->Receiver());
+ send_transport_.reset(new test::DirectTransport(
+ &task_queue_, sender_call_.get(), payload_type_map_));
+ send_transport_->SetReceiver(receiver_call_->Receiver());
+ receive_transport_.reset(new test::DirectTransport(
+ &task_queue_, receiver_call_.get(), payload_type_map_));
+ receive_transport_->SetReceiver(sender_call_->Receiver());
- video_send_config_ = VideoSendStream::Config(send_transport_.get());
- video_send_config_.rtp.ssrcs.push_back(kVideoSendSsrcs[0]);
- // Encoders will be set separately per stream.
- video_send_config_.encoder_settings.encoder = nullptr;
- video_send_config_.encoder_settings.payload_name = "FAKE";
- video_send_config_.encoder_settings.payload_type =
- kFakeVideoSendPayloadType;
- test::FillEncoderConfiguration(1, &video_encoder_config_);
+ video_send_config_ = VideoSendStream::Config(send_transport_.get());
+ video_send_config_.rtp.ssrcs.push_back(kVideoSendSsrcs[0]);
+ // Encoders will be set separately per stream.
+ video_send_config_.encoder_settings.encoder = nullptr;
+ video_send_config_.encoder_settings.payload_name = "FAKE";
+ video_send_config_.encoder_settings.payload_type =
+ kFakeVideoSendPayloadType;
+ test::FillEncoderConfiguration(1, &video_encoder_config_);
- receive_config_ = VideoReceiveStream::Config(receive_transport_.get());
- // receive_config_.decoders will be set by every stream separately.
- receive_config_.rtp.remote_ssrc = video_send_config_.rtp.ssrcs[0];
- receive_config_.rtp.local_ssrc = kReceiverLocalVideoSsrc;
- receive_config_.rtp.remb = true;
- receive_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
- receive_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId));
+ receive_config_ = VideoReceiveStream::Config(receive_transport_.get());
+ // receive_config_.decoders will be set by every stream separately.
+ receive_config_.rtp.remote_ssrc = video_send_config_.rtp.ssrcs[0];
+ receive_config_.rtp.local_ssrc = kReceiverLocalVideoSsrc;
+ receive_config_.rtp.remb = true;
+ receive_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
+ receive_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId));
+ });
}
virtual void TearDown() {
- std::for_each(streams_.begin(), streams_.end(),
- std::mem_fun(&Stream::StopSending));
+ task_queue_.SendTask([this]() {
+ std::for_each(streams_.begin(), streams_.end(),
+ std::mem_fun(&Stream::StopSending));
- send_transport_->StopSending();
- receive_transport_->StopSending();
+ while (!streams_.empty()) {
+ delete streams_.back();
+ streams_.pop_back();
+ }
- while (!streams_.empty()) {
- delete streams_.back();
- streams_.pop_back();
- }
+ send_transport_.reset();
+ receive_transport_.reset();
- receiver_call_.reset();
- sender_call_.reset();
+ receiver_call_.reset();
+ sender_call_.reset();
+ });
}
protected:
@@ -241,66 +245,80 @@
"RemoteBitrateEstimatorSingleStream: Instantiating.";
TEST_F(BitrateEstimatorTest, InstantiatesTOFPerDefaultForVideo) {
- video_send_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
}
TEST_F(BitrateEstimatorTest, ImmediatelySwitchToASTForVideo) {
- video_send_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId));
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
- receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId));
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
+ receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
}
TEST_F(BitrateEstimatorTest, SwitchesToASTForVideo) {
- video_send_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
- video_send_config_.rtp.extensions[0] =
- RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId);
- receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
- receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions[0] =
+ RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId);
+ receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
+ receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
}
// This test is flaky. See webrtc:5790.
TEST_F(BitrateEstimatorTest, DISABLED_SwitchesToASTThenBackToTOFForVideo) {
- video_send_config_.rtp.extensions.push_back(
- RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
- receiver_log_.PushExpectedLogLine(kSingleStreamLog);
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions.push_back(
+ RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId));
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
+ receiver_log_.PushExpectedLogLine(kSingleStreamLog);
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
- video_send_config_.rtp.extensions[0] =
- RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId);
- receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
- receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
- streams_.push_back(new Stream(this));
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions[0] =
+ RtpExtension(RtpExtension::kAbsSendTimeUri, kASTExtensionId);
+ receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
+ receiver_log_.PushExpectedLogLine("Switching to absolute send time RBE.");
+ streams_.push_back(new Stream(this));
+ });
EXPECT_TRUE(receiver_log_.Wait());
- video_send_config_.rtp.extensions[0] =
- RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId);
- receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
- receiver_log_.PushExpectedLogLine(
- "WrappingBitrateEstimator: Switching to transmission time offset RBE.");
- streams_.push_back(new Stream(this));
- streams_[0]->StopSending();
- streams_[1]->StopSending();
+ task_queue_.SendTask([this]() {
+ video_send_config_.rtp.extensions[0] =
+ RtpExtension(RtpExtension::kTimestampOffsetUri, kTOFExtensionId);
+ receiver_log_.PushExpectedLogLine(kAbsSendTimeLog);
+ receiver_log_.PushExpectedLogLine(
+ "WrappingBitrateEstimator: Switching to transmission time offset RBE.");
+ streams_.push_back(new Stream(this));
+ streams_[0]->StopSending();
+ streams_[1]->StopSending();
+ });
EXPECT_TRUE(receiver_log_.Wait());
}
} // namespace webrtc
diff --git a/webrtc/call/call_perf_tests.cc b/webrtc/call/call_perf_tests.cc
index f3a2058..530ad29 100644
--- a/webrtc/call/call_perf_tests.cc
+++ b/webrtc/call/call_perf_tests.cc
@@ -21,7 +21,7 @@
#include "webrtc/modules/audio_mixer/audio_mixer_impl.h"
#include "webrtc/modules/rtp_rtcp/include/rtp_header_parser.h"
#include "webrtc/rtc_base/checks.h"
-#include "webrtc/rtc_base/constructormagic.h"
+#include "webrtc/rtc_base/ptr_util.h"
#include "webrtc/rtc_base/thread_annotations.h"
#include "webrtc/system_wrappers/include/metrics_default.h"
#include "webrtc/test/call_test.h"
@@ -35,6 +35,7 @@
#include "webrtc/test/frame_generator_capturer.h"
#include "webrtc/test/gtest.h"
#include "webrtc/test/rtp_rtcp_observer.h"
+#include "webrtc/test/single_threaded_task_queue.h"
#include "webrtc/test/testsupport/fileutils.h"
#include "webrtc/test/testsupport/perf_test.h"
#include "webrtc/video/transport_adapter.h"
@@ -143,146 +144,167 @@
const uint32_t kAudioSendSsrc = 1234;
const uint32_t kAudioRecvSsrc = 5678;
- metrics::Reset();
- rtc::scoped_refptr<AudioProcessing> audio_processing =
- AudioProcessing::Create();
- VoiceEngine* voice_engine = VoiceEngine::Create();
- VoEBase* voe_base = VoEBase::GetInterface(voice_engine);
- FakeAudioDevice fake_audio_device(
- FakeAudioDevice::CreatePulsedNoiseCapturer(256, 48000),
- FakeAudioDevice::CreateDiscardRenderer(48000), audio_rtp_speed);
- EXPECT_EQ(0, voe_base->Init(&fake_audio_device, audio_processing.get(),
- decoder_factory_));
- VoEBase::ChannelConfig config;
- config.enable_voice_pacing = true;
- int send_channel_id = voe_base->CreateChannel(config);
- int recv_channel_id = voe_base->CreateChannel();
-
- AudioState::Config send_audio_state_config;
- send_audio_state_config.voice_engine = voice_engine;
- send_audio_state_config.audio_mixer = AudioMixerImpl::Create();
- send_audio_state_config.audio_processing = audio_processing;
- Call::Config sender_config(event_log_.get());
-
- sender_config.audio_state = AudioState::Create(send_audio_state_config);
- Call::Config receiver_config(event_log_.get());
- receiver_config.audio_state = sender_config.audio_state;
- CreateCalls(sender_config, receiver_config);
-
-
- VideoRtcpAndSyncObserver observer(Clock::GetRealTimeClock());
+ int send_channel_id;
+ int recv_channel_id;
FakeNetworkPipe::Config audio_net_config;
audio_net_config.queue_delay_ms = 500;
audio_net_config.loss_percent = 5;
+ rtc::scoped_refptr<AudioProcessing> audio_processing;
+ VoiceEngine* voice_engine;
+ VoEBase* voe_base;
+ std::unique_ptr<FakeAudioDevice> fake_audio_device;
+ VideoRtcpAndSyncObserver observer(Clock::GetRealTimeClock());
+
std::map<uint8_t, MediaType> audio_pt_map;
std::map<uint8_t, MediaType> video_pt_map;
- std::copy_if(std::begin(payload_type_map_), std::end(payload_type_map_),
- std::inserter(audio_pt_map, audio_pt_map.end()),
- [](const std::pair<const uint8_t, MediaType>& pair) {
- return pair.second == MediaType::AUDIO;
- });
- std::copy_if(std::begin(payload_type_map_), std::end(payload_type_map_),
- std::inserter(video_pt_map, video_pt_map.end()),
- [](const std::pair<const uint8_t, MediaType>& pair) {
- return pair.second == MediaType::VIDEO;
- });
- test::PacketTransport audio_send_transport(sender_call_.get(), &observer,
- test::PacketTransport::kSender,
- audio_pt_map, audio_net_config);
- audio_send_transport.SetReceiver(receiver_call_->Receiver());
+ std::unique_ptr<test::PacketTransport> audio_send_transport;
+ std::unique_ptr<test::PacketTransport> video_send_transport;
+ std::unique_ptr<test::PacketTransport> receive_transport;
- test::PacketTransport video_send_transport(
- sender_call_.get(), &observer, test::PacketTransport::kSender,
- video_pt_map, FakeNetworkPipe::Config());
- video_send_transport.SetReceiver(receiver_call_->Receiver());
-
- test::PacketTransport receive_transport(
- receiver_call_.get(), &observer, test::PacketTransport::kReceiver,
- payload_type_map_, FakeNetworkPipe::Config());
- receive_transport.SetReceiver(sender_call_->Receiver());
-
- CreateSendConfig(1, 0, 0, &video_send_transport);
- CreateMatchingReceiveConfigs(&receive_transport);
-
- AudioSendStream::Config audio_send_config(&audio_send_transport);
- audio_send_config.voe_channel_id = send_channel_id;
- audio_send_config.rtp.ssrc = kAudioSendSsrc;
- audio_send_config.send_codec_spec =
- rtc::Optional<AudioSendStream::Config::SendCodecSpec>(
- {kAudioSendPayloadType, {"ISAC", 16000, 1}});
- audio_send_config.encoder_factory = CreateBuiltinAudioEncoderFactory();
- AudioSendStream* audio_send_stream =
- sender_call_->CreateAudioSendStream(audio_send_config);
-
- video_send_config_.rtp.nack.rtp_history_ms = kNackRtpHistoryMs;
- if (fec == FecMode::kOn) {
- video_send_config_.rtp.ulpfec.red_payload_type = kRedPayloadType;
- video_send_config_.rtp.ulpfec.ulpfec_payload_type = kUlpfecPayloadType;
- video_receive_configs_[0].rtp.ulpfec.red_payload_type = kRedPayloadType;
- video_receive_configs_[0].rtp.ulpfec.ulpfec_payload_type =
- kUlpfecPayloadType;
- }
- video_receive_configs_[0].rtp.nack.rtp_history_ms = 1000;
- video_receive_configs_[0].renderer = &observer;
- video_receive_configs_[0].sync_group = kSyncGroup;
-
- AudioReceiveStream::Config audio_recv_config;
- audio_recv_config.rtp.remote_ssrc = kAudioSendSsrc;
- audio_recv_config.rtp.local_ssrc = kAudioRecvSsrc;
- audio_recv_config.voe_channel_id = recv_channel_id;
- audio_recv_config.sync_group = kSyncGroup;
- audio_recv_config.decoder_factory = decoder_factory_;
- audio_recv_config.decoder_map = {{kAudioSendPayloadType, {"ISAC", 16000, 1}}};
-
+ AudioSendStream* audio_send_stream;
AudioReceiveStream* audio_receive_stream;
+ std::unique_ptr<DriftingClock> drifting_clock;
- if (create_first == CreateOrder::kAudioFirst) {
- audio_receive_stream =
- receiver_call_->CreateAudioReceiveStream(audio_recv_config);
- CreateVideoStreams();
- } else {
- CreateVideoStreams();
- audio_receive_stream =
- receiver_call_->CreateAudioReceiveStream(audio_recv_config);
- }
- EXPECT_EQ(1u, video_receive_streams_.size());
- observer.set_receive_stream(video_receive_streams_[0]);
- DriftingClock drifting_clock(clock_, video_ntp_speed);
- CreateFrameGeneratorCapturerWithDrift(&drifting_clock, video_rtp_speed,
- kDefaultFramerate, kDefaultWidth,
- kDefaultHeight);
+ task_queue_.SendTask([&]() {
+ metrics::Reset();
+ audio_processing = AudioProcessing::Create();
+ voice_engine = VoiceEngine::Create();
+ voe_base = VoEBase::GetInterface(voice_engine);
+ fake_audio_device = rtc::MakeUnique<FakeAudioDevice>(
+ FakeAudioDevice::CreatePulsedNoiseCapturer(256, 48000),
+ FakeAudioDevice::CreateDiscardRenderer(48000), audio_rtp_speed);
+ EXPECT_EQ(0, voe_base->Init(fake_audio_device.get(), audio_processing.get(),
+ decoder_factory_));
+ VoEBase::ChannelConfig config;
+ config.enable_voice_pacing = true;
+ send_channel_id = voe_base->CreateChannel(config);
+ recv_channel_id = voe_base->CreateChannel();
- Start();
+ AudioState::Config send_audio_state_config;
+ send_audio_state_config.voice_engine = voice_engine;
+ send_audio_state_config.audio_mixer = AudioMixerImpl::Create();
+ send_audio_state_config.audio_processing = audio_processing;
+ Call::Config sender_config(event_log_.get());
- audio_send_stream->Start();
- audio_receive_stream->Start();
+ sender_config.audio_state = AudioState::Create(send_audio_state_config);
+ Call::Config receiver_config(event_log_.get());
+ receiver_config.audio_state = sender_config.audio_state;
+ CreateCalls(sender_config, receiver_config);
+
+ std::copy_if(std::begin(payload_type_map_), std::end(payload_type_map_),
+ std::inserter(audio_pt_map, audio_pt_map.end()),
+ [](const std::pair<const uint8_t, MediaType>& pair) {
+ return pair.second == MediaType::AUDIO;
+ });
+ std::copy_if(std::begin(payload_type_map_), std::end(payload_type_map_),
+ std::inserter(video_pt_map, video_pt_map.end()),
+ [](const std::pair<const uint8_t, MediaType>& pair) {
+ return pair.second == MediaType::VIDEO;
+ });
+
+ audio_send_transport = rtc::MakeUnique<test::PacketTransport>(
+ &task_queue_, sender_call_.get(), &observer,
+ test::PacketTransport::kSender, audio_pt_map, audio_net_config);
+ audio_send_transport->SetReceiver(receiver_call_->Receiver());
+
+ video_send_transport = rtc::MakeUnique<test::PacketTransport>(
+ &task_queue_, sender_call_.get(), &observer,
+ test::PacketTransport::kSender, video_pt_map,
+ FakeNetworkPipe::Config());
+ video_send_transport->SetReceiver(receiver_call_->Receiver());
+
+ receive_transport = rtc::MakeUnique<test::PacketTransport>(
+ &task_queue_, receiver_call_.get(), &observer,
+ test::PacketTransport::kReceiver, payload_type_map_,
+ FakeNetworkPipe::Config());
+ receive_transport->SetReceiver(sender_call_->Receiver());
+
+ CreateSendConfig(1, 0, 0, video_send_transport.get());
+ CreateMatchingReceiveConfigs(receive_transport.get());
+
+ AudioSendStream::Config audio_send_config(audio_send_transport.get());
+ audio_send_config.voe_channel_id = send_channel_id;
+ audio_send_config.rtp.ssrc = kAudioSendSsrc;
+ audio_send_config.send_codec_spec =
+ rtc::Optional<AudioSendStream::Config::SendCodecSpec>(
+ {kAudioSendPayloadType, {"ISAC", 16000, 1}});
+ audio_send_config.encoder_factory = CreateBuiltinAudioEncoderFactory();
+ audio_send_stream = sender_call_->CreateAudioSendStream(audio_send_config);
+
+ video_send_config_.rtp.nack.rtp_history_ms = kNackRtpHistoryMs;
+ if (fec == FecMode::kOn) {
+ video_send_config_.rtp.ulpfec.red_payload_type = kRedPayloadType;
+ video_send_config_.rtp.ulpfec.ulpfec_payload_type = kUlpfecPayloadType;
+ video_receive_configs_[0].rtp.ulpfec.red_payload_type = kRedPayloadType;
+ video_receive_configs_[0].rtp.ulpfec.ulpfec_payload_type =
+ kUlpfecPayloadType;
+ }
+ video_receive_configs_[0].rtp.nack.rtp_history_ms = 1000;
+ video_receive_configs_[0].renderer = &observer;
+ video_receive_configs_[0].sync_group = kSyncGroup;
+
+ AudioReceiveStream::Config audio_recv_config;
+ audio_recv_config.rtp.remote_ssrc = kAudioSendSsrc;
+ audio_recv_config.rtp.local_ssrc = kAudioRecvSsrc;
+ audio_recv_config.voe_channel_id = recv_channel_id;
+ audio_recv_config.sync_group = kSyncGroup;
+ audio_recv_config.decoder_factory = decoder_factory_;
+ audio_recv_config.decoder_map = {
+ {kAudioSendPayloadType, {"ISAC", 16000, 1}}};
+
+ if (create_first == CreateOrder::kAudioFirst) {
+ audio_receive_stream =
+ receiver_call_->CreateAudioReceiveStream(audio_recv_config);
+ CreateVideoStreams();
+ } else {
+ CreateVideoStreams();
+ audio_receive_stream =
+ receiver_call_->CreateAudioReceiveStream(audio_recv_config);
+ }
+ EXPECT_EQ(1u, video_receive_streams_.size());
+ observer.set_receive_stream(video_receive_streams_[0]);
+ drifting_clock = rtc::MakeUnique<DriftingClock>(clock_, video_ntp_speed);
+ CreateFrameGeneratorCapturerWithDrift(drifting_clock.get(), video_rtp_speed,
+ kDefaultFramerate, kDefaultWidth,
+ kDefaultHeight);
+
+ Start();
+
+ audio_send_stream->Start();
+ audio_receive_stream->Start();
+ });
EXPECT_TRUE(observer.Wait())
<< "Timed out while waiting for audio and video to be synchronized.";
- audio_send_stream->Stop();
- audio_receive_stream->Stop();
+ task_queue_.SendTask([&]() {
+ audio_send_stream->Stop();
+ audio_receive_stream->Stop();
- Stop();
- video_send_transport.StopSending();
- audio_send_transport.StopSending();
- receive_transport.StopSending();
+ Stop();
- DestroyStreams();
+ DestroyStreams();
- sender_call_->DestroyAudioSendStream(audio_send_stream);
- receiver_call_->DestroyAudioReceiveStream(audio_receive_stream);
+ video_send_transport.reset();
+ audio_send_transport.reset();
+ receive_transport.reset();
- voe_base->DeleteChannel(send_channel_id);
- voe_base->DeleteChannel(recv_channel_id);
- voe_base->Release();
+ sender_call_->DestroyAudioSendStream(audio_send_stream);
+ receiver_call_->DestroyAudioReceiveStream(audio_receive_stream);
- DestroyCalls();
+ voe_base->DeleteChannel(send_channel_id);
+ voe_base->DeleteChannel(recv_channel_id);
+ voe_base->Release();
- VoiceEngine::Delete(voice_engine);
+ DestroyCalls();
+
+ VoiceEngine::Delete(voice_engine);
+
+ fake_audio_device.reset();
+ });
observer.PrintResults();
@@ -335,14 +357,17 @@
rtp_start_timestamp_(0) {}
private:
- test::PacketTransport* CreateSendTransport(Call* sender_call) override {
- return new test::PacketTransport(sender_call, this,
+ test::PacketTransport* CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override {
+ return new test::PacketTransport(task_queue, sender_call, this,
test::PacketTransport::kSender,
payload_type_map_, net_config_);
}
- test::PacketTransport* CreateReceiveTransport() override {
- return new test::PacketTransport(nullptr, this,
+ test::PacketTransport* CreateReceiveTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue) override {
+ return new test::PacketTransport(task_queue, nullptr, this,
test::PacketTransport::kReceiver,
payload_type_map_, net_config_);
}
diff --git a/webrtc/call/rampup_tests.cc b/webrtc/call/rampup_tests.cc
index a50abee..cfb5117 100644
--- a/webrtc/call/rampup_tests.cc
+++ b/webrtc/call/rampup_tests.cc
@@ -90,9 +90,11 @@
send_stream_ = send_stream;
}
-test::PacketTransport* RampUpTester::CreateSendTransport(Call* sender_call) {
+test::PacketTransport* RampUpTester::CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) {
send_transport_ = new test::PacketTransport(
- sender_call, this, test::PacketTransport::kSender,
+ task_queue, sender_call, this, test::PacketTransport::kSender,
test::CallTest::payload_type_map_, forward_transport_config_);
return send_transport_;
}
diff --git a/webrtc/call/rampup_tests.h b/webrtc/call/rampup_tests.h
index eedef7b..8358822 100644
--- a/webrtc/call/rampup_tests.h
+++ b/webrtc/call/rampup_tests.h
@@ -84,7 +84,9 @@
void OnVideoStreamsCreated(
VideoSendStream* send_stream,
const std::vector<VideoReceiveStream*>& receive_streams) override;
- test::PacketTransport* CreateSendTransport(Call* sender_call) override;
+ test::PacketTransport* CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override;
void ModifyVideoConfigs(
VideoSendStream::Config* send_config,
std::vector<VideoReceiveStream::Config>* receive_configs,
diff --git a/webrtc/test/BUILD.gn b/webrtc/test/BUILD.gn
index 2db6120..f6baa27 100644
--- a/webrtc/test/BUILD.gn
+++ b/webrtc/test/BUILD.gn
@@ -287,6 +287,7 @@
"frame_generator_unittest.cc",
"rtp_file_reader_unittest.cc",
"rtp_file_writer_unittest.cc",
+ "single_threaded_task_queue_unittest.cc",
"testsupport/always_passing_unittest.cc",
"testsupport/metrics/video_metrics_unittest.cc",
"testsupport/packet_reader_unittest.cc",
@@ -401,8 +402,23 @@
"../call",
"../modules/rtp_rtcp",
"../rtc_base:rtc_base_approved",
+ "../rtc_base:sequenced_task_checker",
"../system_wrappers",
]
+ public_deps = [
+ ":single_threaded_task_queue",
+ ]
+}
+
+rtc_source_set("single_threaded_task_queue") {
+ testonly = true
+ sources = [
+ "single_threaded_task_queue.cc",
+ "single_threaded_task_queue.h",
+ ]
+ deps = [
+ "../rtc_base:rtc_base_approved",
+ ]
}
rtc_source_set("fake_audio_device") {
diff --git a/webrtc/test/call_test.cc b/webrtc/test/call_test.cc
index 51816d8..3efc022 100644
--- a/webrtc/test/call_test.cc
+++ b/webrtc/test/call_test.cc
@@ -18,8 +18,11 @@
#include "webrtc/config.h"
#include "webrtc/modules/audio_mixer/audio_mixer_impl.h"
#include "webrtc/rtc_base/checks.h"
+#include "webrtc/rtc_base/event.h"
+#include "webrtc/rtc_base/ptr_util.h"
#include "webrtc/test/testsupport/fileutils.h"
#include "webrtc/voice_engine/include/voe_base.h"
+
namespace webrtc {
namespace test {
@@ -41,112 +44,124 @@
num_flexfec_streams_(0),
decoder_factory_(CreateBuiltinAudioDecoderFactory()),
encoder_factory_(CreateBuiltinAudioEncoderFactory()),
+ task_queue_("CallTestTaskQueue"),
fake_send_audio_device_(nullptr),
fake_recv_audio_device_(nullptr) {}
CallTest::~CallTest() {
+ task_queue_.SendTask([this]() {
+ fake_send_audio_device_.reset();
+ fake_recv_audio_device_.reset();
+ frame_generator_capturer_.reset();
+ });
}
void CallTest::RunBaseTest(BaseTest* test) {
- num_video_streams_ = test->GetNumVideoStreams();
- num_audio_streams_ = test->GetNumAudioStreams();
- num_flexfec_streams_ = test->GetNumFlexfecStreams();
- RTC_DCHECK(num_video_streams_ > 0 || num_audio_streams_ > 0);
- Call::Config send_config(test->GetSenderCallConfig());
- if (num_audio_streams_ > 0) {
- CreateFakeAudioDevices(test->CreateCapturer(), test->CreateRenderer());
- test->OnFakeAudioDevicesCreated(fake_send_audio_device_.get(),
- fake_recv_audio_device_.get());
- apm_send_ = AudioProcessing::Create();
- apm_recv_ = AudioProcessing::Create();
- CreateVoiceEngines();
- AudioState::Config audio_state_config;
- audio_state_config.voice_engine = voe_send_.voice_engine;
- audio_state_config.audio_mixer = AudioMixerImpl::Create();
- audio_state_config.audio_processing = apm_send_;
- send_config.audio_state = AudioState::Create(audio_state_config);
- }
- CreateSenderCall(send_config);
- if (sender_call_transport_controller_ != nullptr) {
- test->OnRtpTransportControllerSendCreated(
- sender_call_transport_controller_);
- }
- if (test->ShouldCreateReceivers()) {
- Call::Config recv_config(test->GetReceiverCallConfig());
+ task_queue_.SendTask([this, test]() {
+ num_video_streams_ = test->GetNumVideoStreams();
+ num_audio_streams_ = test->GetNumAudioStreams();
+ num_flexfec_streams_ = test->GetNumFlexfecStreams();
+ RTC_DCHECK(num_video_streams_ > 0 || num_audio_streams_ > 0);
+ Call::Config send_config(test->GetSenderCallConfig());
if (num_audio_streams_ > 0) {
+ CreateFakeAudioDevices(test->CreateCapturer(), test->CreateRenderer());
+ test->OnFakeAudioDevicesCreated(fake_send_audio_device_.get(),
+ fake_recv_audio_device_.get());
+ apm_send_ = AudioProcessing::Create();
+ apm_recv_ = AudioProcessing::Create();
+ CreateVoiceEngines();
AudioState::Config audio_state_config;
- audio_state_config.voice_engine = voe_recv_.voice_engine;
+ audio_state_config.voice_engine = voe_send_.voice_engine;
audio_state_config.audio_mixer = AudioMixerImpl::Create();
- audio_state_config.audio_processing = apm_recv_;
- recv_config.audio_state = AudioState::Create(audio_state_config);
+ audio_state_config.audio_processing = apm_send_;
+ send_config.audio_state = AudioState::Create(audio_state_config);
}
- CreateReceiverCall(recv_config);
- }
- test->OnCallsCreated(sender_call_.get(), receiver_call_.get());
- receive_transport_.reset(test->CreateReceiveTransport());
- send_transport_.reset(test->CreateSendTransport(sender_call_.get()));
+ CreateSenderCall(send_config);
+ if (sender_call_transport_controller_ != nullptr) {
+ test->OnRtpTransportControllerSendCreated(
+ sender_call_transport_controller_);
+ }
+ if (test->ShouldCreateReceivers()) {
+ Call::Config recv_config(test->GetReceiverCallConfig());
+ if (num_audio_streams_ > 0) {
+ AudioState::Config audio_state_config;
+ audio_state_config.voice_engine = voe_recv_.voice_engine;
+ audio_state_config.audio_mixer = AudioMixerImpl::Create();
+ audio_state_config.audio_processing = apm_recv_;
+ recv_config.audio_state = AudioState::Create(audio_state_config);
+ }
+ CreateReceiverCall(recv_config);
+ }
+ test->OnCallsCreated(sender_call_.get(), receiver_call_.get());
+ receive_transport_.reset(test->CreateReceiveTransport(&task_queue_));
+ send_transport_.reset(
+ test->CreateSendTransport(&task_queue_, sender_call_.get()));
- if (test->ShouldCreateReceivers()) {
- send_transport_->SetReceiver(receiver_call_->Receiver());
- receive_transport_->SetReceiver(sender_call_->Receiver());
- if (num_video_streams_ > 0)
- receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
- if (num_audio_streams_ > 0)
- receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
- } else {
- // Sender-only call delivers to itself.
- send_transport_->SetReceiver(sender_call_->Receiver());
- receive_transport_->SetReceiver(nullptr);
- }
+ if (test->ShouldCreateReceivers()) {
+ send_transport_->SetReceiver(receiver_call_->Receiver());
+ receive_transport_->SetReceiver(sender_call_->Receiver());
+ if (num_video_streams_ > 0)
+ receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
+ if (num_audio_streams_ > 0)
+ receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
+ } else {
+ // Sender-only call delivers to itself.
+ send_transport_->SetReceiver(sender_call_->Receiver());
+ receive_transport_->SetReceiver(nullptr);
+ }
- CreateSendConfig(num_video_streams_, num_audio_streams_, num_flexfec_streams_,
- send_transport_.get());
- if (test->ShouldCreateReceivers()) {
- CreateMatchingReceiveConfigs(receive_transport_.get());
- }
- if (num_video_streams_ > 0) {
- test->ModifyVideoConfigs(&video_send_config_, &video_receive_configs_,
- &video_encoder_config_);
- }
- if (num_audio_streams_ > 0) {
- test->ModifyAudioConfigs(&audio_send_config_, &audio_receive_configs_);
- }
- if (num_flexfec_streams_ > 0) {
- test->ModifyFlexfecConfigs(&flexfec_receive_configs_);
- }
+ CreateSendConfig(num_video_streams_, num_audio_streams_,
+ num_flexfec_streams_, send_transport_.get());
+ if (test->ShouldCreateReceivers()) {
+ CreateMatchingReceiveConfigs(receive_transport_.get());
+ }
+ if (num_video_streams_ > 0) {
+ test->ModifyVideoConfigs(&video_send_config_, &video_receive_configs_,
+ &video_encoder_config_);
+ }
+ if (num_audio_streams_ > 0) {
+ test->ModifyAudioConfigs(&audio_send_config_, &audio_receive_configs_);
+ }
+ if (num_flexfec_streams_ > 0) {
+ test->ModifyFlexfecConfigs(&flexfec_receive_configs_);
+ }
- if (num_flexfec_streams_ > 0) {
- CreateFlexfecStreams();
- test->OnFlexfecStreamsCreated(flexfec_receive_streams_);
- }
- if (num_video_streams_ > 0) {
- CreateVideoStreams();
- test->OnVideoStreamsCreated(video_send_stream_, video_receive_streams_);
- }
- if (num_audio_streams_ > 0) {
- CreateAudioStreams();
- test->OnAudioStreamsCreated(audio_send_stream_, audio_receive_streams_);
- }
+ if (num_flexfec_streams_ > 0) {
+ CreateFlexfecStreams();
+ test->OnFlexfecStreamsCreated(flexfec_receive_streams_);
+ }
+ if (num_video_streams_ > 0) {
+ CreateVideoStreams();
+ test->OnVideoStreamsCreated(video_send_stream_, video_receive_streams_);
+ }
+ if (num_audio_streams_ > 0) {
+ CreateAudioStreams();
+ test->OnAudioStreamsCreated(audio_send_stream_, audio_receive_streams_);
+ }
- if (num_video_streams_ > 0) {
- int width = kDefaultWidth;
- int height = kDefaultHeight;
- int frame_rate = kDefaultFramerate;
- test->ModifyVideoCaptureStartResolution(&width, &height, &frame_rate);
- CreateFrameGeneratorCapturer(frame_rate, width, height);
- test->OnFrameGeneratorCapturerCreated(frame_generator_capturer_.get());
- }
+ if (num_video_streams_ > 0) {
+ int width = kDefaultWidth;
+ int height = kDefaultHeight;
+ int frame_rate = kDefaultFramerate;
+ test->ModifyVideoCaptureStartResolution(&width, &height, &frame_rate);
+ CreateFrameGeneratorCapturer(frame_rate, width, height);
+ test->OnFrameGeneratorCapturerCreated(frame_generator_capturer_.get());
+ }
- Start();
+ Start();
+ });
+
test->PerformTest();
- send_transport_->StopSending();
- receive_transport_->StopSending();
- Stop();
- DestroyStreams();
- DestroyCalls();
- if (num_audio_streams_ > 0)
- DestroyVoiceEngines();
+ task_queue_.SendTask([this]() {
+ Stop();
+ DestroyStreams();
+ send_transport_.reset();
+ receive_transport_.reset();
+ DestroyCalls();
+ if (num_audio_streams_ > 0)
+ DestroyVoiceEngines();
+ });
test->OnTestFinished();
}
@@ -517,16 +532,19 @@
void BaseTest::OnCallsCreated(Call* sender_call, Call* receiver_call) {
}
-test::PacketTransport* BaseTest::CreateSendTransport(Call* sender_call) {
- return new PacketTransport(sender_call, this, test::PacketTransport::kSender,
- CallTest::payload_type_map_,
- FakeNetworkPipe::Config());
+test::PacketTransport* BaseTest::CreateSendTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) {
+ return new PacketTransport(
+ task_queue, sender_call, this, test::PacketTransport::kSender,
+ CallTest::payload_type_map_, FakeNetworkPipe::Config());
}
-test::PacketTransport* BaseTest::CreateReceiveTransport() {
- return new PacketTransport(nullptr, this, test::PacketTransport::kReceiver,
- CallTest::payload_type_map_,
- FakeNetworkPipe::Config());
+test::PacketTransport* BaseTest::CreateReceiveTransport(
+ SingleThreadedTaskQueueForTesting* task_queue) {
+ return new PacketTransport(
+ task_queue, nullptr, this, test::PacketTransport::kReceiver,
+ CallTest::payload_type_map_, FakeNetworkPipe::Config());
}
size_t BaseTest::GetNumVideoStreams() const {
diff --git a/webrtc/test/call_test.h b/webrtc/test/call_test.h
index 5186afa..b0ae3f6 100644
--- a/webrtc/test/call_test.h
+++ b/webrtc/test/call_test.h
@@ -23,6 +23,7 @@
#include "webrtc/test/fake_videorenderer.h"
#include "webrtc/test/frame_generator_capturer.h"
#include "webrtc/test/rtp_rtcp_observer.h"
+#include "webrtc/test/single_threaded_task_queue.h"
namespace webrtc {
@@ -136,6 +137,8 @@
rtc::scoped_refptr<AudioEncoderFactory> encoder_factory_;
test::FakeVideoRenderer fake_renderer_;
+ SingleThreadedTaskQueueForTesting task_queue_;
+
private:
// TODO(holmer): Remove once VoiceEngine is fully refactored to the new API.
// These methods are used to set up legacy voice engines and channels which is
@@ -188,8 +191,11 @@
RtpTransportControllerSend* controller);
virtual void OnCallsCreated(Call* sender_call, Call* receiver_call);
- virtual test::PacketTransport* CreateSendTransport(Call* sender_call);
- virtual test::PacketTransport* CreateReceiveTransport();
+ virtual test::PacketTransport* CreateSendTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call);
+ virtual test::PacketTransport* CreateReceiveTransport(
+ SingleThreadedTaskQueueForTesting* task_queue);
virtual void ModifyVideoConfigs(
VideoSendStream::Config* send_config,
diff --git a/webrtc/test/direct_transport.cc b/webrtc/test/direct_transport.cc
index 370425c..81f3b69 100644
--- a/webrtc/test/direct_transport.cc
+++ b/webrtc/test/direct_transport.cc
@@ -10,7 +10,9 @@
#include "webrtc/test/direct_transport.h"
#include "webrtc/call/call.h"
+#include "webrtc/rtc_base/ptr_util.h"
#include "webrtc/system_wrappers/include/clock.h"
+#include "webrtc/test/single_threaded_task_queue.h"
namespace webrtc {
namespace test {
@@ -32,36 +34,71 @@
DirectTransport::DirectTransport(const FakeNetworkPipe::Config& config,
Call* send_call,
std::unique_ptr<Demuxer> demuxer)
+ : DirectTransport(nullptr, config, send_call, std::move(demuxer)) {}
+
+DirectTransport::DirectTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map)
+ : DirectTransport(task_queue,
+ FakeNetworkPipe::Config(),
+ send_call,
+ payload_type_map) {
+}
+
+DirectTransport::DirectTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map)
+ : DirectTransport(
+ task_queue,
+ config,
+ send_call,
+ std::unique_ptr<Demuxer>(new DemuxerImpl(payload_type_map))) {
+}
+
+DirectTransport::DirectTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ std::unique_ptr<Demuxer> demuxer)
: send_call_(send_call),
- packet_event_(false, false),
- thread_(NetworkProcess, this, "NetworkProcess"),
clock_(Clock::GetRealTimeClock()),
- shutting_down_(false),
+ task_queue_(task_queue),
fake_network_(clock_, config, std::move(demuxer)) {
- thread_.Start();
+ // TODO(eladalon): When the deprecated ctors are removed, this check
+ // can be restored. https://bugs.chromium.org/p/webrtc/issues/detail?id=8125
+ // RTC_DCHECK(task_queue);
+ if (!task_queue) {
+ deprecated_task_queue_ =
+ rtc::MakeUnique<SingleThreadedTaskQueueForTesting>("deprecated_queue");
+ task_queue_ = deprecated_task_queue_.get();
+ }
+
if (send_call_) {
send_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
send_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
}
+ SendPackets();
}
-DirectTransport::~DirectTransport() { StopSending(); }
+DirectTransport::~DirectTransport() {
+ RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
+ // Constructor updates |next_scheduled_task_|, so it's guaranteed to
+ // be initialized.
+ task_queue_->CancelTask(next_scheduled_task_);
+}
void DirectTransport::SetConfig(const FakeNetworkPipe::Config& config) {
fake_network_.SetConfig(config);
}
void DirectTransport::StopSending() {
- {
- rtc::CritScope crit(&lock_);
- shutting_down_ = true;
- }
-
- packet_event_.Set();
- thread_.Stop();
+ task_queue_->CancelTask(next_scheduled_task_);
}
void DirectTransport::SetReceiver(PacketReceiver* receiver) {
+ RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
fake_network_.SetReceiver(receiver);
}
@@ -74,13 +111,11 @@
send_call_->OnSentPacket(sent_packet);
}
fake_network_.SendPacket(data, length);
- packet_event_.Set();
return true;
}
bool DirectTransport::SendRtcp(const uint8_t* data, size_t length) {
fake_network_.SendPacket(data, length);
- packet_event_.Set();
return true;
}
@@ -104,18 +139,15 @@
packet->data_length(), packet_time);
}
-bool DirectTransport::NetworkProcess(void* transport) {
- return static_cast<DirectTransport*>(transport)->SendPackets();
-}
+void DirectTransport::SendPackets() {
+ RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
-bool DirectTransport::SendPackets() {
fake_network_.Process();
- int64_t wait_time_ms = fake_network_.TimeUntilNextProcess();
- if (wait_time_ms > 0) {
- packet_event_.Wait(static_cast<int>(wait_time_ms));
- }
- rtc::CritScope crit(&lock_);
- return shutting_down_ ? false : true;
+
+ int64_t delay_ms = fake_network_.TimeUntilNextProcess();
+ next_scheduled_task_ = task_queue_->PostDelayedTask([this]() {
+ SendPackets();
+ }, delay_ms);
}
} // namespace test
} // namespace webrtc
diff --git a/webrtc/test/direct_transport.h b/webrtc/test/direct_transport.h
index 0698992..93ba971 100644
--- a/webrtc/test/direct_transport.h
+++ b/webrtc/test/direct_transport.h
@@ -12,14 +12,13 @@
#include <assert.h>
-#include <deque>
+#include <memory>
#include "webrtc/api/call/transport.h"
#include "webrtc/call/call.h"
-#include "webrtc/rtc_base/criticalsection.h"
-#include "webrtc/rtc_base/event.h"
-#include "webrtc/rtc_base/platform_thread.h"
+#include "webrtc/rtc_base/sequenced_task_checker.h"
#include "webrtc/test/fake_network_pipe.h"
+#include "webrtc/test/single_threaded_task_queue.h"
namespace webrtc {
@@ -30,14 +29,17 @@
class DirectTransport : public Transport {
public:
- DirectTransport(Call* send_call,
- const std::map<uint8_t, MediaType>& payload_type_map);
- DirectTransport(const FakeNetworkPipe::Config& config,
- Call* send_call,
- const std::map<uint8_t, MediaType>& payload_type_map);
- DirectTransport(const FakeNetworkPipe::Config& config,
- Call* send_call,
- std::unique_ptr<Demuxer> demuxer);
+ RTC_DEPRECATED DirectTransport(
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map);
+ RTC_DEPRECATED DirectTransport(
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map);
+ RTC_DEPRECATED DirectTransport(
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ std::unique_ptr<Demuxer> demuxer);
// This deprecated variant always uses MediaType::VIDEO.
RTC_DEPRECATED explicit DirectTransport(Call* send_call)
@@ -46,11 +48,26 @@
send_call,
std::unique_ptr<Demuxer>(new ForceDemuxer(MediaType::VIDEO))) {}
+ DirectTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map);
+
+ DirectTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map);
+
+ DirectTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ std::unique_ptr<Demuxer> demuxer);
+
~DirectTransport() override;
void SetConfig(const FakeNetworkPipe::Config& config);
- virtual void StopSending();
+ RTC_DEPRECATED void StopSending();
+
// TODO(holmer): Look into moving this to the constructor.
virtual void SetReceiver(PacketReceiver* receiver);
@@ -77,18 +94,25 @@
RTC_DISALLOW_COPY_AND_ASSIGN(ForceDemuxer);
};
- static bool NetworkProcess(void* transport);
- bool SendPackets();
+ void SendPackets();
- rtc::CriticalSection lock_;
Call* const send_call_;
- rtc::Event packet_event_;
- rtc::PlatformThread thread_;
Clock* const clock_;
- bool shutting_down_;
+ // TODO(eladalon): Make |task_queue_| const.
+ // https://bugs.chromium.org/p/webrtc/issues/detail?id=8125
+ SingleThreadedTaskQueueForTesting* task_queue_;
+ SingleThreadedTaskQueueForTesting::TaskId next_scheduled_task_;
FakeNetworkPipe fake_network_;
+
+ rtc::SequencedTaskChecker sequence_checker_;
+
+ // TODO(eladalon): https://bugs.chromium.org/p/webrtc/issues/detail?id=8125
+ // Deprecated versions of the ctor don't get the task queue passed in from
+ // outside. We'll create one locally for them. This is deprecated, and will
+ // be removed as soon as the need for those ctors is removed.
+ std::unique_ptr<SingleThreadedTaskQueueForTesting> deprecated_task_queue_;
};
} // namespace test
} // namespace webrtc
diff --git a/webrtc/test/layer_filtering_transport.cc b/webrtc/test/layer_filtering_transport.cc
index 00fe5f9..7d7288a 100644
--- a/webrtc/test/layer_filtering_transport.cc
+++ b/webrtc/test/layer_filtering_transport.cc
@@ -21,6 +21,7 @@
namespace test {
LayerFilteringTransport::LayerFilteringTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
const FakeNetworkPipe::Config& config,
Call* send_call,
uint8_t vp8_video_payload_type,
@@ -28,7 +29,7 @@
int selected_tl,
int selected_sl,
const std::map<uint8_t, MediaType>& payload_type_map)
- : test::DirectTransport(config, send_call, payload_type_map),
+ : DirectTransport(task_queue, config, send_call, payload_type_map),
vp8_video_payload_type_(vp8_video_payload_type),
vp9_video_payload_type_(vp9_video_payload_type),
selected_tl_(selected_tl),
diff --git a/webrtc/test/layer_filtering_transport.h b/webrtc/test/layer_filtering_transport.h
index 488f764..32eb4c5 100644
--- a/webrtc/test/layer_filtering_transport.h
+++ b/webrtc/test/layer_filtering_transport.h
@@ -13,6 +13,7 @@
#include "webrtc/call/call.h"
#include "webrtc/test/direct_transport.h"
#include "webrtc/test/fake_network_pipe.h"
+#include "webrtc/test/single_threaded_task_queue.h"
#include <map>
@@ -22,7 +23,8 @@
class LayerFilteringTransport : public test::DirectTransport {
public:
- LayerFilteringTransport(const FakeNetworkPipe::Config& config,
+ LayerFilteringTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ const FakeNetworkPipe::Config& config,
Call* send_call,
uint8_t vp8_video_payload_type,
uint8_t vp9_video_payload_type,
diff --git a/webrtc/test/rtp_rtcp_observer.h b/webrtc/test/rtp_rtcp_observer.h
index 54db770..4154010 100644
--- a/webrtc/test/rtp_rtcp_observer.h
+++ b/webrtc/test/rtp_rtcp_observer.h
@@ -32,6 +32,7 @@
namespace test {
class PacketTransport;
+class SingleThreadedTaskQueueForTesting;
class RtpRtcpObserver {
public:
@@ -91,12 +92,16 @@
public:
enum TransportType { kReceiver, kSender };
- PacketTransport(Call* send_call,
+ PacketTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ Call* send_call,
RtpRtcpObserver* observer,
TransportType transport_type,
const std::map<uint8_t, MediaType>& payload_type_map,
const FakeNetworkPipe::Config& configuration)
- : test::DirectTransport(configuration, send_call, payload_type_map),
+ : test::DirectTransport(task_queue,
+ configuration,
+ send_call,
+ payload_type_map),
observer_(observer),
transport_type_(transport_type) {}
diff --git a/webrtc/test/single_threaded_task_queue.cc b/webrtc/test/single_threaded_task_queue.cc
new file mode 100644
index 0000000..bee5981
--- /dev/null
+++ b/webrtc/test/single_threaded_task_queue.cc
@@ -0,0 +1,144 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/test/single_threaded_task_queue.h"
+
+#include <utility>
+
+#include "webrtc/rtc_base/checks.h"
+#include "webrtc/rtc_base/ptr_util.h"
+#include "webrtc/rtc_base/safe_conversions.h"
+#include "webrtc/rtc_base/timeutils.h"
+
+namespace webrtc {
+namespace test {
+
+SingleThreadedTaskQueueForTesting::QueuedTask::QueuedTask(
+ SingleThreadedTaskQueueForTesting::TaskId task_id,
+ int64_t earliest_execution_time,
+ SingleThreadedTaskQueueForTesting::Task task)
+ : task_id(task_id),
+ earliest_execution_time(earliest_execution_time),
+ task(task) {}
+
+SingleThreadedTaskQueueForTesting::QueuedTask::~QueuedTask() = default;
+
+SingleThreadedTaskQueueForTesting::SingleThreadedTaskQueueForTesting(
+ const char* name)
+ : thread_(Run, this, name),
+ running_(true),
+ next_task_id_(0),
+ wake_up_(false, false) {
+ thread_.Start();
+}
+
+SingleThreadedTaskQueueForTesting::~SingleThreadedTaskQueueForTesting() {
+ RTC_DCHECK_RUN_ON(&owner_thread_checker_);
+ {
+ rtc::CritScope lock(&cs_);
+ running_ = false;  // RunLoop() returns the next time it reads this flag.
+ }
+ wake_up_.Set();  // Wake RunLoop() in case it is sleeping in wake_up_.Wait().
+ thread_.Stop();  // Presumably joins the worker thread - confirm in PlatformThread.
+}
+
+SingleThreadedTaskQueueForTesting::TaskId
+SingleThreadedTaskQueueForTesting::PostTask(Task task) {
+ return PostDelayedTask(task, 0);
+}
+
+SingleThreadedTaskQueueForTesting::TaskId
+SingleThreadedTaskQueueForTesting::PostDelayedTask(Task task,
+ int64_t delay_ms) {
+ int64_t earliest_exec_time = rtc::TimeAfter(delay_ms);
+
+ rtc::CritScope lock(&cs_);
+
+ TaskId id = next_task_id_++;
+
+ // Keep |tasks_| sorted by target time; equal times keep posting order.
+ auto it = tasks_.begin();
+ for (; it != tasks_.end(); it++) {
+ if (earliest_exec_time < (*it)->earliest_execution_time) {
+ break;
+ }
+ }
+ tasks_.insert(it, rtc::MakeUnique<QueuedTask>(id, earliest_exec_time, task));
+
+ // This class is optimized for simplicity, not for performance. This will wake
+ // the thread up even if the next task in the queue is only scheduled for
+ // quite some time from now. In that case, the thread will just send itself
+ // back to sleep.
+ wake_up_.Set();
+
+ return id;
+}
+
+void SingleThreadedTaskQueueForTesting::SendTask(Task task) {
+ rtc::Event done(true, false);
+ PostTask([&task, &done]() {  // By-reference capture: the caller waits below.
+ task();
+ done.Set();
+ });
+ done.Wait(rtc::Event::kForever);  // No timeout: blocks until |done| is set.
+}
+
+bool SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) {
+ rtc::CritScope lock(&cs_);
+ for (auto it = tasks_.begin(); it != tasks_.end(); it++) {
+ if ((*it)->task_id == task_id) {
+ tasks_.erase(it);
+ return true;
+ }
+ }
+ return false;
+}
+
+void SingleThreadedTaskQueueForTesting::Run(void* obj) {
+ static_cast<SingleThreadedTaskQueueForTesting*>(obj)->RunLoop();
+}
+
+void SingleThreadedTaskQueueForTesting::RunLoop() {
+ while (true) {
+ std::unique_ptr<QueuedTask> queued_task;
+
+ // An empty queue would lead to sleeping until the queue becomes non-empty.
+ // A queue where the earliest task is scheduled for later than now will
+ // lead to sleeping until the time of the next scheduled task (or until
+ // more tasks are scheduled).
+ int wait_time = rtc::Event::kForever;
+
+ {
+ rtc::CritScope lock(&cs_);
+ if (!running_) {
+ return;
+ }
+ if (!tasks_.empty()) {
+ int64_t remaining_delay_ms = rtc::TimeDiff(
+ tasks_.front()->earliest_execution_time, rtc::TimeMillis());
+ if (remaining_delay_ms <= 0) {
+ queued_task = std::move(tasks_.front());
+ tasks_.pop_front();
+ } else {
+ wait_time = rtc::saturated_cast<int>(remaining_delay_ms);
+ }
+ }
+ }
+
+ if (queued_task) {
+ queued_task->task();  // Runs after the |cs_| scope above has been left.
+ } else {
+ wake_up_.Wait(wait_time);
+ }
+ }
+}
+
+} // namespace test
+} // namespace webrtc
diff --git a/webrtc/test/single_threaded_task_queue.h b/webrtc/test/single_threaded_task_queue.h
new file mode 100644
index 0000000..ecb22b3
--- /dev/null
+++ b/webrtc/test/single_threaded_task_queue.h
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef WEBRTC_TEST_SINGLE_THREADED_TASK_QUEUE_H_
+#define WEBRTC_TEST_SINGLE_THREADED_TASK_QUEUE_H_
+
+#include <functional>
+#include <list>
+#include <memory>
+
+#include "webrtc/rtc_base/criticalsection.h"
+#include "webrtc/rtc_base/event.h"
+#include "webrtc/rtc_base/platform_thread.h"
+#include "webrtc/rtc_base/thread_checker.h"
+
+namespace webrtc {
+namespace test {
+
+// This class gives capabilities similar to rtc::TaskQueue, but ensures
+// everything happens on the same thread. This is intended to make the
+// threading model of unit-tests (specifically end-to-end tests) more closely
+// resemble that of real WebRTC, thereby allowing us to replace some critical
+// sections by thread-checkers.
+// This class is NOT tuned for performance, but rather for simplicity.
+class SingleThreadedTaskQueueForTesting {
+ public:
+ using Task = std::function<void()>;
+ using TaskId = size_t;
+
+ explicit SingleThreadedTaskQueueForTesting(const char* name);
+ ~SingleThreadedTaskQueueForTesting();
+
+ // Sends one task to the task-queue, and returns a handle by which the
+ // task can be cancelled.
+ // This mimics the behavior of TaskQueue, but only for lambdas, rather than
+ // for both lambdas and QueuedTask objects.
+ TaskId PostTask(Task task);
+
+ // Same as PostTask(), but ensures that the task will not begin execution
+ // less than |delay_ms| milliseconds after being posted; an upper bound
+ // is not provided.
+ TaskId PostDelayedTask(Task task, int64_t delay_ms);
+
+ // Send one task to the queue. The function does not return until the task
+ // has finished executing. No support for canceling the task.
+ void SendTask(Task task);
+
+ // Given an identifier to the task, attempts to eject it from the queue.
+ // Returns true if the task was found and cancelled. Failure possible
+ // only for invalid task IDs, or for tasks which have already been executed.
+ bool CancelTask(TaskId task_id);
+
+ private:
+ struct QueuedTask {
+ QueuedTask(TaskId task_id, int64_t earliest_execution_time, Task task);
+ ~QueuedTask();
+
+ TaskId task_id;
+ int64_t earliest_execution_time;
+ Task task;
+ };
+
+ static void Run(void* obj);
+
+ void RunLoop();
+
+ rtc::CriticalSection cs_;
+ std::list<std::unique_ptr<QueuedTask>> tasks_ GUARDED_BY(cs_);
+ rtc::ThreadChecker owner_thread_checker_;
+ rtc::PlatformThread thread_;
+ bool running_ GUARDED_BY(cs_);
+
+ TaskId next_task_id_;  // Incremented in PostDelayedTask() while |cs_| is held.
+
+ // The task-queue will sleep when not executing a task. Wake up occurs when:
+ // * Upon destruction, to make sure that the |thread_| terminates, so that it
+ // may be joined. [Event will be set.]
+ // * New task added. Because we optimize for simplicity rather than for
+ // performance (this class is a testing facility only), waking up occurs
+ // when we get a new task even if it is scheduled with a delay. The RunLoop
+ // is in charge of sending itself back to sleep if the next task is only
+ // to be executed at a later time. [Event will be set.]
+ // * When the next task in the queue is a delayed-task, and the time for
+ // its execution has come. [Event will time-out.]
+ rtc::Event wake_up_;
+};
+
+} // namespace test
+} // namespace webrtc
+
+#endif // WEBRTC_TEST_SINGLE_THREADED_TASK_QUEUE_H_
diff --git a/webrtc/test/single_threaded_task_queue_unittest.cc b/webrtc/test/single_threaded_task_queue_unittest.cc
new file mode 100644
index 0000000..8ad8b4f
--- /dev/null
+++ b/webrtc/test/single_threaded_task_queue_unittest.cc
@@ -0,0 +1,364 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/test/single_threaded_task_queue.h"
+
+#include <atomic>
+#include <memory>
+#include <vector>
+
+#include "webrtc/rtc_base/event.h"
+#include "webrtc/rtc_base/ptr_util.h"
+#include "webrtc/test/gtest.h"
+
+namespace webrtc {
+namespace test {
+
+namespace {
+
+using TaskId = SingleThreadedTaskQueueForTesting::TaskId;
+
+// Tests should not rely on the object under test being fault-free. If the task
+// queue ever blocks forever, we want the tests to fail, rather than hang.
+constexpr int kMaxWaitTimeMs = 10000;
+
+TEST(SingleThreadedTaskQueueForTestingTest, SanityConstructionDestruction) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");  // No tasks.
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, ExecutesPostedTasks) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  std::atomic<bool> executed(false);
+  rtc::Event done(true, false);
+  // The task records that it ran, then signals |done| to release the test.
+  task_queue.PostTask([&executed, &done]() {
+    executed.store(true);
+    done.Set();
+  });
+  ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+
+  EXPECT_TRUE(executed.load());
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest,
+     PostMultipleTasksFromSameExternalThread) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  constexpr size_t kCount = 3;
+  std::atomic<bool> executed[kCount];
+  for (std::atomic<bool>& exec : executed) {
+    exec.store(false);
+  }
+
+  std::vector<std::unique_ptr<rtc::Event>> done_events;
+  for (size_t i = 0; i < kCount; i++) {
+    done_events.emplace_back(rtc::MakeUnique<rtc::Event>(false, false));
+  }
+
+  // To prevent the tasks which comprise the actual test from running before
+  // they have all been posted, which could result in only one task ever being
+  // in the queue at any given time, post one waiting task that blocks the
+  // task-queue, and unblock it only after all tasks have been posted.
+  rtc::Event rendezvous(true, false);
+  task_queue.PostTask([&rendezvous]() {
+    ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs));
+  });
+
+  // Post the tasks which comprise the test.
+  for (size_t i = 0; i < kCount; i++) {
+    task_queue.PostTask([&executed, &done_events, i]() {  // |i| by value.
+      executed[i].store(true);
+      done_events[i]->Set();
+    });
+  }
+
+  rendezvous.Set();  // Release the task-queue.
+
+  // Wait until the task queue has executed all the tasks.
+  for (size_t i = 0; i < kCount; i++) {
+    ASSERT_TRUE(done_events[i]->Wait(kMaxWaitTimeMs));
+  }
+
+  for (size_t i = 0; i < kCount; i++) {
+    EXPECT_TRUE(executed[i].load());
+  }
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, PostToTaskQueueFromOwnThread) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  std::atomic<bool> executed(false);
+  rtc::Event done(true, false);
+
+  auto internally_posted_task = [&executed, &done]() {
+    executed.store(true);
+    done.Set();
+  };
+  // When run by the queue, this task posts another task to the same queue.
+  auto externally_posted_task = [&task_queue, &internally_posted_task]() {
+    task_queue.PostTask(internally_posted_task);
+  };
+
+  task_queue.PostTask(externally_posted_task);
+
+  ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+  EXPECT_TRUE(executed.load());
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, TasksExecutedInSequence) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  // Each task performs:
+  //   accumulator = 10 * accumulator + i
+  // Where |i| is 1, 2 and 3 for the 1st, 2nd and 3rd tasks, respectively.
+  // The result would be 123 if and only if the tasks were executed in order.
+  size_t accumulator = 0;
+  size_t expected_value = 0;  // Updates to the correct value.
+
+  // Prevent the chain from being set in motion before we've had time to
+  // schedule it all, lest the queue only contain one task at a time.
+  rtc::Event rendezvous(true, false);
+  task_queue.PostTask([&rendezvous]() {
+    ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs));
+  });
+  // |i| starts at 1 so that even a dropped first task changes the result.
+  for (size_t i = 1; i <= 3; i++) {
+    task_queue.PostTask([&accumulator, i]() {  // |i| passed by value.
+      accumulator = 10 * accumulator + i;
+    });
+    expected_value = 10 * expected_value + i;
+  }
+
+  // The test will wait for the task-queue to finish.
+  rtc::Event done(true, false);
+  task_queue.PostTask([&done]() {
+    done.Set();
+  });
+
+  rendezvous.Set();  // Set the chain in motion.
+
+  ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+
+  EXPECT_EQ(accumulator, expected_value);
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, ExecutesPostedDelayedTask) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  std::atomic<bool> executed(false);
+  rtc::Event done(true, false);
+  // Keep the delay short relative to how long the test is willing to wait.
+  constexpr int64_t delay_ms = 20;
+  static_assert(delay_ms < kMaxWaitTimeMs / 2, "Delay too long for tests.");
+
+  task_queue.PostDelayedTask([&executed, &done]() {
+    executed.store(true);
+    done.Set();
+  }, delay_ms);
+  ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+
+  EXPECT_TRUE(executed.load());
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, DoesNotExecuteDelayedTaskTooSoon) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  std::atomic<bool> executed(false);
+
+  constexpr int64_t delay_ms = 2000;
+  static_assert(delay_ms < kMaxWaitTimeMs / 2, "Delay too long for tests.");
+
+  task_queue.PostDelayedTask([&executed]() {
+    executed.store(true);
+  }, delay_ms);
+  // |not_done| is never set, so Wait() only times out after half the delay;
+  // at that point the delayed task must not have been executed yet.
+  rtc::Event not_done(true, false);
+  ASSERT_FALSE(not_done.Wait(delay_ms / 2));
+  EXPECT_FALSE(executed.load());
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest,
+     TaskWithLesserDelayPostedAfterFirstDelayedTaskExectuedBeforeFirst) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  std::atomic<bool> earlier_executed(false);
+  constexpr int64_t earlier_delay_ms = 500;
+
+  std::atomic<bool> later_executed(false);
+  constexpr int64_t later_delay_ms = 1000;
+
+  static_assert(earlier_delay_ms + later_delay_ms < kMaxWaitTimeMs / 2,
+                "Delay too long for tests.");
+
+  rtc::Event done(true, false);
+
+  auto earlier_task = [&earlier_executed, &later_executed]() {
+    EXPECT_FALSE(later_executed.load());
+    earlier_executed.store(true);
+  };
+
+  auto later_task = [&earlier_executed, &later_executed, &done]() {
+    EXPECT_TRUE(earlier_executed.load());
+    later_executed.store(true);
+    done.Set();
+  };
+  // Post the later task first; the earlier one must still run before it.
+  task_queue.PostDelayedTask(later_task, later_delay_ms);
+  task_queue.PostDelayedTask(earlier_task, earlier_delay_ms);
+
+  ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+  ASSERT_TRUE(earlier_executed);
+  ASSERT_TRUE(later_executed);
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest,
+     TaskWithGreaterDelayPostedAfterFirstDelayedTaskExectuedAfterFirst) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  std::atomic<bool> earlier_executed(false);
+  constexpr int64_t earlier_delay_ms = 500;
+
+  std::atomic<bool> later_executed(false);
+  constexpr int64_t later_delay_ms = 1000;
+
+  static_assert(earlier_delay_ms + later_delay_ms < kMaxWaitTimeMs / 2,
+                "Delay too long for tests.");
+
+  rtc::Event done(true, false);
+
+  auto earlier_task = [&earlier_executed, &later_executed]() {
+    EXPECT_FALSE(later_executed.load());
+    earlier_executed.store(true);
+  };
+
+  auto later_task = [&earlier_executed, &later_executed, &done]() {
+    EXPECT_TRUE(earlier_executed.load());
+    later_executed.store(true);
+    done.Set();
+  };
+  // Post in order of increasing delay; execution must follow the same order.
+  task_queue.PostDelayedTask(earlier_task, earlier_delay_ms);
+  task_queue.PostDelayedTask(later_task, later_delay_ms);
+
+  ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+  ASSERT_TRUE(earlier_executed);
+  ASSERT_TRUE(later_executed);
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, ExternalThreadCancelsTask) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  rtc::Event done(true, false);
+
+  // Prevent the to-be-cancelled task from being executed before we've had
+  // time to cancel it.
+  rtc::Event rendezvous(true, false);
+  task_queue.PostTask([&rendezvous]() {
+    ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs));
+  });
+
+  TaskId cancelled_task_id = task_queue.PostTask([]() {
+    EXPECT_TRUE(false);
+  });
+  task_queue.PostTask([&done]() {
+    done.Set();
+  });
+  // The task is still queued behind the blocked one, so cancelling succeeds.
+  EXPECT_TRUE(task_queue.CancelTask(cancelled_task_id));
+
+  // Set the tasks in motion; the cancelled task does not run (otherwise the
+  // test would fail). The last task ends the test, showing that the queue
+  // progressed beyond the cancelled task.
+  rendezvous.Set();
+  ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+}
+
+// In this test, we'll set off a chain where the first task cancels the second
+// task, then a third task runs (showing that we really cancelled the task,
+// rather than just halted the task-queue).
+TEST(SingleThreadedTaskQueueForTestingTest, InternalThreadCancelsTask) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  rtc::Event done(true, false);
+
+  // Prevent the chain from being set-off before we've set everything up.
+  rtc::Event rendezvous(true, false);
+  task_queue.PostTask([&rendezvous]() {
+    ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs));
+  });
+
+  // This is the canceller-task. It takes cancelled_task_id by reference,
+  // because the ID will only become known after the cancelled task is
+  // scheduled. The cancelled task is still queued when it runs, so the
+  // cancellation is expected to report success.
+  TaskId cancelled_task_id;
+  auto canceller_task = [&task_queue, &cancelled_task_id]() {
+    EXPECT_TRUE(task_queue.CancelTask(cancelled_task_id));
+  };
+  task_queue.PostTask(canceller_task);
+  // This task will be cancelled by the task before it.
+  auto cancelled_task = []() {
+    EXPECT_TRUE(false);
+  };
+  cancelled_task_id = task_queue.PostTask(cancelled_task);
+
+  // When this task runs, it will allow the test to be finished.
+  auto completion_marker_task = [&done]() {
+    done.Set();
+  };
+  task_queue.PostTask(completion_marker_task);
+
+  rendezvous.Set();  // Set the chain in motion.
+
+  ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, SendTask) {
+  SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+  std::atomic<bool> executed(false);
+
+  task_queue.SendTask([&executed]() {
+    // Intentionally delay, so that if SendTask() didn't block, the sender
+    // thread would get to read |executed| before it is set.
+    rtc::Event delay(true, false);
+    ASSERT_FALSE(delay.Wait(1000));
+    executed.store(true);
+  });
+  // SendTask() has returned, so the task must have finished executing.
+  EXPECT_TRUE(executed);
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest,
+     DestructTaskQueueWhileTasksPending) {
+  auto task_queue =
+      rtc::MakeUnique<SingleThreadedTaskQueueForTesting>("task_queue");
+
+  std::atomic<size_t> counter(0);
+
+  constexpr size_t tasks = 10;
+  for (size_t i = 0; i < tasks; i++) {
+    task_queue->PostTask([&counter]() {
+      std::atomic_fetch_add(&counter, static_cast<size_t>(1));
+      rtc::Event delay(true, false);
+      ASSERT_FALSE(delay.Wait(500));
+    });
+  }
+  // Destroying the queue must not wait for all the pending tasks to run.
+  task_queue.reset();
+
+  EXPECT_LT(counter, tasks);
+}
+
+} // namespace
+} // namespace test
+} // namespace webrtc
diff --git a/webrtc/video/end_to_end_tests.cc b/webrtc/video/end_to_end_tests.cc
index 01d34a7..5ef5845 100644
--- a/webrtc/video/end_to_end_tests.cc
+++ b/webrtc/video/end_to_end_tests.cc
@@ -39,6 +39,7 @@
#include "webrtc/rtc_base/event.h"
#include "webrtc/rtc_base/file.h"
#include "webrtc/rtc_base/optional.h"
+#include "webrtc/rtc_base/ptr_util.h"
#include "webrtc/rtc_base/random.h"
#include "webrtc/rtc_base/rate_limiter.h"
#include "webrtc/system_wrappers/include/metrics.h"
@@ -217,41 +218,50 @@
rtc::Event event_;
} renderer;
- CreateCalls(Call::Config(event_log_.get()), Call::Config(event_log_.get()));
-
- test::DirectTransport sender_transport(sender_call_.get(), payload_type_map_);
- test::DirectTransport receiver_transport(receiver_call_.get(),
- payload_type_map_);
- sender_transport.SetReceiver(receiver_call_->Receiver());
- receiver_transport.SetReceiver(sender_call_->Receiver());
-
- CreateSendConfig(1, 0, 0, &sender_transport);
- CreateMatchingReceiveConfigs(&receiver_transport);
-
- video_receive_configs_[0].renderer = &renderer;
-
- CreateVideoStreams();
- Start();
-
- // Create frames that are smaller than the send width/height, this is done to
- // check that the callbacks are done after processing video.
- std::unique_ptr<test::FrameGenerator> frame_generator(
- test::FrameGenerator::CreateSquareGenerator(kWidth, kHeight));
test::FrameForwarder frame_forwarder;
- video_send_stream_->SetSource(
- &frame_forwarder,
- VideoSendStream::DegradationPreference::kMaintainFramerate);
+ std::unique_ptr<test::DirectTransport> sender_transport;
+ std::unique_ptr<test::DirectTransport> receiver_transport;
- frame_forwarder.IncomingCapturedFrame(*frame_generator->NextFrame());
+ task_queue_.SendTask([this, &renderer, &frame_forwarder, &sender_transport,
+ &receiver_transport]() {
+ CreateCalls(Call::Config(event_log_.get()), Call::Config(event_log_.get()));
+
+ sender_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, sender_call_.get(), payload_type_map_);
+ receiver_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, receiver_call_.get(), payload_type_map_);
+ sender_transport->SetReceiver(receiver_call_->Receiver());
+ receiver_transport->SetReceiver(sender_call_->Receiver());
+
+ CreateSendConfig(1, 0, 0, sender_transport.get());
+ CreateMatchingReceiveConfigs(receiver_transport.get());
+
+ video_receive_configs_[0].renderer = &renderer;
+
+ CreateVideoStreams();
+ Start();
+
+ // Create frames that are smaller than the send width/height, this is done
+ // to check that the callbacks are done after processing video.
+ std::unique_ptr<test::FrameGenerator> frame_generator(
+ test::FrameGenerator::CreateSquareGenerator(kWidth, kHeight));
+ video_send_stream_->SetSource(
+ &frame_forwarder,
+ VideoSendStream::DegradationPreference::kMaintainFramerate);
+
+ frame_forwarder.IncomingCapturedFrame(*frame_generator->NextFrame());
+ });
+
EXPECT_TRUE(renderer.Wait())
<< "Timed out while waiting for the frame to render.";
- Stop();
-
- sender_transport.StopSending();
- receiver_transport.StopSending();
-
- DestroyStreams();
+ task_queue_.SendTask([this, &sender_transport, &receiver_transport]() {
+ Stop();
+ DestroyStreams();
+ sender_transport.reset();
+ receiver_transport.reset();
+ DestroyCalls();
+ });
}
TEST_F(EndToEndTest, TransmitsFirstFrame) {
@@ -266,39 +276,48 @@
rtc::Event event_;
} renderer;
- CreateCalls(Call::Config(event_log_.get()), Call::Config(event_log_.get()));
-
- test::DirectTransport sender_transport(sender_call_.get(), payload_type_map_);
- test::DirectTransport receiver_transport(receiver_call_.get(),
- payload_type_map_);
- sender_transport.SetReceiver(receiver_call_->Receiver());
- receiver_transport.SetReceiver(sender_call_->Receiver());
-
- CreateSendConfig(1, 0, 0, &sender_transport);
- CreateMatchingReceiveConfigs(&receiver_transport);
- video_receive_configs_[0].renderer = &renderer;
-
- CreateVideoStreams();
- Start();
-
- std::unique_ptr<test::FrameGenerator> frame_generator(
- test::FrameGenerator::CreateSquareGenerator(kDefaultWidth,
- kDefaultHeight));
+ std::unique_ptr<test::FrameGenerator> frame_generator;
test::FrameForwarder frame_forwarder;
- video_send_stream_->SetSource(
- &frame_forwarder,
- VideoSendStream::DegradationPreference::kMaintainFramerate);
- frame_forwarder.IncomingCapturedFrame(*frame_generator->NextFrame());
+
+ std::unique_ptr<test::DirectTransport> sender_transport;
+ std::unique_ptr<test::DirectTransport> receiver_transport;
+
+ task_queue_.SendTask([this, &renderer, &frame_generator, &frame_forwarder,
+ &sender_transport, &receiver_transport]() {
+ CreateCalls(Call::Config(event_log_.get()), Call::Config(event_log_.get()));
+
+ sender_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, sender_call_.get(), payload_type_map_);
+ receiver_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, receiver_call_.get(), payload_type_map_);
+ sender_transport->SetReceiver(receiver_call_->Receiver());
+ receiver_transport->SetReceiver(sender_call_->Receiver());
+
+ CreateSendConfig(1, 0, 0, sender_transport.get());
+ CreateMatchingReceiveConfigs(receiver_transport.get());
+ video_receive_configs_[0].renderer = &renderer;
+
+ CreateVideoStreams();
+ Start();
+
+ frame_generator = test::FrameGenerator::CreateSquareGenerator(
+ kDefaultWidth, kDefaultHeight);
+ video_send_stream_->SetSource(
+ &frame_forwarder,
+ VideoSendStream::DegradationPreference::kMaintainFramerate);
+ frame_forwarder.IncomingCapturedFrame(*frame_generator->NextFrame());
+ });
EXPECT_TRUE(renderer.Wait())
<< "Timed out while waiting for the frame to render.";
- Stop();
-
- sender_transport.StopSending();
- receiver_transport.StopSending();
-
- DestroyStreams();
+ task_queue_.SendTask([this, &sender_transport, &receiver_transport]() {
+ Stop();
+ DestroyStreams();
+ sender_transport.reset();
+ receiver_transport.reset();
+ DestroyCalls();
+ });
}
class CodecObserver : public test::EndToEndTest,
@@ -540,10 +559,11 @@
size_t GetNumVideoStreams() const override { return 0; }
size_t GetNumAudioStreams() const override { return 1; }
- test::PacketTransport* CreateReceiveTransport() override {
+ test::PacketTransport* CreateReceiveTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue) override {
test::PacketTransport* receive_transport = new test::PacketTransport(
- nullptr, this, test::PacketTransport::kReceiver, payload_type_map_,
- FakeNetworkPipe::Config());
+ task_queue, nullptr, this, test::PacketTransport::kReceiver,
+ payload_type_map_, FakeNetworkPipe::Config());
receive_transport_ = receive_transport;
return receive_transport;
}
@@ -798,12 +818,14 @@
return SEND_PACKET;
}
- test::PacketTransport* CreateSendTransport(Call* sender_call) override {
+ test::PacketTransport* CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override {
// At low RTT (< kLowRttNackMs) -> NACK only, no FEC.
const int kNetworkDelayMs = 100;
FakeNetworkPipe::Config config;
config.queue_delay_ms = kNetworkDelayMs;
- return new test::PacketTransport(sender_call, this,
+ return new test::PacketTransport(task_queue, sender_call, this,
test::PacketTransport::kSender,
test::CallTest::payload_type_map_, config);
}
@@ -977,13 +999,15 @@
return SEND_PACKET;
}
- test::PacketTransport* CreateSendTransport(Call* sender_call) override {
+ test::PacketTransport* CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override {
// At low RTT (< kLowRttNackMs) -> NACK only, no FEC.
// Configure some network delay.
const int kNetworkDelayMs = 50;
FakeNetworkPipe::Config config;
config.queue_delay_ms = kNetworkDelayMs;
- return new test::PacketTransport(sender_call, this,
+ return new test::PacketTransport(task_queue, sender_call, this,
test::PacketTransport::kSender,
payload_type_map_, config);
}
@@ -1325,35 +1349,45 @@
rtc::Event delivered_packet_;
};
- CreateCalls(Call::Config(event_log_.get()), Call::Config(event_log_.get()));
+ std::unique_ptr<test::DirectTransport> send_transport;
+ std::unique_ptr<test::DirectTransport> receive_transport;
+ std::unique_ptr<PacketInputObserver> input_observer;
- test::DirectTransport send_transport(sender_call_.get(), payload_type_map_);
- test::DirectTransport receive_transport(receiver_call_.get(),
- payload_type_map_);
- PacketInputObserver input_observer(receiver_call_->Receiver());
- send_transport.SetReceiver(&input_observer);
- receive_transport.SetReceiver(sender_call_->Receiver());
+ task_queue_.SendTask([this, &send_transport, &receive_transport,
+ &input_observer]() {
+ CreateCalls(Call::Config(event_log_.get()), Call::Config(event_log_.get()));
- CreateSendConfig(1, 0, 0, &send_transport);
- CreateMatchingReceiveConfigs(&receive_transport);
+ send_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, sender_call_.get(), payload_type_map_);
+ receive_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, receiver_call_.get(), payload_type_map_);
+ input_observer =
+ rtc::MakeUnique<PacketInputObserver>(receiver_call_->Receiver());
+ send_transport->SetReceiver(input_observer.get());
+ receive_transport->SetReceiver(sender_call_->Receiver());
- CreateVideoStreams();
- CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
- kDefaultHeight);
- Start();
+ CreateSendConfig(1, 0, 0, send_transport.get());
+ CreateMatchingReceiveConfigs(receive_transport.get());
- receiver_call_->DestroyVideoReceiveStream(video_receive_streams_[0]);
- video_receive_streams_.clear();
+ CreateVideoStreams();
+ CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
+ kDefaultHeight);
+ Start();
+
+ receiver_call_->DestroyVideoReceiveStream(video_receive_streams_[0]);
+ video_receive_streams_.clear();
+ });
// Wait() waits for a received packet.
- EXPECT_TRUE(input_observer.Wait());
+ EXPECT_TRUE(input_observer->Wait());
- Stop();
-
- DestroyStreams();
-
- send_transport.StopSending();
- receive_transport.StopSending();
+ task_queue_.SendTask([this, &send_transport, &receive_transport]() {
+ Stop();
+ DestroyStreams();
+ send_transport.reset();
+ receive_transport.reset();
+ DestroyCalls();
+ });
}
void EndToEndTest::RespectsRtcpMode(RtcpMode rtcp_mode) {
@@ -1460,7 +1494,8 @@
int height;
} codec_settings[kNumStreams];
- MultiStreamTest() {
+ explicit MultiStreamTest(test::SingleThreadedTaskQueueForTesting* task_queue)
+ : task_queue_(task_queue) {
// TODO(sprang): Cleanup when msvc supports explicit initializers for array.
codec_settings[0] = {1, 640, 480};
codec_settings[1] = {2, 320, 240};
@@ -1472,78 +1507,92 @@
void RunTest() {
webrtc::RtcEventLogNullImpl event_log;
Call::Config config(&event_log);
- std::unique_ptr<Call> sender_call(Call::Create(config));
- std::unique_ptr<Call> receiver_call(Call::Create(config));
- std::unique_ptr<test::DirectTransport> sender_transport(
- CreateSendTransport(sender_call.get()));
- std::unique_ptr<test::DirectTransport> receiver_transport(
- CreateReceiveTransport(receiver_call.get()));
- sender_transport->SetReceiver(receiver_call->Receiver());
- receiver_transport->SetReceiver(sender_call->Receiver());
-
- std::unique_ptr<VideoEncoder> encoders[kNumStreams];
- for (size_t i = 0; i < kNumStreams; ++i)
- encoders[i].reset(VP8Encoder::Create());
+ std::unique_ptr<Call> sender_call;
+ std::unique_ptr<Call> receiver_call;
+ std::unique_ptr<test::DirectTransport> sender_transport;
+ std::unique_ptr<test::DirectTransport> receiver_transport;
VideoSendStream* send_streams[kNumStreams];
VideoReceiveStream* receive_streams[kNumStreams];
-
test::FrameGeneratorCapturer* frame_generators[kNumStreams];
std::vector<std::unique_ptr<VideoDecoder>> allocated_decoders;
- for (size_t i = 0; i < kNumStreams; ++i) {
- uint32_t ssrc = codec_settings[i].ssrc;
- int width = codec_settings[i].width;
- int height = codec_settings[i].height;
+ std::unique_ptr<VideoEncoder> encoders[kNumStreams];
- VideoSendStream::Config send_config(sender_transport.get());
- send_config.rtp.ssrcs.push_back(ssrc);
- send_config.encoder_settings.encoder = encoders[i].get();
- send_config.encoder_settings.payload_name = "VP8";
- send_config.encoder_settings.payload_type = kVideoPayloadType;
- VideoEncoderConfig encoder_config;
- test::FillEncoderConfiguration(1, &encoder_config);
- encoder_config.max_bitrate_bps = 100000;
+ task_queue_->SendTask([&]() {
+ sender_call = rtc::WrapUnique(Call::Create(config));
+ receiver_call = rtc::WrapUnique(Call::Create(config));
+ sender_transport =
+ rtc::WrapUnique(CreateSendTransport(task_queue_, sender_call.get()));
+ receiver_transport = rtc::WrapUnique(
+ CreateReceiveTransport(task_queue_, receiver_call.get()));
- UpdateSendConfig(i, &send_config, &encoder_config, &frame_generators[i]);
+ sender_transport->SetReceiver(receiver_call->Receiver());
+ receiver_transport->SetReceiver(sender_call->Receiver());
- send_streams[i] = sender_call->CreateVideoSendStream(
- send_config.Copy(), encoder_config.Copy());
- send_streams[i]->Start();
+ for (size_t i = 0; i < kNumStreams; ++i)
+ encoders[i].reset(VP8Encoder::Create());
- VideoReceiveStream::Config receive_config(receiver_transport.get());
- receive_config.rtp.remote_ssrc = ssrc;
- receive_config.rtp.local_ssrc = test::CallTest::kReceiverLocalVideoSsrc;
- VideoReceiveStream::Decoder decoder =
- test::CreateMatchingDecoder(send_config.encoder_settings);
- allocated_decoders.push_back(
- std::unique_ptr<VideoDecoder>(decoder.decoder));
- receive_config.decoders.push_back(decoder);
+ for (size_t i = 0; i < kNumStreams; ++i) {
+ uint32_t ssrc = codec_settings[i].ssrc;
+ int width = codec_settings[i].width;
+ int height = codec_settings[i].height;
- UpdateReceiveConfig(i, &receive_config);
+ VideoSendStream::Config send_config(sender_transport.get());
+ send_config.rtp.ssrcs.push_back(ssrc);
+ send_config.encoder_settings.encoder = encoders[i].get();
+ send_config.encoder_settings.payload_name = "VP8";
+ send_config.encoder_settings.payload_type = kVideoPayloadType;
+ VideoEncoderConfig encoder_config;
+ test::FillEncoderConfiguration(1, &encoder_config);
+ encoder_config.max_bitrate_bps = 100000;
- receive_streams[i] =
- receiver_call->CreateVideoReceiveStream(std::move(receive_config));
- receive_streams[i]->Start();
+ UpdateSendConfig(i, &send_config, &encoder_config,
+ &frame_generators[i]);
- frame_generators[i] = test::FrameGeneratorCapturer::Create(
- width, height, 30, Clock::GetRealTimeClock());
- send_streams[i]->SetSource(
- frame_generators[i],
- VideoSendStream::DegradationPreference::kMaintainFramerate);
- frame_generators[i]->Start();
- }
+ send_streams[i] = sender_call->CreateVideoSendStream(
+ send_config.Copy(), encoder_config.Copy());
+ send_streams[i]->Start();
+
+ VideoReceiveStream::Config receive_config(receiver_transport.get());
+ receive_config.rtp.remote_ssrc = ssrc;
+ receive_config.rtp.local_ssrc = test::CallTest::kReceiverLocalVideoSsrc;
+ VideoReceiveStream::Decoder decoder =
+ test::CreateMatchingDecoder(send_config.encoder_settings);
+ allocated_decoders.push_back(
+ std::unique_ptr<VideoDecoder>(decoder.decoder));
+ receive_config.decoders.push_back(decoder);
+
+ UpdateReceiveConfig(i, &receive_config);
+
+ receive_streams[i] =
+ receiver_call->CreateVideoReceiveStream(std::move(receive_config));
+ receive_streams[i]->Start();
+
+ frame_generators[i] = test::FrameGeneratorCapturer::Create(
+ width, height, 30, Clock::GetRealTimeClock());
+ send_streams[i]->SetSource(
+ frame_generators[i],
+ VideoSendStream::DegradationPreference::kMaintainFramerate);
+ frame_generators[i]->Start();
+ }
+ });
Wait();
- for (size_t i = 0; i < kNumStreams; ++i) {
- frame_generators[i]->Stop();
- sender_call->DestroyVideoSendStream(send_streams[i]);
- receiver_call->DestroyVideoReceiveStream(receive_streams[i]);
- delete frame_generators[i];
- }
+ task_queue_->SendTask([&]() {
+ for (size_t i = 0; i < kNumStreams; ++i) {
+ frame_generators[i]->Stop();
+ sender_call->DestroyVideoSendStream(send_streams[i]);
+ receiver_call->DestroyVideoReceiveStream(receive_streams[i]);
+ delete frame_generators[i];
+ }
- sender_transport->StopSending();
- receiver_transport->StopSending();
+ sender_transport.reset();
+ receiver_transport.reset();
+
+ sender_call.reset();
+ receiver_call.reset();
+ });
}
protected:
@@ -1559,12 +1608,20 @@
virtual void UpdateReceiveConfig(size_t stream_index,
VideoReceiveStream::Config* receive_config) {
}
- virtual test::DirectTransport* CreateSendTransport(Call* sender_call) {
- return new test::DirectTransport(sender_call, payload_type_map_);
+ virtual test::DirectTransport* CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) {
+ return new test::DirectTransport(task_queue, sender_call,
+ payload_type_map_);
}
- virtual test::DirectTransport* CreateReceiveTransport(Call* receiver_call) {
- return new test::DirectTransport(receiver_call, payload_type_map_);
+ virtual test::DirectTransport* CreateReceiveTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* receiver_call) {
+ return new test::DirectTransport(task_queue, receiver_call,
+ payload_type_map_);
}
+
+ test::SingleThreadedTaskQueueForTesting* const task_queue_;
};
// Each renderer verifies that it receives the expected resolution, and as soon
@@ -1600,7 +1657,8 @@
class Tester : public MultiStreamTest {
public:
- Tester() {}
+ explicit Tester(test::SingleThreadedTaskQueueForTesting* task_queue)
+ : MultiStreamTest(task_queue) {}
virtual ~Tester() {}
protected:
@@ -1629,7 +1687,7 @@
private:
std::unique_ptr<VideoOutputObserver> observers_[kNumStreams];
- } tester;
+ } tester(&task_queue_);
tester.RunTest();
}
@@ -1640,11 +1698,12 @@
class RtpExtensionHeaderObserver : public test::DirectTransport {
public:
RtpExtensionHeaderObserver(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
Call* sender_call,
const uint32_t& first_media_ssrc,
const std::map<uint32_t, uint32_t>& ssrc_map,
const std::map<uint8_t, MediaType>& payload_type_map)
- : DirectTransport(sender_call, payload_type_map),
+ : DirectTransport(task_queue, sender_call, payload_type_map),
done_(false, false),
parser_(RtpHeaderParser::Create()),
first_media_ssrc_(first_media_ssrc),
@@ -1759,8 +1818,11 @@
class TransportSequenceNumberTester : public MultiStreamTest {
public:
- TransportSequenceNumberTester()
- : first_media_ssrc_(0), observer_(nullptr) {}
+ explicit TransportSequenceNumberTester(
+ test::SingleThreadedTaskQueueForTesting* task_queue)
+ : MultiStreamTest(task_queue),
+ first_media_ssrc_(0),
+ observer_(nullptr) {}
virtual ~TransportSequenceNumberTester() {}
protected:
@@ -1807,15 +1869,17 @@
receive_config->renderer = &fake_renderer_;
}
- test::DirectTransport* CreateSendTransport(Call* sender_call) override {
+ test::DirectTransport* CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override {
std::map<uint8_t, MediaType> payload_type_map =
MultiStreamTest::payload_type_map_;
RTC_DCHECK(payload_type_map.find(kSendRtxPayloadType) ==
payload_type_map.end());
payload_type_map[kSendRtxPayloadType] = MediaType::VIDEO;
- observer_ =
- new RtpExtensionHeaderObserver(sender_call, first_media_ssrc_,
- rtx_to_media_ssrcs_, payload_type_map);
+ observer_ = new RtpExtensionHeaderObserver(
+ task_queue, sender_call, first_media_ssrc_, rtx_to_media_ssrcs_,
+ payload_type_map);
return observer_;
}
@@ -1824,7 +1888,7 @@
uint32_t first_media_ssrc_;
std::map<uint32_t, uint32_t> rtx_to_media_ssrcs_;
RtpExtensionHeaderObserver* observer_;
- } tester;
+ } tester(&task_queue_);
tester.RunTest();
}
@@ -2037,30 +2101,36 @@
EncodedFrameTestObserver post_encode_observer;
EncodedFrameTestObserver pre_decode_observer;
-
- CreateCalls(Call::Config(event_log_.get()), Call::Config(event_log_.get()));
-
- test::DirectTransport sender_transport(sender_call_.get(), payload_type_map_);
- test::DirectTransport receiver_transport(receiver_call_.get(),
- payload_type_map_);
- sender_transport.SetReceiver(receiver_call_->Receiver());
- receiver_transport.SetReceiver(sender_call_->Receiver());
-
- CreateSendConfig(1, 0, 0, &sender_transport);
- CreateMatchingReceiveConfigs(&receiver_transport);
- video_send_config_.post_encode_callback = &post_encode_observer;
- video_receive_configs_[0].pre_decode_callback = &pre_decode_observer;
-
- CreateVideoStreams();
- Start();
-
- std::unique_ptr<test::FrameGenerator> frame_generator(
- test::FrameGenerator::CreateSquareGenerator(kDefaultWidth,
- kDefaultHeight));
test::FrameForwarder forwarder;
- video_send_stream_->SetSource(
- &forwarder, VideoSendStream::DegradationPreference::kMaintainFramerate);
- forwarder.IncomingCapturedFrame(*frame_generator->NextFrame());
+ std::unique_ptr<test::FrameGenerator> frame_generator;
+
+ std::unique_ptr<test::DirectTransport> sender_transport;
+ std::unique_ptr<test::DirectTransport> receiver_transport;
+
+ task_queue_.SendTask([&]() {
+ CreateCalls(Call::Config(event_log_.get()), Call::Config(event_log_.get()));
+
+ sender_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, sender_call_.get(), payload_type_map_);
+ receiver_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, receiver_call_.get(), payload_type_map_);
+ sender_transport->SetReceiver(receiver_call_->Receiver());
+ receiver_transport->SetReceiver(sender_call_->Receiver());
+
+ CreateSendConfig(1, 0, 0, sender_transport.get());
+ CreateMatchingReceiveConfigs(receiver_transport.get());
+ video_send_config_.post_encode_callback = &post_encode_observer;
+ video_receive_configs_[0].pre_decode_callback = &pre_decode_observer;
+
+ CreateVideoStreams();
+ Start();
+
+ frame_generator = test::FrameGenerator::CreateSquareGenerator(
+ kDefaultWidth, kDefaultHeight);
+ video_send_stream_->SetSource(
+ &forwarder, VideoSendStream::DegradationPreference::kMaintainFramerate);
+ forwarder.IncomingCapturedFrame(*frame_generator->NextFrame());
+ });
EXPECT_TRUE(post_encode_observer.Wait())
<< "Timed out while waiting for send-side encoded-frame callback.";
@@ -2070,12 +2140,13 @@
post_encode_observer.ExpectEqualFrames(pre_decode_observer);
- Stop();
-
- sender_transport.StopSending();
- receiver_transport.StopSending();
-
- DestroyStreams();
+ task_queue_.SendTask([this, &sender_transport, &receiver_transport]() {
+ Stop();
+ DestroyStreams();
+ sender_transport.reset();
+ receiver_transport.reset();
+ DestroyCalls();
+ });
}
TEST_F(EndToEndTest, ReceiveStreamSendsRemb) {
@@ -2203,10 +2274,11 @@
~BweObserver() {}
- test::PacketTransport* CreateReceiveTransport() override {
+ test::PacketTransport* CreateReceiveTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue) override {
receive_transport_ = new test::PacketTransport(
- nullptr, this, test::PacketTransport::kReceiver, payload_type_map_,
- FakeNetworkPipe::Config());
+ task_queue, nullptr, this, test::PacketTransport::kReceiver,
+ payload_type_map_, FakeNetworkPipe::Config());
return receive_transport_;
}
@@ -2313,7 +2385,9 @@
TEST_F(EndToEndTest, StopSendingKeyframeRequestsForInactiveStream) {
class KeyframeRequestObserver : public test::EndToEndTest {
public:
- KeyframeRequestObserver() : clock_(Clock::GetRealTimeClock()) {}
+ explicit KeyframeRequestObserver(
+ test::SingleThreadedTaskQueueForTesting* task_queue)
+ : clock_(Clock::GetRealTimeClock()), task_queue_(task_queue) {}
void OnVideoStreamsCreated(
VideoSendStream* send_stream,
@@ -2334,7 +2408,7 @@
SleepMs(100);
}
ASSERT_TRUE(frame_decoded);
- send_stream_->Stop();
+ task_queue_->SendTask([this]() { send_stream_->Stop(); });
SleepMs(10000);
ASSERT_EQ(
1U, receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
@@ -2344,7 +2418,8 @@
Clock* clock_;
VideoSendStream* send_stream_;
VideoReceiveStream* receive_stream_;
- } test;
+ test::SingleThreadedTaskQueueForTesting* const task_queue_;
+ } test(&task_queue_);
RunBaseTest(&test);
}
@@ -2429,8 +2504,10 @@
class TriggerMidCallProbingTest : public ProbingTest {
public:
- explicit TriggerMidCallProbingTest(bool* success)
- : ProbingTest(300000), success_(success) {}
+ TriggerMidCallProbingTest(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ bool* success)
+ : ProbingTest(300000), success_(success), task_queue_(task_queue) {}
void PerformTest() override {
*success_ = false;
@@ -2446,7 +2523,9 @@
if (stats.send_bandwidth_bps > 5 * 300000) {
Call::Config::BitrateConfig bitrate_config;
bitrate_config.max_bitrate_bps = 100000;
- sender_call_->SetBitrateConfig(bitrate_config);
+ task_queue_->SendTask([this, &bitrate_config]() {
+ sender_call_->SetBitrateConfig(bitrate_config);
+ });
++state_;
}
break;
@@ -2454,7 +2533,9 @@
if (stats.send_bandwidth_bps < 110000) {
Call::Config::BitrateConfig bitrate_config;
bitrate_config.max_bitrate_bps = 2500000;
- sender_call_->SetBitrateConfig(bitrate_config);
+ task_queue_->SendTask([this, &bitrate_config]() {
+ sender_call_->SetBitrateConfig(bitrate_config);
+ });
++state_;
}
break;
@@ -2474,12 +2555,13 @@
private:
const int kTimeoutMs = 5000;
bool* const success_;
+ test::SingleThreadedTaskQueueForTesting* const task_queue_;
};
bool success = false;
const int kMaxAttempts = 3;
for (int i = 0; i < kMaxAttempts; ++i) {
- TriggerMidCallProbingTest test(&success);
+ TriggerMidCallProbingTest test(&task_queue_, &success);
RunBaseTest(&test);
if (success)
return;
@@ -2711,10 +2793,6 @@
metrics::Reset();
RunBaseTest(&test);
- // Delete the call for Call stats to be reported.
- sender_call_.reset();
- receiver_call_.reset();
-
std::string video_prefix =
screenshare ? "WebRTC.Video.Screenshare." : "WebRTC.Video.";
@@ -2881,59 +2959,67 @@
metrics::Reset();
Call::Config send_config(test.GetSenderCallConfig());
- CreateSenderCall(send_config);
Call::Config recv_config(test.GetReceiverCallConfig());
- CreateReceiverCall(recv_config);
- receive_transport_.reset(test.CreateReceiveTransport());
- send_transport_.reset(test.CreateSendTransport(sender_call_.get()));
- send_transport_->SetReceiver(receiver_call_->Receiver());
- receive_transport_->SetReceiver(sender_call_->Receiver());
- receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
- CreateSendConfig(1, 0, 0, send_transport_.get());
- CreateMatchingReceiveConfigs(receive_transport_.get());
+ VideoEncoderConfig encoder_config_with_screenshare;
- // Modify send and receive configs.
- video_send_config_.rtp.nack.rtp_history_ms = kNackRtpHistoryMs;
- video_receive_configs_[0].rtp.nack.rtp_history_ms = kNackRtpHistoryMs;
- video_receive_configs_[0].renderer = &test;
- // RTT needed for RemoteNtpTimeEstimator for the receive stream.
- video_receive_configs_[0].rtp.rtcp_xr.receiver_reference_time_report = true;
- // Start with realtime video.
- video_encoder_config_.content_type =
- VideoEncoderConfig::ContentType::kRealtimeVideo;
- // Second encoder config for the second part of the test uses screenshare
- VideoEncoderConfig encoder_config_with_screenshare_ =
- video_encoder_config_.Copy();
- encoder_config_with_screenshare_.content_type =
- VideoEncoderConfig::ContentType::kScreen;
+ task_queue_.SendTask([this, &test, &override_field_trials, &send_config,
+ &recv_config, &encoder_config_with_screenshare]() {
+ CreateSenderCall(send_config);
+ CreateReceiverCall(recv_config);
- CreateVideoStreams();
- CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
- kDefaultHeight);
- Start();
+ receive_transport_.reset(test.CreateReceiveTransport(&task_queue_));
+ send_transport_.reset(
+ test.CreateSendTransport(&task_queue_, sender_call_.get()));
+ send_transport_->SetReceiver(receiver_call_->Receiver());
+ receive_transport_->SetReceiver(sender_call_->Receiver());
+
+ receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
+ CreateSendConfig(1, 0, 0, send_transport_.get());
+ CreateMatchingReceiveConfigs(receive_transport_.get());
+
+ // Modify send and receive configs.
+ video_send_config_.rtp.nack.rtp_history_ms = kNackRtpHistoryMs;
+ video_receive_configs_[0].rtp.nack.rtp_history_ms = kNackRtpHistoryMs;
+ video_receive_configs_[0].renderer = &test;
+ // RTT needed for RemoteNtpTimeEstimator for the receive stream.
+ video_receive_configs_[0].rtp.rtcp_xr.receiver_reference_time_report = true;
+ // Start with realtime video.
+ video_encoder_config_.content_type =
+ VideoEncoderConfig::ContentType::kRealtimeVideo;
+ // Second encoder config for the second part of the test uses screenshare
+ encoder_config_with_screenshare = video_encoder_config_.Copy();
+ encoder_config_with_screenshare.content_type =
+ VideoEncoderConfig::ContentType::kScreen;
+
+ CreateVideoStreams();
+ CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
+ kDefaultHeight);
+ Start();
+ });
test.PerformTest();
// Replace old send stream.
- sender_call_->DestroyVideoSendStream(video_send_stream_);
- video_send_stream_ = sender_call_->CreateVideoSendStream(
- video_send_config_.Copy(), encoder_config_with_screenshare_.Copy());
- video_send_stream_->SetSource(
- frame_generator_capturer_.get(),
- VideoSendStream::DegradationPreference::kBalanced);
- video_send_stream_->Start();
+ task_queue_.SendTask([this, &encoder_config_with_screenshare]() {
+ sender_call_->DestroyVideoSendStream(video_send_stream_);
+ video_send_stream_ = sender_call_->CreateVideoSendStream(
+ video_send_config_.Copy(), encoder_config_with_screenshare.Copy());
+ video_send_stream_->SetSource(
+ frame_generator_capturer_.get(),
+ VideoSendStream::DegradationPreference::kBalanced);
+ video_send_stream_->Start();
+ });
// Continue to run test but now with screenshare.
test.PerformTest();
- send_transport_->StopSending();
- receive_transport_->StopSending();
- Stop();
- DestroyStreams();
- DestroyCalls();
- // Delete the call for Call stats to be reported.
- sender_call_.reset();
- receiver_call_.reset();
+ task_queue_.SendTask([this]() {
+ Stop();
+ DestroyStreams();
+ send_transport_.reset();
+ receive_transport_.reset();
+ DestroyCalls();
+ });
// Verify that stats have been updated for both screenshare and video.
EXPECT_EQ(1, metrics::NumSamples("WebRTC.Video.EndToEndDelayInMs"));
@@ -3097,9 +3183,11 @@
class EncoderRateStatsTest : public test::EndToEndTest,
public test::FakeEncoder {
public:
- EncoderRateStatsTest()
+ explicit EncoderRateStatsTest(
+ test::SingleThreadedTaskQueueForTesting* task_queue)
: EndToEndTest(kDefaultTimeoutMs),
FakeEncoder(Clock::GetRealTimeClock()),
+ task_queue_(task_queue),
send_stream_(nullptr),
bitrate_kbps_(0) {}
@@ -3131,11 +3219,14 @@
void PerformTest() override {
ASSERT_TRUE(Wait())
<< "Timed out while waiting for encoder SetRates() call.";
- WaitForEncoderTargetBitrateMatchStats();
- send_stream_->Stop();
- WaitForStatsReportZeroTargetBitrate();
- send_stream_->Start();
- WaitForEncoderTargetBitrateMatchStats();
+
+ task_queue_->SendTask([this]() {
+ WaitForEncoderTargetBitrateMatchStats();
+ send_stream_->Stop();
+ WaitForStatsReportZeroTargetBitrate();
+ send_stream_->Start();
+ WaitForEncoderTargetBitrateMatchStats();
+ });
}
void WaitForEncoderTargetBitrateMatchStats() {
@@ -3165,10 +3256,11 @@
}
private:
+ test::SingleThreadedTaskQueueForTesting* const task_queue_;
rtc::CriticalSection crit_;
VideoSendStream* send_stream_;
uint32_t bitrate_kbps_ GUARDED_BY(crit_);
- } test;
+ } test(&task_queue_);
RunBaseTest(&test);
}
@@ -3375,10 +3467,12 @@
return true;
}
- test::PacketTransport* CreateSendTransport(Call* sender_call) override {
+ test::PacketTransport* CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override {
FakeNetworkPipe::Config network_config;
network_config.loss_percent = 5;
- return new test::PacketTransport(sender_call, this,
+ return new test::PacketTransport(task_queue, sender_call, this,
test::PacketTransport::kSender,
payload_type_map_, network_config);
}
@@ -3483,8 +3577,7 @@
ADD_FAILURE() << "Timed out waiting for filled stats.";
for (std::map<std::string, bool>::const_iterator it =
receive_stats_filled_.begin();
- it != receive_stats_filled_.end();
- ++it) {
+ it != receive_stats_filled_.end(); ++it) {
if (!it->second) {
ADD_FAILURE() << "Missing receive stats: " << it->first;
}
@@ -3492,8 +3585,7 @@
for (std::map<std::string, bool>::const_iterator it =
send_stats_filled_.begin();
- it != send_stats_filled_.end();
- ++it) {
+ it != send_stats_filled_.end(); ++it) {
if (!it->second) {
ADD_FAILURE() << "Missing send stats: " << it->first;
}
@@ -3982,91 +4074,111 @@
std::map<uint32_t, bool> ssrc_observed_ GUARDED_BY(crit_);
} observer(use_rtx);
+ std::unique_ptr<test::PacketTransport> send_transport;
+ std::unique_ptr<test::PacketTransport> receive_transport;
+
Call::Config config(event_log_.get());
- CreateCalls(config, config);
+ VideoEncoderConfig one_stream;
- test::PacketTransport send_transport(
- sender_call_.get(), &observer, test::PacketTransport::kSender,
- payload_type_map_, FakeNetworkPipe::Config());
- test::PacketTransport receive_transport(
- nullptr, &observer, test::PacketTransport::kReceiver, payload_type_map_,
- FakeNetworkPipe::Config());
- send_transport.SetReceiver(receiver_call_->Receiver());
- receive_transport.SetReceiver(sender_call_->Receiver());
+ task_queue_.SendTask([this, &observer, &send_transport, &receive_transport,
+ &config, &one_stream, use_rtx]() {
+ CreateCalls(config, config);
- CreateSendConfig(kNumSsrcs, 0, 0, &send_transport);
+ send_transport = rtc::MakeUnique<test::PacketTransport>(
+ &task_queue_, sender_call_.get(), &observer,
+ test::PacketTransport::kSender, payload_type_map_,
+ FakeNetworkPipe::Config());
+ receive_transport = rtc::MakeUnique<test::PacketTransport>(
+ &task_queue_, nullptr, &observer, test::PacketTransport::kReceiver,
+ payload_type_map_, FakeNetworkPipe::Config());
+ send_transport->SetReceiver(receiver_call_->Receiver());
+ receive_transport->SetReceiver(sender_call_->Receiver());
- if (use_rtx) {
- for (size_t i = 0; i < kNumSsrcs; ++i) {
- video_send_config_.rtp.rtx.ssrcs.push_back(kSendRtxSsrcs[i]);
+ CreateSendConfig(kNumSsrcs, 0, 0, send_transport.get());
+
+ if (use_rtx) {
+ for (size_t i = 0; i < kNumSsrcs; ++i) {
+ video_send_config_.rtp.rtx.ssrcs.push_back(kSendRtxSsrcs[i]);
+ }
+ video_send_config_.rtp.rtx.payload_type = kSendRtxPayloadType;
}
- video_send_config_.rtp.rtx.payload_type = kSendRtxPayloadType;
- }
- video_encoder_config_.video_stream_factory =
- new rtc::RefCountedObject<VideoStreamFactory>();
- // Use the same total bitrates when sending a single stream to avoid lowering
- // the bitrate estimate and requiring a subsequent rampup.
- VideoEncoderConfig one_stream = video_encoder_config_.Copy();
- // one_stream.streams.resize(1);
- one_stream.number_of_streams = 1;
- CreateMatchingReceiveConfigs(&receive_transport);
+ video_encoder_config_.video_stream_factory =
+ new rtc::RefCountedObject<VideoStreamFactory>();
+ // Use the same total bitrates when sending a single stream to avoid
+ // lowering the bitrate estimate and requiring a subsequent rampup.
+ one_stream = video_encoder_config_.Copy();
+ // one_stream.streams.resize(1);
+ one_stream.number_of_streams = 1;
+ CreateMatchingReceiveConfigs(receive_transport.get());
- CreateVideoStreams();
- CreateFrameGeneratorCapturer(30, 1280, 720);
+ CreateVideoStreams();
+ CreateFrameGeneratorCapturer(30, 1280, 720);
- Start();
+ Start();
+ });
+
EXPECT_TRUE(observer.Wait())
<< "Timed out waiting for all SSRCs to send packets.";
// Test stream resetting more than once to make sure that the state doesn't
// get set once (this could be due to using std::map::insert for instance).
for (size_t i = 0; i < 3; ++i) {
- frame_generator_capturer_->Stop();
- sender_call_->DestroyVideoSendStream(video_send_stream_);
+ task_queue_.SendTask([&]() {
+ frame_generator_capturer_->Stop();
+ sender_call_->DestroyVideoSendStream(video_send_stream_);
- // Re-create VideoSendStream with only one stream.
- video_send_stream_ = sender_call_->CreateVideoSendStream(
- video_send_config_.Copy(), one_stream.Copy());
- video_send_stream_->Start();
- if (provoke_rtcpsr_before_rtp) {
- // Rapid Resync Request forces sending RTCP Sender Report back.
- // Using this request speeds up this test because then there is no need
- // to wait for a second for periodic Sender Report.
- rtcp::RapidResyncRequest force_send_sr_back_request;
- rtc::Buffer packet = force_send_sr_back_request.Build();
- static_cast<webrtc::test::DirectTransport&>(receive_transport)
- .SendRtcp(packet.data(), packet.size());
- }
- CreateFrameGeneratorCapturer(30, 1280, 720);
- frame_generator_capturer_->Start();
+ // Re-create VideoSendStream with only one stream.
+ video_send_stream_ = sender_call_->CreateVideoSendStream(
+ video_send_config_.Copy(), one_stream.Copy());
+ video_send_stream_->Start();
+ if (provoke_rtcpsr_before_rtp) {
+ // Rapid Resync Request forces sending RTCP Sender Report back.
+ // Using this request speeds up this test because then there is no need
+ // to wait for a second for periodic Sender Report.
+ rtcp::RapidResyncRequest force_send_sr_back_request;
+ rtc::Buffer packet = force_send_sr_back_request.Build();
+ static_cast<webrtc::test::DirectTransport*>(receive_transport.get())
+ ->SendRtcp(packet.data(), packet.size());
+ }
+ CreateFrameGeneratorCapturer(30, 1280, 720);
+ frame_generator_capturer_->Start();
+ });
observer.ResetExpectedSsrcs(1);
EXPECT_TRUE(observer.Wait()) << "Timed out waiting for single RTP packet.";
// Reconfigure back to use all streams.
- video_send_stream_->ReconfigureVideoEncoder(video_encoder_config_.Copy());
+ task_queue_.SendTask([this]() {
+ video_send_stream_->ReconfigureVideoEncoder(video_encoder_config_.Copy());
+ });
observer.ResetExpectedSsrcs(kNumSsrcs);
EXPECT_TRUE(observer.Wait())
<< "Timed out waiting for all SSRCs to send packets.";
// Reconfigure down to one stream.
- video_send_stream_->ReconfigureVideoEncoder(one_stream.Copy());
+ task_queue_.SendTask([this, &one_stream]() {
+ video_send_stream_->ReconfigureVideoEncoder(one_stream.Copy());
+ });
observer.ResetExpectedSsrcs(1);
EXPECT_TRUE(observer.Wait()) << "Timed out waiting for single RTP packet.";
// Reconfigure back to use all streams.
- video_send_stream_->ReconfigureVideoEncoder(video_encoder_config_.Copy());
+ task_queue_.SendTask([this]() {
+ video_send_stream_->ReconfigureVideoEncoder(video_encoder_config_.Copy());
+ });
observer.ResetExpectedSsrcs(kNumSsrcs);
EXPECT_TRUE(observer.Wait())
<< "Timed out waiting for all SSRCs to send packets.";
}
- send_transport.StopSending();
- receive_transport.StopSending();
-
- Stop();
- DestroyStreams();
+ task_queue_.SendTask([this, &send_transport, &receive_transport]() {
+ Stop();
+ DestroyStreams();
+ send_transport.reset();
+ receive_transport.reset();
+ DestroyCalls();
+ });
}
TEST_F(EndToEndTest, RestartingSendStreamPreservesRtpState) {
@@ -4157,96 +4269,118 @@
rtc::CriticalSection crit_;
} observer;
+ constexpr int kFrameMaxWidth = 320;
+ constexpr int kFrameMaxHeight = 180;
+ constexpr int kFrameRate = 15;
+
Call::Config config(event_log_.get());
- CreateCalls(config, config);
- FakeNetworkPipe::Config lossy_delayed_link;
- lossy_delayed_link.loss_percent = 2;
- lossy_delayed_link.queue_delay_ms = 50;
- test::PacketTransport send_transport(sender_call_.get(), &observer,
- test::PacketTransport::kSender,
- payload_type_map_, lossy_delayed_link);
- send_transport.SetReceiver(receiver_call_->Receiver());
+ std::unique_ptr<test::PacketTransport> send_transport;
+ std::unique_ptr<test::PacketTransport> receive_transport;
+ std::unique_ptr<VideoEncoder> encoder;
- FakeNetworkPipe::Config flawless_link;
- test::PacketTransport receive_transport(nullptr, &observer,
- test::PacketTransport::kReceiver,
- payload_type_map_, flawless_link);
- receive_transport.SetReceiver(sender_call_->Receiver());
+ task_queue_.SendTask([&]() {
+ CreateCalls(config, config);
- // For reduced flakyness, we use a real VP8 encoder together with NACK
- // and RTX.
- const int kNumVideoStreams = 1;
- const int kNumFlexfecStreams = 1;
- CreateSendConfig(kNumVideoStreams, 0, kNumFlexfecStreams, &send_transport);
- std::unique_ptr<VideoEncoder> encoder(VP8Encoder::Create());
- video_send_config_.encoder_settings.encoder = encoder.get();
- video_send_config_.encoder_settings.payload_name = "VP8";
- video_send_config_.encoder_settings.payload_type = kVideoSendPayloadType;
- video_send_config_.rtp.nack.rtp_history_ms = kNackRtpHistoryMs;
- video_send_config_.rtp.rtx.ssrcs.push_back(kSendRtxSsrcs[0]);
- video_send_config_.rtp.rtx.payload_type = kSendRtxPayloadType;
+ FakeNetworkPipe::Config lossy_delayed_link;
+ lossy_delayed_link.loss_percent = 2;
+ lossy_delayed_link.queue_delay_ms = 50;
- CreateMatchingReceiveConfigs(&receive_transport);
- video_receive_configs_[0].rtp.nack.rtp_history_ms = kNackRtpHistoryMs;
- video_receive_configs_[0].rtp.rtx_ssrc = kSendRtxSsrcs[0];
- video_receive_configs_[0].rtp.rtx_payload_types[kVideoSendPayloadType] =
- kSendRtxPayloadType;
+ send_transport = rtc::MakeUnique<test::PacketTransport>(
+ &task_queue_, sender_call_.get(), &observer,
+ test::PacketTransport::kSender, payload_type_map_, lossy_delayed_link);
+ send_transport->SetReceiver(receiver_call_->Receiver());
- // The matching FlexFEC receive config is not created by
- // CreateMatchingReceiveConfigs since this is not a test::BaseTest.
- // Set up the receive config manually instead.
- FlexfecReceiveStream::Config flexfec_receive_config(&receive_transport);
- flexfec_receive_config.payload_type =
- video_send_config_.rtp.flexfec.payload_type;
- flexfec_receive_config.remote_ssrc = video_send_config_.rtp.flexfec.ssrc;
- flexfec_receive_config.protected_media_ssrcs =
- video_send_config_.rtp.flexfec.protected_media_ssrcs;
- flexfec_receive_config.local_ssrc = kReceiverLocalVideoSsrc;
- flexfec_receive_config.transport_cc = true;
- flexfec_receive_config.rtp_header_extensions.emplace_back(
- RtpExtension::kTransportSequenceNumberUri,
- test::kTransportSequenceNumberExtensionId);
- flexfec_receive_configs_.push_back(flexfec_receive_config);
+ FakeNetworkPipe::Config flawless_link;
+ receive_transport = rtc::MakeUnique<test::PacketTransport>(
+ &task_queue_, nullptr, &observer, test::PacketTransport::kReceiver,
+ payload_type_map_, flawless_link);
+ receive_transport->SetReceiver(sender_call_->Receiver());
- CreateFlexfecStreams();
- CreateVideoStreams();
+ // For reduced flakiness, we use a real VP8 encoder together with NACK
+ // and RTX.
+ const int kNumVideoStreams = 1;
+ const int kNumFlexfecStreams = 1;
+ CreateSendConfig(kNumVideoStreams, 0, kNumFlexfecStreams,
+ send_transport.get());
+ encoder = rtc::WrapUnique(VP8Encoder::Create());
+ video_send_config_.encoder_settings.encoder = encoder.get();
+ video_send_config_.encoder_settings.payload_name = "VP8";
+ video_send_config_.encoder_settings.payload_type = kVideoSendPayloadType;
+ video_send_config_.rtp.nack.rtp_history_ms = kNackRtpHistoryMs;
+ video_send_config_.rtp.rtx.ssrcs.push_back(kSendRtxSsrcs[0]);
+ video_send_config_.rtp.rtx.payload_type = kSendRtxPayloadType;
- // RTCP might be disabled if the network is "down".
- sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
- receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
+ CreateMatchingReceiveConfigs(receive_transport.get());
+ video_receive_configs_[0].rtp.nack.rtp_history_ms = kNackRtpHistoryMs;
+ video_receive_configs_[0].rtp.rtx_ssrc = kSendRtxSsrcs[0];
+ video_receive_configs_[0].rtp.rtx_payload_types[kVideoSendPayloadType] =
+ kSendRtxPayloadType;
- const int kFrameMaxWidth = 320;
- const int kFrameMaxHeight = 180;
- const int kFrameRate = 15;
- CreateFrameGeneratorCapturer(kFrameRate, kFrameMaxWidth, kFrameMaxHeight);
+ // The matching FlexFEC receive config is not created by
+ // CreateMatchingReceiveConfigs since this is not a test::BaseTest.
+ // Set up the receive config manually instead.
+ FlexfecReceiveStream::Config flexfec_receive_config(
+ receive_transport.get());
+ flexfec_receive_config.payload_type =
+ video_send_config_.rtp.flexfec.payload_type;
+ flexfec_receive_config.remote_ssrc = video_send_config_.rtp.flexfec.ssrc;
+ flexfec_receive_config.protected_media_ssrcs =
+ video_send_config_.rtp.flexfec.protected_media_ssrcs;
+ flexfec_receive_config.local_ssrc = kReceiverLocalVideoSsrc;
+ flexfec_receive_config.transport_cc = true;
+ flexfec_receive_config.rtp_header_extensions.emplace_back(
+ RtpExtension::kTransportSequenceNumberUri,
+ test::kTransportSequenceNumberExtensionId);
+ flexfec_receive_configs_.push_back(flexfec_receive_config);
+
+ CreateFlexfecStreams();
+ CreateVideoStreams();
+
+ // RTCP might be disabled if the network is "down".
+ sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
+ receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
+
+ CreateFrameGeneratorCapturer(kFrameRate, kFrameMaxWidth, kFrameMaxHeight);
+
+ Start();
+ });
// Initial test.
- Start();
EXPECT_TRUE(observer.Wait()) << "Timed out waiting for packets.";
- // Ensure monotonicity when the VideoSendStream is restarted.
- Stop();
- observer.ResetPacketCount();
- Start();
+ task_queue_.SendTask([this, &observer]() {
+ // Ensure monotonicity when the VideoSendStream is restarted.
+ Stop();
+ observer.ResetPacketCount();
+ Start();
+ });
+
EXPECT_TRUE(observer.Wait()) << "Timed out waiting for packets.";
- // Ensure monotonicity when the VideoSendStream is recreated.
- frame_generator_capturer_->Stop();
- sender_call_->DestroyVideoSendStream(video_send_stream_);
- observer.ResetPacketCount();
- video_send_stream_ = sender_call_->CreateVideoSendStream(
- video_send_config_.Copy(), video_encoder_config_.Copy());
- video_send_stream_->Start();
- CreateFrameGeneratorCapturer(kFrameRate, kFrameMaxWidth, kFrameMaxHeight);
- frame_generator_capturer_->Start();
+ task_queue_.SendTask([this, &observer,
+ kFrameRate, kFrameMaxWidth, kFrameMaxHeight]() {
+ // Ensure monotonicity when the VideoSendStream is recreated.
+ frame_generator_capturer_->Stop();
+ sender_call_->DestroyVideoSendStream(video_send_stream_);
+ observer.ResetPacketCount();
+ video_send_stream_ = sender_call_->CreateVideoSendStream(
+ video_send_config_.Copy(), video_encoder_config_.Copy());
+ video_send_stream_->Start();
+ CreateFrameGeneratorCapturer(kFrameRate, kFrameMaxWidth, kFrameMaxHeight);
+ frame_generator_capturer_->Start();
+ });
+
EXPECT_TRUE(observer.Wait()) << "Timed out waiting for packets.";
// Cleanup.
- send_transport.StopSending();
- receive_transport.StopSending();
- Stop();
- DestroyStreams();
+ task_queue_.SendTask([this, &send_transport, &receive_transport]() {
+ Stop();
+ DestroyStreams();
+ send_transport.reset();
+ receive_transport.reset();
+ DestroyCalls();
+ });
}
TEST_F(EndToEndTest, RespectsNetworkState) {
@@ -4261,9 +4395,11 @@
static const int kNumAcceptedDowntimeRtcp = 1;
class NetworkStateTest : public test::EndToEndTest, public test::FakeEncoder {
public:
- NetworkStateTest()
+ explicit NetworkStateTest(
+ test::SingleThreadedTaskQueueForTesting* task_queue)
: EndToEndTest(kDefaultTimeoutMs),
FakeEncoder(Clock::GetRealTimeClock()),
+ task_queue_(task_queue),
encoded_frames_(false, false),
packet_event_(false, false),
sender_call_(nullptr),
@@ -4320,51 +4456,57 @@
void PerformTest() override {
EXPECT_TRUE(encoded_frames_.Wait(kDefaultTimeoutMs))
<< "No frames received by the encoder.";
- // Wait for packets from both sender/receiver.
- WaitForPacketsOrSilence(false, false);
- // Sender-side network down for audio; there should be no effect on video
- sender_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkDown);
- WaitForPacketsOrSilence(false, false);
+ task_queue_->SendTask([this]() {
+ // Wait for packets from both sender/receiver.
+ WaitForPacketsOrSilence(false, false);
- // Receiver-side network down for audio; no change expected
- receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkDown);
- WaitForPacketsOrSilence(false, false);
+ // Sender-side network down for audio; there should be no effect on
+ // video
+ sender_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkDown);
+ WaitForPacketsOrSilence(false, false);
- // Sender-side network down.
- sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkDown);
- {
- rtc::CritScope lock(&test_crit_);
- // After network goes down we shouldn't be encoding more frames.
- sender_state_ = kNetworkDown;
- }
- // Wait for receiver-packets and no sender packets.
- WaitForPacketsOrSilence(true, false);
+ // Receiver-side network down for audio; no change expected
+ receiver_call_->SignalChannelNetworkState(MediaType::AUDIO,
+ kNetworkDown);
+ WaitForPacketsOrSilence(false, false);
- // Receiver-side network down.
- receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkDown);
- WaitForPacketsOrSilence(true, true);
+ // Sender-side network down.
+ sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkDown);
+ {
+ rtc::CritScope lock(&test_crit_);
+ // After network goes down we shouldn't be encoding more frames.
+ sender_state_ = kNetworkDown;
+ }
+ // Wait for receiver-packets and no sender packets.
+ WaitForPacketsOrSilence(true, false);
- // Network up for audio for both sides; video is still not expected to
- // start
- sender_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
- receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
- WaitForPacketsOrSilence(true, true);
+ // Receiver-side network down.
+ receiver_call_->SignalChannelNetworkState(MediaType::VIDEO,
+ kNetworkDown);
+ WaitForPacketsOrSilence(true, true);
- // Network back up again for both.
- {
- rtc::CritScope lock(&test_crit_);
- // It's OK to encode frames again, as we're about to bring up the
- // network.
- sender_state_ = kNetworkUp;
- }
- sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
- receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
- WaitForPacketsOrSilence(false, false);
+ // Network up for audio for both sides; video is still not expected to
+ // start
+ sender_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
+ receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
+ WaitForPacketsOrSilence(true, true);
- // TODO(skvlad): add tests to verify that the audio streams are stopped
- // when the network goes down for audio once the workaround in
- // paced_sender.cc is removed.
+ // Network back up again for both.
+ {
+ rtc::CritScope lock(&test_crit_);
+ // It's OK to encode frames again, as we're about to bring up the
+ // network.
+ sender_state_ = kNetworkUp;
+ }
+ sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
+ receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
+ WaitForPacketsOrSilence(false, false);
+
+ // TODO(skvlad): add tests to verify that the audio streams are stopped
+ // when the network goes down for audio once the workaround in
+ // paced_sender.cc is removed.
+ });
}
int32_t Encode(const VideoFrame& input_image,
@@ -4434,6 +4576,7 @@
}
}
+ test::SingleThreadedTaskQueueForTesting* const task_queue_;
rtc::CriticalSection test_crit_;
rtc::Event encoded_frames_;
rtc::Event packet_event_;
@@ -4445,7 +4588,7 @@
int sender_rtcp_ GUARDED_BY(test_crit_);
int receiver_rtcp_ GUARDED_BY(test_crit_);
int down_frames_ GUARDED_BY(test_crit_);
- } test;
+ } test(&task_queue_);
RunBaseTest(&test);
}
@@ -4453,25 +4596,30 @@
TEST_F(EndToEndTest, CallReportsRttForSender) {
static const int kSendDelayMs = 30;
static const int kReceiveDelayMs = 70;
- CreateCalls(Call::Config(event_log_.get()), Call::Config(event_log_.get()));
- FakeNetworkPipe::Config config;
- config.queue_delay_ms = kSendDelayMs;
- test::DirectTransport sender_transport(config, sender_call_.get(),
- payload_type_map_);
- config.queue_delay_ms = kReceiveDelayMs;
- test::DirectTransport receiver_transport(config, receiver_call_.get(),
- payload_type_map_);
- sender_transport.SetReceiver(receiver_call_->Receiver());
- receiver_transport.SetReceiver(sender_call_->Receiver());
+ std::unique_ptr<test::DirectTransport> sender_transport;
+ std::unique_ptr<test::DirectTransport> receiver_transport;
- CreateSendConfig(1, 0, 0, &sender_transport);
- CreateMatchingReceiveConfigs(&receiver_transport);
+ task_queue_.SendTask([this, &sender_transport, &receiver_transport]() {
+ FakeNetworkPipe::Config config;
+ config.queue_delay_ms = kSendDelayMs;
+ CreateCalls(Call::Config(event_log_.get()), Call::Config(event_log_.get()));
+ sender_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, config, sender_call_.get(), payload_type_map_);
+ config.queue_delay_ms = kReceiveDelayMs;
+ receiver_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, config, receiver_call_.get(), payload_type_map_);
+ sender_transport->SetReceiver(receiver_call_->Receiver());
+ receiver_transport->SetReceiver(sender_call_->Receiver());
- CreateVideoStreams();
- CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
- kDefaultHeight);
- Start();
+ CreateSendConfig(1, 0, 0, sender_transport.get());
+ CreateMatchingReceiveConfigs(receiver_transport.get());
+
+ CreateVideoStreams();
+ CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
+ kDefaultHeight);
+ Start();
+ });
int64_t start_time_ms = clock_->TimeInMilliseconds();
while (true) {
@@ -4489,55 +4637,70 @@
SleepMs(10);
}
- sender_transport.StopSending();
- receiver_transport.StopSending();
- Stop();
- DestroyStreams();
- DestroyCalls();
+ task_queue_.SendTask([this, &sender_transport, &receiver_transport]() {
+ Stop();
+ DestroyStreams();
+ sender_transport.reset();
+ receiver_transport.reset();
+ DestroyCalls();
+ });
}
void EndToEndTest::VerifyNewVideoSendStreamsRespectNetworkState(
MediaType network_to_bring_up,
VideoEncoder* encoder,
Transport* transport) {
- CreateSenderCall(Call::Config(event_log_.get()));
- sender_call_->SignalChannelNetworkState(network_to_bring_up, kNetworkUp);
+ task_queue_.SendTask([this, network_to_bring_up, encoder, transport]() {
+ CreateSenderCall(Call::Config(event_log_.get()));
+ sender_call_->SignalChannelNetworkState(network_to_bring_up, kNetworkUp);
- CreateSendConfig(1, 0, 0, transport);
- video_send_config_.encoder_settings.encoder = encoder;
- CreateVideoStreams();
- CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
- kDefaultHeight);
+ CreateSendConfig(1, 0, 0, transport);
+ video_send_config_.encoder_settings.encoder = encoder;
+ CreateVideoStreams();
+ CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
+ kDefaultHeight);
- Start();
+ Start();
+ });
+
SleepMs(kSilenceTimeoutMs);
- Stop();
- DestroyStreams();
+ task_queue_.SendTask([this]() {
+ Stop();
+ DestroyStreams();
+ DestroyCalls();
+ });
}
void EndToEndTest::VerifyNewVideoReceiveStreamsRespectNetworkState(
MediaType network_to_bring_up,
Transport* transport) {
- Call::Config config(event_log_.get());
- CreateCalls(config, config);
- receiver_call_->SignalChannelNetworkState(network_to_bring_up, kNetworkUp);
+ std::unique_ptr<test::DirectTransport> sender_transport;
- test::DirectTransport sender_transport(sender_call_.get(), payload_type_map_);
- sender_transport.SetReceiver(receiver_call_->Receiver());
- CreateSendConfig(1, 0, 0, &sender_transport);
- CreateMatchingReceiveConfigs(transport);
- CreateVideoStreams();
- CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
- kDefaultHeight);
+ task_queue_.SendTask([this, &sender_transport, network_to_bring_up,
+ transport]() {
+ Call::Config config(event_log_.get());
+ CreateCalls(config, config);
+ receiver_call_->SignalChannelNetworkState(network_to_bring_up, kNetworkUp);
+ sender_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, sender_call_.get(), payload_type_map_);
+ sender_transport->SetReceiver(receiver_call_->Receiver());
+ CreateSendConfig(1, 0, 0, sender_transport.get());
+ CreateMatchingReceiveConfigs(transport);
+ CreateVideoStreams();
+ CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
+ kDefaultHeight);
+ Start();
+ });
- Start();
SleepMs(kSilenceTimeoutMs);
- Stop();
- sender_transport.StopSending();
-
- DestroyStreams();
+ task_queue_.SendTask([this, &sender_transport]() {
+ Stop();
+ DestroyStreams();
+ sender_transport.reset();
+ DestroyCalls();
+ });
}
TEST_F(EndToEndTest, NewVideoSendStreamsRespectVideoNetworkDown) {
diff --git a/webrtc/video/picture_id_tests.cc b/webrtc/video/picture_id_tests.cc
index a1cb405..13574b8 100644
--- a/webrtc/video/picture_id_tests.cc
+++ b/webrtc/video/picture_id_tests.cc
@@ -139,6 +139,14 @@
virtual ~PictureIdTest() {
EXPECT_EQ(nullptr, video_send_stream_);
EXPECT_TRUE(video_receive_streams_.empty());
+
+ task_queue_.SendTask([this]() {
+ Stop();
+ DestroyStreams();
+ send_transport_.reset();
+ receive_transport_.reset();
+ DestroyCalls();
+ });
}
void SetupEncoder(VideoEncoder* encoder);
@@ -196,28 +204,34 @@
};
void PictureIdTest::SetupEncoder(VideoEncoder* encoder) {
- Call::Config config(event_log_.get());
- CreateCalls(config, config);
+ task_queue_.SendTask([this, &encoder]() {
+ Call::Config config(event_log_.get());
+ CreateCalls(config, config);
- send_transport_.reset(new test::PacketTransport(
- sender_call_.get(), &observer, test::PacketTransport::kSender,
- payload_type_map_, FakeNetworkPipe::Config()));
+ send_transport_.reset(new test::PacketTransport(
+ &task_queue_, sender_call_.get(), &observer,
+ test::PacketTransport::kSender, payload_type_map_,
+ FakeNetworkPipe::Config()));
- CreateSendConfig(kNumSsrcs, 0, 0, send_transport_.get());
- video_send_config_.encoder_settings.encoder = encoder;
- video_send_config_.encoder_settings.payload_name = "VP8";
- video_encoder_config_.video_stream_factory =
- new rtc::RefCountedObject<VideoStreamFactory>();
- video_encoder_config_.number_of_streams = 1;
+ CreateSendConfig(kNumSsrcs, 0, 0, send_transport_.get());
+ video_send_config_.encoder_settings.encoder = encoder;
+ video_send_config_.encoder_settings.payload_name = "VP8";
+ video_encoder_config_.video_stream_factory =
+ new rtc::RefCountedObject<VideoStreamFactory>();
+ video_encoder_config_.number_of_streams = 1;
+ });
}
void PictureIdTest::TestPictureIdContinuousAfterReconfigure(
const std::vector<int>& ssrc_counts) {
- CreateVideoStreams();
- CreateFrameGeneratorCapturer(kFrameRate, kFrameMaxWidth, kFrameMaxHeight);
+ task_queue_.SendTask([this]() {
+ CreateVideoStreams();
+ CreateFrameGeneratorCapturer(kFrameRate, kFrameMaxWidth, kFrameMaxHeight);
- // Initial test with a single stream.
- Start();
+ // Initial test with a single stream.
+ Start();
+ });
+
EXPECT_TRUE(observer.Wait()) << "Timed out waiting for packets.";
// Reconfigure VideoEncoder and test picture id increase.
@@ -228,21 +242,31 @@
observer.SetExpectedSsrcs(ssrc_count);
observer.ResetObservedSsrcs();
// Make sure the picture_id sequence is continuous on reinit and recreate.
- video_send_stream_->ReconfigureVideoEncoder(video_encoder_config_.Copy());
+ task_queue_.SendTask([this]() {
+ video_send_stream_->ReconfigureVideoEncoder(video_encoder_config_.Copy());
+ });
EXPECT_TRUE(observer.Wait()) << "Timed out waiting for packets.";
}
- Stop();
- DestroyStreams();
+ task_queue_.SendTask([this]() {
+ Stop();
+ DestroyStreams();
+ send_transport_.reset();
+ receive_transport_.reset();
+ DestroyCalls();
+ });
}
void PictureIdTest::TestPictureIdIncreaseAfterRecreateStreams(
const std::vector<int>& ssrc_counts) {
- CreateVideoStreams();
- CreateFrameGeneratorCapturer(kFrameRate, kFrameMaxWidth, kFrameMaxHeight);
+ task_queue_.SendTask([this]() {
+ CreateVideoStreams();
+ CreateFrameGeneratorCapturer(kFrameRate, kFrameMaxWidth, kFrameMaxHeight);
- // Initial test with a single stream.
- Start();
+ // Initial test with a single stream.
+ Start();
+ });
+
EXPECT_TRUE(observer.Wait()) << "Timed out waiting for packets.";
// Recreate VideoSendStream and test picture id increase.
@@ -250,24 +274,31 @@
// with it, therefore it is expected that some frames might be lost.
observer.SetMaxExpectedPictureIdGap(kMaxFramesLost);
for (int ssrc_count : ssrc_counts) {
- video_encoder_config_.number_of_streams = ssrc_count;
+ task_queue_.SendTask([this, &ssrc_count]() {
+ video_encoder_config_.number_of_streams = ssrc_count;
- frame_generator_capturer_->Stop();
- sender_call_->DestroyVideoSendStream(video_send_stream_);
+ frame_generator_capturer_->Stop();
+ sender_call_->DestroyVideoSendStream(video_send_stream_);
- observer.SetExpectedSsrcs(ssrc_count);
- observer.ResetObservedSsrcs();
+ observer.SetExpectedSsrcs(ssrc_count);
+ observer.ResetObservedSsrcs();
- video_send_stream_ = sender_call_->CreateVideoSendStream(
- video_send_config_.Copy(), video_encoder_config_.Copy());
- video_send_stream_->Start();
- CreateFrameGeneratorCapturer(kFrameRate, kFrameMaxWidth, kFrameMaxHeight);
- frame_generator_capturer_->Start();
+ video_send_stream_ = sender_call_->CreateVideoSendStream(
+ video_send_config_.Copy(), video_encoder_config_.Copy());
+ video_send_stream_->Start();
+ CreateFrameGeneratorCapturer(kFrameRate, kFrameMaxWidth, kFrameMaxHeight);
+ frame_generator_capturer_->Start();
+ });
+
EXPECT_TRUE(observer.Wait()) << "Timed out waiting for packets.";
}
- Stop();
- DestroyStreams();
+ task_queue_.SendTask([this]() {
+ Stop();
+ DestroyStreams();
+ send_transport_.reset();
+ receive_transport_.reset();
+ });
}
TEST_F(PictureIdTest, PictureIdContinuousAfterReconfigureVp8) {
diff --git a/webrtc/video/video_quality_test.cc b/webrtc/video/video_quality_test.cc
index 6ad64bc..16d7356 100644
--- a/webrtc/video/video_quality_test.cc
+++ b/webrtc/video/video_quality_test.cc
@@ -37,6 +37,7 @@
#include "webrtc/rtc_base/memory_usage.h"
#include "webrtc/rtc_base/optional.h"
#include "webrtc/rtc_base/platform_file.h"
+#include "webrtc/rtc_base/ptr_util.h"
#include "webrtc/rtc_base/timeutils.h"
#include "webrtc/system_wrappers/include/cpu_info.h"
#include "webrtc/system_wrappers/include/field_trial.h"
@@ -1651,6 +1652,10 @@
sender_call_->DestroyVideoReceiveStream(thumbnail_receive_stream);
thumbnail_send_streams_.clear();
thumbnail_receive_streams_.clear();
+ for (std::unique_ptr<test::VideoCapturer>& video_caputurer :
+ thumbnail_capturers_) {
+ video_caputurer.reset();
+ }
}
void VideoQualityTest::SetupScreenshareOrSVC() {
@@ -1769,6 +1774,11 @@
}
void VideoQualityTest::RunWithAnalyzer(const Params& params) {
+ std::unique_ptr<test::LayerFilteringTransport> send_transport;
+ std::unique_ptr<test::DirectTransport> recv_transport;
+ FILE* graph_data_output_file = nullptr;
+ std::unique_ptr<VideoAnalyzer> analyzer;
+
params_ = params;
RTC_CHECK(!params_.audio.enabled);
@@ -1776,7 +1786,6 @@
// differentiate between the analyzer and the renderer case.
CheckParams();
- FILE* graph_data_output_file = nullptr;
if (!params_.analyzer.graph_data_output_filename.empty()) {
graph_data_output_file =
fopen(params_.analyzer.graph_data_output_filename.c_str(), "w");
@@ -1794,22 +1803,26 @@
Call::Config call_config(event_log_.get());
call_config.bitrate_config = params.call.call_bitrate_config;
- CreateCalls(call_config, call_config);
- test::LayerFilteringTransport send_transport(
- params_.pipe, sender_call_.get(), kPayloadTypeVP8, kPayloadTypeVP9,
- params_.video.selected_tl, params_.ss.selected_sl, payload_type_map_);
+ task_queue_.SendTask([this, &call_config, &send_transport,
+ &recv_transport]() {
+ CreateCalls(call_config, call_config);
- test::DirectTransport recv_transport(params_.pipe, receiver_call_.get(),
- payload_type_map_);
+ send_transport = rtc::MakeUnique<test::LayerFilteringTransport>(
+ &task_queue_, params_.pipe, sender_call_.get(), kPayloadTypeVP8,
+ kPayloadTypeVP9, params_.video.selected_tl, params_.ss.selected_sl,
+ payload_type_map_);
+
+ recv_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, params_.pipe, receiver_call_.get(), payload_type_map_);
+ });
std::string graph_title = params_.analyzer.graph_title;
if (graph_title.empty())
graph_title = VideoQualityTest::GenerateGraphTitle();
-
bool is_quick_test_enabled = field_trial::IsEnabled("WebRTC-QuickPerfTest");
- VideoAnalyzer analyzer(
- &send_transport, params_.analyzer.test_label,
+ analyzer = rtc::MakeUnique<VideoAnalyzer>(
+ send_transport.get(), params_.analyzer.test_label,
params_.analyzer.avg_psnr_threshold, params_.analyzer.avg_ssim_threshold,
is_quick_test_enabled
? kFramesSentInQuickTest
@@ -1820,82 +1833,91 @@
static_cast<size_t>(params_.ss.selected_stream), params.ss.selected_sl,
params_.video.selected_tl, is_quick_test_enabled, clock_,
params_.logging.rtp_dump_name);
- analyzer.SetCall(sender_call_.get());
- analyzer.SetReceiver(receiver_call_->Receiver());
- send_transport.SetReceiver(&analyzer);
- recv_transport.SetReceiver(sender_call_->Receiver());
- SetupVideo(&analyzer, &recv_transport);
- SetupThumbnails(&analyzer, &recv_transport);
- video_receive_configs_[params_.ss.selected_stream].renderer = &analyzer;
- video_send_config_.pre_encode_callback = analyzer.pre_encode_proxy();
- RTC_DCHECK(!video_send_config_.post_encode_callback);
- video_send_config_.post_encode_callback = analyzer.encode_timing_proxy();
+ task_queue_.SendTask([&]() {
+ analyzer->SetCall(sender_call_.get());
+ analyzer->SetReceiver(receiver_call_->Receiver());
+ send_transport->SetReceiver(analyzer.get());
+ recv_transport->SetReceiver(sender_call_->Receiver());
- SetupScreenshareOrSVC();
+ SetupVideo(analyzer.get(), recv_transport.get());
+ SetupThumbnails(analyzer.get(), recv_transport.get());
+ video_receive_configs_[params_.ss.selected_stream].renderer =
+ analyzer.get();
+ video_send_config_.pre_encode_callback = analyzer->pre_encode_proxy();
+ RTC_DCHECK(!video_send_config_.post_encode_callback);
+ video_send_config_.post_encode_callback = analyzer->encode_timing_proxy();
- CreateFlexfecStreams();
- CreateVideoStreams();
- analyzer.SetSendStream(video_send_stream_);
- if (video_receive_streams_.size() == 1)
- analyzer.SetReceiveStream(video_receive_streams_[0]);
+ SetupScreenshareOrSVC();
- video_send_stream_->SetSource(analyzer.OutputInterface(),
- degradation_preference_);
+ CreateFlexfecStreams();
+ CreateVideoStreams();
+ analyzer->SetSendStream(video_send_stream_);
+ if (video_receive_streams_.size() == 1)
+ analyzer->SetReceiveStream(video_receive_streams_[0]);
- SetupThumbnailCapturers(params_.call.num_thumbnails);
- for (size_t i = 0; i < thumbnail_send_streams_.size(); ++i) {
- thumbnail_send_streams_[i]->SetSource(thumbnail_capturers_[i].get(),
- degradation_preference_);
- }
+ video_send_stream_->SetSource(analyzer->OutputInterface(),
+ degradation_preference_);
- CreateCapturer();
+ SetupThumbnailCapturers(params_.call.num_thumbnails);
+ for (size_t i = 0; i < thumbnail_send_streams_.size(); ++i) {
+ thumbnail_send_streams_[i]->SetSource(thumbnail_capturers_[i].get(),
+ degradation_preference_);
+ }
- analyzer.SetSource(video_capturer_.get(), params_.ss.infer_streams);
+ CreateCapturer();
- StartEncodedFrameLogs(video_send_stream_);
- StartEncodedFrameLogs(video_receive_streams_[params_.ss.selected_stream]);
- video_send_stream_->Start();
- for (VideoSendStream* thumbnail_send_stream : thumbnail_send_streams_)
- thumbnail_send_stream->Start();
- for (VideoReceiveStream* receive_stream : video_receive_streams_)
- receive_stream->Start();
- for (VideoReceiveStream* thumbnail_receive_stream :
- thumbnail_receive_streams_)
- thumbnail_receive_stream->Start();
+ analyzer->SetSource(video_capturer_.get(), params_.ss.infer_streams);
- analyzer.StartMeasuringCpuProcessTime();
+ StartEncodedFrameLogs(video_send_stream_);
+ StartEncodedFrameLogs(video_receive_streams_[params_.ss.selected_stream]);
+ video_send_stream_->Start();
+ for (VideoSendStream* thumbnail_send_stream : thumbnail_send_streams_)
+ thumbnail_send_stream->Start();
+ for (VideoReceiveStream* receive_stream : video_receive_streams_)
+ receive_stream->Start();
+ for (VideoReceiveStream* thumbnail_receive_stream :
+ thumbnail_receive_streams_)
+ thumbnail_receive_stream->Start();
- video_capturer_->Start();
- for (std::unique_ptr<test::VideoCapturer>& video_caputurer :
- thumbnail_capturers_) {
- video_caputurer->Start();
- }
+ analyzer->StartMeasuringCpuProcessTime();
- analyzer.Wait();
+ video_capturer_->Start();
+ for (std::unique_ptr<test::VideoCapturer>& video_caputurer :
+ thumbnail_capturers_) {
+ video_caputurer->Start();
+ }
+ });
- send_transport.StopSending();
- recv_transport.StopSending();
+ analyzer->Wait();
- for (std::unique_ptr<test::VideoCapturer>& video_caputurer :
- thumbnail_capturers_)
- video_caputurer->Stop();
- video_capturer_->Stop();
- for (VideoReceiveStream* thumbnail_receive_stream :
- thumbnail_receive_streams_)
- thumbnail_receive_stream->Stop();
- for (VideoReceiveStream* receive_stream : video_receive_streams_)
- receive_stream->Stop();
- for (VideoSendStream* thumbnail_send_stream : thumbnail_send_streams_)
- thumbnail_send_stream->Stop();
- video_send_stream_->Stop();
+ task_queue_.SendTask([&]() {
+ for (std::unique_ptr<test::VideoCapturer>& video_caputurer :
+ thumbnail_capturers_)
+ video_caputurer->Stop();
+ video_capturer_->Stop();
+ for (VideoReceiveStream* thumbnail_receive_stream :
+ thumbnail_receive_streams_)
+ thumbnail_receive_stream->Stop();
+ for (VideoReceiveStream* receive_stream : video_receive_streams_)
+ receive_stream->Stop();
+ for (VideoSendStream* thumbnail_send_stream : thumbnail_send_streams_)
+ thumbnail_send_stream->Stop();
+ video_send_stream_->Stop();
- DestroyStreams();
- DestroyThumbnailStreams();
+ DestroyStreams();
+ DestroyThumbnailStreams();
- event_log_->StopLogging();
- if (graph_data_output_file)
- fclose(graph_data_output_file);
+ event_log_->StopLogging();
+ if (graph_data_output_file)
+ fclose(graph_data_output_file);
+
+ video_capturer_.reset();
+ send_transport.reset();
+ recv_transport.reset();
+
+ DestroyCalls();
+ });
}
void VideoQualityTest::SetupAudio(int send_channel_id,
@@ -1942,162 +1964,177 @@
}
void VideoQualityTest::RunWithRenderers(const Params& params) {
- params_ = params;
- CheckParams();
-
- // TODO(ivica): Remove bitrate_config and use the default Call::Config(), to
- // match the full stack tests.
- Call::Config call_config(event_log_.get());
- call_config.bitrate_config = params_.call.call_bitrate_config;
-
+ std::unique_ptr<test::LayerFilteringTransport> send_transport;
+ std::unique_ptr<test::DirectTransport> recv_transport;
::VoiceEngineState voe;
- rtc::scoped_refptr<webrtc::AudioProcessing> audio_processing(
- webrtc::AudioProcessing::Create());
-
- if (params_.audio.enabled) {
- CreateVoiceEngine(&voe, audio_processing.get(), decoder_factory_);
- AudioState::Config audio_state_config;
- audio_state_config.voice_engine = voe.voice_engine;
- audio_state_config.audio_mixer = AudioMixerImpl::Create();
- audio_state_config.audio_processing = audio_processing;
- call_config.audio_state = AudioState::Create(audio_state_config);
- }
-
- CreateCalls(call_config, call_config);
-
- // TODO(minyue): consider if this is a good transport even for audio only
- // calls.
- test::LayerFilteringTransport send_transport(
- params.pipe, sender_call_.get(), kPayloadTypeVP8, kPayloadTypeVP9,
- params.video.selected_tl, params_.ss.selected_sl, payload_type_map_);
-
- test::DirectTransport recv_transport(params_.pipe, receiver_call_.get(),
- payload_type_map_);
-
- // TODO(ivica): Use two calls to be able to merge with RunWithAnalyzer or at
- // least share as much code as possible. That way this test would also match
- // the full stack tests better.
- send_transport.SetReceiver(receiver_call_->Receiver());
- recv_transport.SetReceiver(sender_call_->Receiver());
-
std::unique_ptr<test::VideoRenderer> local_preview;
- std::vector<std::unique_ptr<test::VideoRenderer>> loopback_renderers_;
- if (params_.video.enabled) {
- // Create video renderers.
- local_preview.reset(test::VideoRenderer::Create(
- "Local Preview", params_.video.width, params_.video.height));
-
- const size_t selected_stream_id = params_.ss.selected_stream;
- const size_t num_streams = params_.ss.streams.size();
-
- if (selected_stream_id == num_streams) {
- for (size_t stream_id = 0; stream_id < num_streams; ++stream_id) {
- std::ostringstream oss;
- oss << "Loopback Video - Stream #" << static_cast<int>(stream_id);
- loopback_renderers_.emplace_back(test::VideoRenderer::Create(
- oss.str().c_str(), params_.ss.streams[stream_id].width,
- params_.ss.streams[stream_id].height));
- }
- } else {
- loopback_renderers_.emplace_back(test::VideoRenderer::Create(
- "Loopback Video", params_.ss.streams[selected_stream_id].width,
- params_.ss.streams[selected_stream_id].height));
- }
-
- SetupVideo(&send_transport, &recv_transport);
-
- video_send_config_.pre_encode_callback = local_preview.get();
- if (selected_stream_id == num_streams) {
- for (size_t stream_id = 0; stream_id < num_streams; ++stream_id) {
- video_receive_configs_[stream_id].renderer =
- loopback_renderers_[stream_id].get();
- if (params_.audio.enabled && params_.audio.sync_video)
- video_receive_configs_[stream_id].sync_group = kSyncGroup;
- }
- } else {
- video_receive_configs_[selected_stream_id].renderer =
- loopback_renderers_.back().get();
- if (params_.audio.enabled && params_.audio.sync_video)
- video_receive_configs_[selected_stream_id].sync_group = kSyncGroup;
- }
-
- if (params_.screenshare.enabled)
- SetupScreenshareOrSVC();
-
- CreateFlexfecStreams();
- CreateVideoStreams();
-
- CreateCapturer();
- video_send_stream_->SetSource(video_capturer_.get(),
- degradation_preference_);
- }
-
+ std::vector<std::unique_ptr<test::VideoRenderer>> loopback_renderers;
AudioReceiveStream* audio_receive_stream = nullptr;
- if (params_.audio.enabled) {
- SetupAudio(voe.send_channel_id, voe.receive_channel_id, &send_transport,
- &audio_receive_stream);
- }
- for (VideoReceiveStream* receive_stream : video_receive_streams_)
- StartEncodedFrameLogs(receive_stream);
- StartEncodedFrameLogs(video_send_stream_);
+ task_queue_.SendTask([&]() {
+ params_ = params;
+ CheckParams();
- // Start sending and receiving video.
- if (params_.video.enabled) {
- for (VideoReceiveStream* video_receive_stream : video_receive_streams_)
- video_receive_stream->Start();
+ // TODO(ivica): Remove bitrate_config and use the default Call::Config(), to
+ // match the full stack tests.
+ Call::Config call_config(event_log_.get());
+ call_config.bitrate_config = params_.call.call_bitrate_config;
- video_send_stream_->Start();
- video_capturer_->Start();
- }
+ rtc::scoped_refptr<webrtc::AudioProcessing> audio_processing(
+ webrtc::AudioProcessing::Create());
- if (params_.audio.enabled) {
- // Start receiving audio.
- audio_receive_stream->Start();
- EXPECT_EQ(0, voe.base->StartPlayout(voe.receive_channel_id));
+ if (params_.audio.enabled) {
+ CreateVoiceEngine(&voe, audio_processing.get(), decoder_factory_);
+ AudioState::Config audio_state_config;
+ audio_state_config.voice_engine = voe.voice_engine;
+ audio_state_config.audio_mixer = AudioMixerImpl::Create();
+ audio_state_config.audio_processing = audio_processing;
+ call_config.audio_state = AudioState::Create(audio_state_config);
+ }
- // Start sending audio.
- audio_send_stream_->Start();
- EXPECT_EQ(0, voe.base->StartSend(voe.send_channel_id));
- }
+ CreateCalls(call_config, call_config);
+
+ // TODO(minyue): consider if this is a good transport even for audio only
+ // calls.
+ send_transport = rtc::MakeUnique<test::LayerFilteringTransport>(
+ &task_queue_, params.pipe, sender_call_.get(), kPayloadTypeVP8,
+ kPayloadTypeVP9, params.video.selected_tl, params_.ss.selected_sl,
+ payload_type_map_);
+
+ recv_transport = rtc::MakeUnique<test::DirectTransport>(
+ &task_queue_, params_.pipe, receiver_call_.get(), payload_type_map_);
+
+ // TODO(ivica): Use two calls to be able to merge with RunWithAnalyzer or at
+ // least share as much code as possible. That way this test would also match
+ // the full stack tests better.
+ send_transport->SetReceiver(receiver_call_->Receiver());
+ recv_transport->SetReceiver(sender_call_->Receiver());
+
+ if (params_.video.enabled) {
+ // Create video renderers.
+ local_preview.reset(test::VideoRenderer::Create(
+ "Local Preview", params_.video.width, params_.video.height));
+
+ const size_t selected_stream_id = params_.ss.selected_stream;
+ const size_t num_streams = params_.ss.streams.size();
+
+ if (selected_stream_id == num_streams) {
+ for (size_t stream_id = 0; stream_id < num_streams; ++stream_id) {
+ std::ostringstream oss;
+ oss << "Loopback Video - Stream #" << static_cast<int>(stream_id);
+ loopback_renderers.emplace_back(test::VideoRenderer::Create(
+ oss.str().c_str(), params_.ss.streams[stream_id].width,
+ params_.ss.streams[stream_id].height));
+ }
+ } else {
+ loopback_renderers.emplace_back(test::VideoRenderer::Create(
+ "Loopback Video", params_.ss.streams[selected_stream_id].width,
+ params_.ss.streams[selected_stream_id].height));
+ }
+
+ SetupVideo(send_transport.get(), recv_transport.get());
+
+ video_send_config_.pre_encode_callback = local_preview.get();
+ if (selected_stream_id == num_streams) {
+ for (size_t stream_id = 0; stream_id < num_streams; ++stream_id) {
+ video_receive_configs_[stream_id].renderer =
+ loopback_renderers[stream_id].get();
+ if (params_.audio.enabled && params_.audio.sync_video)
+ video_receive_configs_[stream_id].sync_group = kSyncGroup;
+ }
+ } else {
+ video_receive_configs_[selected_stream_id].renderer =
+ loopback_renderers.back().get();
+ if (params_.audio.enabled && params_.audio.sync_video)
+ video_receive_configs_[selected_stream_id].sync_group = kSyncGroup;
+ }
+
+ if (params_.screenshare.enabled)
+ SetupScreenshareOrSVC();
+
+ CreateFlexfecStreams();
+ CreateVideoStreams();
+
+ CreateCapturer();
+ video_send_stream_->SetSource(video_capturer_.get(),
+ degradation_preference_);
+ }
+
+ if (params_.audio.enabled) {
+ SetupAudio(voe.send_channel_id, voe.receive_channel_id,
+ send_transport.get(), &audio_receive_stream);
+ }
+
+ for (VideoReceiveStream* receive_stream : video_receive_streams_)
+ StartEncodedFrameLogs(receive_stream);
+ StartEncodedFrameLogs(video_send_stream_);
+
+ // Start sending and receiving video.
+ if (params_.video.enabled) {
+ for (VideoReceiveStream* video_receive_stream : video_receive_streams_)
+ video_receive_stream->Start();
+
+ video_send_stream_->Start();
+ video_capturer_->Start();
+ }
+
+ if (params_.audio.enabled) {
+ // Start receiving audio.
+ audio_receive_stream->Start();
+ EXPECT_EQ(0, voe.base->StartPlayout(voe.receive_channel_id));
+
+ // Start sending audio.
+ audio_send_stream_->Start();
+ EXPECT_EQ(0, voe.base->StartSend(voe.send_channel_id));
+ }
+ });
test::PressEnterToContinue();
- if (params_.audio.enabled) {
- // Stop sending audio.
- EXPECT_EQ(0, voe.base->StopSend(voe.send_channel_id));
- audio_send_stream_->Stop();
+ task_queue_.SendTask([&]() {
+ if (params_.audio.enabled) {
+ // Stop sending audio.
+ EXPECT_EQ(0, voe.base->StopSend(voe.send_channel_id));
+ audio_send_stream_->Stop();
- // Stop receiving audio.
- EXPECT_EQ(0, voe.base->StopPlayout(voe.receive_channel_id));
- audio_receive_stream->Stop();
- sender_call_->DestroyAudioSendStream(audio_send_stream_);
- receiver_call_->DestroyAudioReceiveStream(audio_receive_stream);
- }
+ // Stop receiving audio.
+ EXPECT_EQ(0, voe.base->StopPlayout(voe.receive_channel_id));
+ audio_receive_stream->Stop();
+ sender_call_->DestroyAudioSendStream(audio_send_stream_);
+ receiver_call_->DestroyAudioReceiveStream(audio_receive_stream);
+ }
- // Stop receiving and sending video.
- if (params_.video.enabled) {
- video_capturer_->Stop();
- video_send_stream_->Stop();
- for (FlexfecReceiveStream* flexfec_receive_stream :
- flexfec_receive_streams_) {
- for (VideoReceiveStream* video_receive_stream : video_receive_streams_) {
- video_receive_stream->RemoveSecondarySink(flexfec_receive_stream);
+ // Stop receiving and sending video.
+ if (params_.video.enabled) {
+ video_capturer_->Stop();
+ video_send_stream_->Stop();
+ for (FlexfecReceiveStream* flexfec_receive_stream :
+ flexfec_receive_streams_) {
+ for (VideoReceiveStream* video_receive_stream :
+ video_receive_streams_) {
+ video_receive_stream->RemoveSecondarySink(flexfec_receive_stream);
+ }
+ receiver_call_->DestroyFlexfecReceiveStream(flexfec_receive_stream);
}
- receiver_call_->DestroyFlexfecReceiveStream(flexfec_receive_stream);
+ for (VideoReceiveStream* receive_stream : video_receive_streams_) {
+ receive_stream->Stop();
+ receiver_call_->DestroyVideoReceiveStream(receive_stream);
+ }
+ sender_call_->DestroyVideoSendStream(video_send_stream_);
}
- for (VideoReceiveStream* receive_stream : video_receive_streams_) {
- receive_stream->Stop();
- receiver_call_->DestroyVideoReceiveStream(receive_stream);
- }
- sender_call_->DestroyVideoSendStream(video_send_stream_);
- }
- send_transport.StopSending();
- recv_transport.StopSending();
+ video_capturer_.reset();
+ send_transport.reset();
+ recv_transport.reset();
- if (params_.audio.enabled)
- DestroyVoiceEngine(&voe);
+ if (params_.audio.enabled)
+ DestroyVoiceEngine(&voe);
+
+ local_preview.reset();
+ loopback_renderers.clear();
+
+ DestroyCalls();
+ });
}
void VideoQualityTest::StartEncodedFrameLogs(VideoSendStream* stream) {
diff --git a/webrtc/video/video_send_stream_tests.cc b/webrtc/video/video_send_stream_tests.cc
index 0d5ba7d..9db0b65 100644
--- a/webrtc/video/video_send_stream_tests.cc
+++ b/webrtc/video/video_send_stream_tests.cc
@@ -67,25 +67,31 @@
};
TEST_F(VideoSendStreamTest, CanStartStartedStream) {
- CreateSenderCall(Call::Config(event_log_.get()));
+ task_queue_.SendTask([this]() {
+ CreateSenderCall(Call::Config(event_log_.get()));
- test::NullTransport transport;
- CreateSendConfig(1, 0, 0, &transport);
- CreateVideoStreams();
- video_send_stream_->Start();
- video_send_stream_->Start();
- DestroyStreams();
+ test::NullTransport transport;
+ CreateSendConfig(1, 0, 0, &transport);
+ CreateVideoStreams();
+ video_send_stream_->Start();
+ video_send_stream_->Start();
+ DestroyStreams();
+ DestroyCalls();
+ });
}
TEST_F(VideoSendStreamTest, CanStopStoppedStream) {
- CreateSenderCall(Call::Config(event_log_.get()));
+ task_queue_.SendTask([this]() {
+ CreateSenderCall(Call::Config(event_log_.get()));
- test::NullTransport transport;
- CreateSendConfig(1, 0, 0, &transport);
- CreateVideoStreams();
- video_send_stream_->Stop();
- video_send_stream_->Stop();
- DestroyStreams();
+ test::NullTransport transport;
+ CreateSendConfig(1, 0, 0, &transport);
+ CreateVideoStreams();
+ video_send_stream_->Stop();
+ video_send_stream_->Stop();
+ DestroyStreams();
+ DestroyCalls();
+ });
}
TEST_F(VideoSendStreamTest, SupportsCName) {
@@ -480,7 +486,9 @@
return SEND_PACKET;
}
- test::PacketTransport* CreateSendTransport(Call* sender_call) override {
+ test::PacketTransport* CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override {
// At low RTT (< kLowRttNackMs) -> NACK only, no FEC.
// Configure some network delay.
const int kNetworkDelayMs = 100;
@@ -488,7 +496,7 @@
config.loss_percent = 5;
config.queue_delay_ms = kNetworkDelayMs;
return new test::PacketTransport(
- sender_call, this, test::PacketTransport::kSender,
+ task_queue, sender_call, this, test::PacketTransport::kSender,
VideoSendStreamTest::payload_type_map_, config);
}
@@ -636,7 +644,9 @@
return SEND_PACKET;
}
- test::PacketTransport* CreateSendTransport(Call* sender_call) override {
+ test::PacketTransport* CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override {
// At low RTT (< kLowRttNackMs) -> NACK only, no FEC.
// Therefore we need some network delay.
const int kNetworkDelayMs = 100;
@@ -644,7 +654,7 @@
config.loss_percent = 5;
config.queue_delay_ms = kNetworkDelayMs;
return new test::PacketTransport(
- sender_call, this, test::PacketTransport::kSender,
+ task_queue, sender_call, this, test::PacketTransport::kSender,
VideoSendStreamTest::payload_type_map_, config);
}
@@ -1337,13 +1347,15 @@
return SEND_PACKET;
}
- test::PacketTransport* CreateSendTransport(Call* sender_call) override {
+ test::PacketTransport* CreateSendTransport(
+ test::SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) override {
const int kNetworkDelayMs = 50;
FakeNetworkPipe::Config config;
config.loss_percent = 10;
config.link_capacity_kbps = kCapacityKbps;
config.queue_delay_ms = kNetworkDelayMs;
- return new test::PacketTransport(sender_call, this,
+ return new test::PacketTransport(task_queue, sender_call, this,
test::PacketTransport::kSender,
payload_type_map_, config);
}
@@ -1476,8 +1488,11 @@
static const uint8_t kExtensionId = test::kTransportSequenceNumberExtensionId;
class ChangingNetworkRouteTest : public test::EndToEndTest {
public:
- ChangingNetworkRouteTest()
- : EndToEndTest(test::CallTest::kDefaultTimeoutMs), call_(nullptr) {
+ explicit ChangingNetworkRouteTest(
+ test::SingleThreadedTaskQueueForTesting* task_queue)
+ : EndToEndTest(test::CallTest::kDefaultTimeoutMs),
+ task_queue_(task_queue),
+ call_(nullptr) {
EXPECT_TRUE(parser_->RegisterRtpHeaderExtension(
kRtpExtensionTransportSequenceNumber, kExtensionId));
}
@@ -1518,26 +1533,33 @@
void PerformTest() override {
rtc::NetworkRoute new_route(true, 10, 20, -1);
- call_->OnNetworkRouteChanged("transport", new_route);
Call::Config::BitrateConfig bitrate_config;
- bitrate_config.start_bitrate_bps = kStartBitrateBps;
- call_->SetBitrateConfig(bitrate_config);
+
+ task_queue_->SendTask([this, &new_route, &bitrate_config]() {
+ call_->OnNetworkRouteChanged("transport", new_route);
+ bitrate_config.start_bitrate_bps = kStartBitrateBps;
+ call_->SetBitrateConfig(bitrate_config);
+ });
+
EXPECT_TRUE(Wait())
<< "Timed out while waiting for start bitrate to be exceeded.";
- bitrate_config.start_bitrate_bps = -1;
- bitrate_config.max_bitrate_bps = kNewMaxBitrateBps;
- call_->SetBitrateConfig(bitrate_config);
- // TODO(holmer): We should set the last sent packet id here and verify
- // that we correctly ignore any packet loss reported prior to that id.
- ++new_route.local_network_id;
- call_->OnNetworkRouteChanged("transport", new_route);
- EXPECT_GE(call_->GetStats().send_bandwidth_bps, kStartBitrateBps);
+ task_queue_->SendTask([this, &new_route, &bitrate_config]() {
+ bitrate_config.start_bitrate_bps = -1;
+ bitrate_config.max_bitrate_bps = kNewMaxBitrateBps;
+ call_->SetBitrateConfig(bitrate_config);
+ // TODO(holmer): We should set the last sent packet id here and verify
+ // that we correctly ignore any packet loss reported prior to that id.
+ ++new_route.local_network_id;
+ call_->OnNetworkRouteChanged("transport", new_route);
+ EXPECT_GE(call_->GetStats().send_bandwidth_bps, kStartBitrateBps);
+ });
}
private:
+ test::SingleThreadedTaskQueueForTesting* const task_queue_;
Call* call_;
- } test;
+ } test(&task_queue_);
RunBaseTest(&test);
}
@@ -1545,8 +1567,10 @@
TEST_F(VideoSendStreamTest, ChangingTransportOverhead) {
class ChangingTransportOverheadTest : public test::EndToEndTest {
public:
- ChangingTransportOverheadTest()
+ explicit ChangingTransportOverheadTest(
+ test::SingleThreadedTaskQueueForTesting* task_queue)
: EndToEndTest(test::CallTest::kDefaultTimeoutMs),
+ task_queue_(task_queue),
call_(nullptr),
packets_sent_(0),
transport_overhead_(0) {}
@@ -1572,27 +1596,36 @@
}
void PerformTest() override {
- transport_overhead_ = 100;
- call_->OnTransportOverheadChanged(webrtc::MediaType::VIDEO,
- transport_overhead_);
+ task_queue_->SendTask([this]() {
+ transport_overhead_ = 100;
+ call_->OnTransportOverheadChanged(webrtc::MediaType::VIDEO,
+ transport_overhead_);
+ });
+
EXPECT_TRUE(Wait());
+
{
rtc::CritScope cs(&lock_);
packets_sent_ = 0;
}
- transport_overhead_ = 500;
- call_->OnTransportOverheadChanged(webrtc::MediaType::VIDEO,
- transport_overhead_);
+
+ task_queue_->SendTask([this]() {
+ transport_overhead_ = 500;
+ call_->OnTransportOverheadChanged(webrtc::MediaType::VIDEO,
+ transport_overhead_);
+ });
+
EXPECT_TRUE(Wait());
}
private:
+ test::SingleThreadedTaskQueueForTesting* const task_queue_;
Call* call_;
rtc::CriticalSection lock_;
int packets_sent_ GUARDED_BY(lock_);
int transport_overhead_;
const size_t kMaxRtpPacketSize = 1000;
- } test;
+ } test(&task_queue_);
RunBaseTest(&test);
}
@@ -1716,16 +1749,18 @@
// Function for removing and recreating the send stream with a new config.
auto reset_fun = [this](const VideoSendStream::Config& send_stream_config,
const VideoEncoderConfig& encoder_config) {
- Stop();
- sender_call_->DestroyVideoSendStream(video_send_stream_);
- video_send_config_ = send_stream_config.Copy();
- video_encoder_config_ = encoder_config.Copy();
- video_send_stream_ = sender_call_->CreateVideoSendStream(
- video_send_config_.Copy(), video_encoder_config_.Copy());
- video_send_stream_->SetSource(
- frame_generator_capturer_.get(),
- VideoSendStream::DegradationPreference::kMaintainResolution);
- Start();
+ task_queue_.SendTask([this, &send_stream_config, &encoder_config]() {
+ Stop();
+ sender_call_->DestroyVideoSendStream(video_send_stream_);
+ video_send_config_ = send_stream_config.Copy();
+ video_encoder_config_ = encoder_config.Copy();
+ video_send_stream_ = sender_call_->CreateVideoSendStream(
+ video_send_config_.Copy(), video_encoder_config_.Copy());
+ video_send_stream_->SetSource(
+ frame_generator_capturer_.get(),
+ VideoSendStream::DegradationPreference::kMaintainResolution);
+ Start();
+ });
};
MaxPaddingSetTest<decltype(reset_fun)> test(true, &reset_fun);
RunBaseTest(&test);
@@ -1791,21 +1826,32 @@
int last_initialized_frame_height_ GUARDED_BY(&crit_);
};
- CreateSenderCall(Call::Config(event_log_.get()));
test::NullTransport transport;
- CreateSendConfig(1, 0, 0, &transport);
EncoderObserver encoder;
- video_send_config_.encoder_settings.encoder = &encoder;
- CreateVideoStreams();
- CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
- kDefaultHeight);
- frame_generator_capturer_->Start();
+
+ task_queue_.SendTask([this, &transport, &encoder]() {
+ CreateSenderCall(Call::Config(event_log_.get()));
+ CreateSendConfig(1, 0, 0, &transport);
+ video_send_config_.encoder_settings.encoder = &encoder;
+ CreateVideoStreams();
+ CreateFrameGeneratorCapturer(kDefaultFramerate, kDefaultWidth,
+ kDefaultHeight);
+ frame_generator_capturer_->Start();
+ });
encoder.WaitForResolution(kDefaultWidth, kDefaultHeight);
- frame_generator_capturer_->ChangeResolution(kDefaultWidth * 2,
- kDefaultHeight * 2);
+
+ task_queue_.SendTask([this]() {
+ frame_generator_capturer_->ChangeResolution(kDefaultWidth * 2,
+ kDefaultHeight * 2);
+ });
+
encoder.WaitForResolution(kDefaultWidth * 2, kDefaultHeight * 2);
- DestroyStreams();
+
+ task_queue_.SendTask([this]() {
+ DestroyStreams();
+ DestroyCalls();
+ });
}
TEST_F(VideoSendStreamTest, CanReconfigureToUseStartBitrateAbovePreviousMax) {
@@ -1937,31 +1983,42 @@
rtc::Optional<int> bitrate_kbps_ GUARDED_BY(crit_);
};
- CreateSenderCall(Call::Config(event_log_.get()));
-
test::NullTransport transport;
- CreateSendConfig(1, 0, 0, &transport);
-
- sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
-
StartStopBitrateObserver encoder;
- video_send_config_.encoder_settings.encoder = &encoder;
- video_send_config_.encoder_settings.internal_source = true;
- CreateVideoStreams();
+ task_queue_.SendTask([this, &transport, &encoder]() {
+ CreateSenderCall(Call::Config(event_log_.get()));
+ CreateSendConfig(1, 0, 0, &transport);
+
+ sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
+
+ video_send_config_.encoder_settings.encoder = &encoder;
+ video_send_config_.encoder_settings.internal_source = true;
+
+ CreateVideoStreams();
+ });
EXPECT_TRUE(encoder.WaitForEncoderInit());
- video_send_stream_->Start();
+ task_queue_.SendTask([this]() {
+ video_send_stream_->Start();
+ });
EXPECT_TRUE(encoder.WaitBitrateChanged(true));
- video_send_stream_->Stop();
+ task_queue_.SendTask([this]() {
+ video_send_stream_->Stop();
+ });
EXPECT_TRUE(encoder.WaitBitrateChanged(false));
- video_send_stream_->Start();
+ task_queue_.SendTask([this]() {
+ video_send_stream_->Start();
+ });
EXPECT_TRUE(encoder.WaitBitrateChanged(true));
- DestroyStreams();
+ task_queue_.SendTask([this]() {
+ DestroyStreams();
+ DestroyCalls();
+ });
}
TEST_F(VideoSendStreamTest, CapturesTextureAndVideoFrames) {
@@ -1992,49 +2049,55 @@
rtc::Event output_frame_event_;
};
- // Initialize send stream.
- CreateSenderCall(Call::Config(event_log_.get()));
-
test::NullTransport transport;
- CreateSendConfig(1, 0, 0, &transport);
FrameObserver observer;
- video_send_config_.pre_encode_callback = &observer;
- CreateVideoStreams();
-
- // Prepare five input frames. Send ordinary VideoFrame and texture frames
- // alternatively.
std::vector<VideoFrame> input_frames;
- int width = 168;
- int height = 132;
- input_frames.push_back(test::FakeNativeBuffer::CreateFrame(
- width, height, 1, 1, kVideoRotation_0));
- input_frames.push_back(test::FakeNativeBuffer::CreateFrame(
- width, height, 2, 2, kVideoRotation_0));
- input_frames.push_back(CreateVideoFrame(width, height, 3));
- input_frames.push_back(CreateVideoFrame(width, height, 4));
- input_frames.push_back(test::FakeNativeBuffer::CreateFrame(
- width, height, 5, 5, kVideoRotation_0));
+ task_queue_.SendTask([this, &transport, &observer, &input_frames]() {
+ // Initialize send stream.
+ CreateSenderCall(Call::Config(event_log_.get()));
- video_send_stream_->Start();
- test::FrameForwarder forwarder;
- video_send_stream_->SetSource(
- &forwarder, VideoSendStream::DegradationPreference::kMaintainFramerate);
- for (size_t i = 0; i < input_frames.size(); i++) {
- forwarder.IncomingCapturedFrame(input_frames[i]);
- // Wait until the output frame is received before sending the next input
- // frame. Or the previous input frame may be replaced without delivering.
- observer.WaitOutputFrame();
- }
- video_send_stream_->Stop();
- video_send_stream_->SetSource(
- nullptr, VideoSendStream::DegradationPreference::kMaintainFramerate);
+ CreateSendConfig(1, 0, 0, &transport);
+ video_send_config_.pre_encode_callback = &observer;
+ CreateVideoStreams();
+
+ // Prepare five input frames. Send ordinary VideoFrame and texture frames
+ // alternately.
+ int width = 168;
+ int height = 132;
+
+ input_frames.push_back(test::FakeNativeBuffer::CreateFrame(
+ width, height, 1, 1, kVideoRotation_0));
+ input_frames.push_back(test::FakeNativeBuffer::CreateFrame(
+ width, height, 2, 2, kVideoRotation_0));
+ input_frames.push_back(CreateVideoFrame(width, height, 3));
+ input_frames.push_back(CreateVideoFrame(width, height, 4));
+ input_frames.push_back(test::FakeNativeBuffer::CreateFrame(
+ width, height, 5, 5, kVideoRotation_0));
+
+ video_send_stream_->Start();
+ test::FrameForwarder forwarder;
+ video_send_stream_->SetSource(
+ &forwarder, VideoSendStream::DegradationPreference::kMaintainFramerate);
+ for (size_t i = 0; i < input_frames.size(); i++) {
+ forwarder.IncomingCapturedFrame(input_frames[i]);
+ // Wait until the output frame is received before sending the next input
+ // frame; otherwise the previous frame may be replaced before delivery.
+ observer.WaitOutputFrame();
+ }
+ video_send_stream_->Stop();
+ video_send_stream_->SetSource(
+ nullptr, VideoSendStream::DegradationPreference::kMaintainFramerate);
+ });
// Test if the input and output frames are the same. render_time_ms and
// timestamp are not compared because capturer sets those values.
ExpectEqualFramesVector(input_frames, observer.output_frames());
- DestroyStreams();
+ task_queue_.SendTask([this]() {
+ DestroyStreams();
+ DestroyCalls();
+ });
}
void ExpectEqualFramesVector(const std::vector<VideoFrame>& frames1,
@@ -2060,8 +2123,10 @@
TEST_F(VideoSendStreamTest, EncoderIsProperlyInitializedAndDestroyed) {
class EncoderStateObserver : public test::SendTest, public VideoEncoder {
public:
- EncoderStateObserver()
+ explicit EncoderStateObserver(
+ test::SingleThreadedTaskQueueForTesting* task_queue)
: SendTest(kDefaultTimeoutMs),
+ task_queue_(task_queue),
stream_(nullptr),
initialized_(false),
callback_registered_(false),
@@ -2148,18 +2213,23 @@
void PerformTest() override {
EXPECT_TRUE(Wait()) << "Timed out while waiting for Encode.";
- EXPECT_EQ(0u, num_releases());
- stream_->ReconfigureVideoEncoder(std::move(encoder_config_));
- EXPECT_EQ(0u, num_releases());
- stream_->Stop();
- // Encoder should not be released before destroying the VideoSendStream.
- EXPECT_FALSE(IsReleased());
- EXPECT_TRUE(IsReadyForEncode());
- stream_->Start();
+
+ task_queue_->SendTask([this]() {
+ EXPECT_EQ(0u, num_releases());
+ stream_->ReconfigureVideoEncoder(std::move(encoder_config_));
+ EXPECT_EQ(0u, num_releases());
+ stream_->Stop();
+ // Encoder should not be released before destroying the VideoSendStream.
+ EXPECT_FALSE(IsReleased());
+ EXPECT_TRUE(IsReadyForEncode());
+ stream_->Start();
+ });
+
// Sanity check, make sure we still encode frames with this encoder.
EXPECT_TRUE(Wait()) << "Timed out while waiting for Encode.";
}
+ test::SingleThreadedTaskQueueForTesting* const task_queue_;
rtc::CriticalSection crit_;
VideoSendStream* stream_;
bool initialized_ GUARDED_BY(crit_);
@@ -2167,7 +2237,7 @@
size_t num_releases_ GUARDED_BY(crit_);
bool released_ GUARDED_BY(crit_);
VideoEncoderConfig encoder_config_;
- } test_encoder;
+ } test_encoder(&task_queue_);
RunBaseTest(&test_encoder);
@@ -2553,9 +2623,11 @@
class EncoderBitrateThresholdObserver : public test::SendTest,
public test::FakeEncoder {
public:
- EncoderBitrateThresholdObserver()
+ explicit EncoderBitrateThresholdObserver(
+ test::SingleThreadedTaskQueueForTesting* task_queue)
: SendTest(kDefaultTimeoutMs),
FakeEncoder(Clock::GetRealTimeClock()),
+ task_queue_(task_queue),
init_encode_event_(false, false),
bitrate_changed_event_(false, false),
target_bitrate_(0),
@@ -2680,7 +2752,9 @@
Call::Config::BitrateConfig bitrate_config;
bitrate_config.start_bitrate_bps = kIncreasedStartBitrateKbps * 1000;
bitrate_config.max_bitrate_bps = kIncreasedMaxBitrateKbps * 1000;
- call_->SetBitrateConfig(bitrate_config);
+ task_queue_->SendTask([this, &bitrate_config]() {
+ call_->SetBitrateConfig(bitrate_config);
+ });
// Encoder rate is capped by EncoderConfig max_bitrate_bps.
WaitForSetRates(kMaxBitrateKbps);
encoder_config_.max_bitrate_bps = kLowerMaxBitrateKbps * 1000;
@@ -2702,6 +2776,7 @@
WaitForSetRates(kIncreasedStartBitrateKbps);
}
+ test::SingleThreadedTaskQueueForTesting* const task_queue_;
rtc::Event init_encode_event_;
rtc::Event bitrate_changed_event_;
rtc::CriticalSection crit_;
@@ -2711,7 +2786,7 @@
webrtc::Call* call_;
webrtc::VideoSendStream* send_stream_;
webrtc::VideoEncoderConfig encoder_config_;
- } test;
+ } test(&task_queue_);
RunBaseTest(&test);
}
@@ -3302,9 +3377,11 @@
class RemoveOverheadFromBandwidthTest : public test::EndToEndTest,
public test::FakeEncoder {
public:
- RemoveOverheadFromBandwidthTest()
+ explicit RemoveOverheadFromBandwidthTest(
+ test::SingleThreadedTaskQueueForTesting* task_queue)
: EndToEndTest(test::CallTest::kDefaultTimeoutMs),
FakeEncoder(Clock::GetRealTimeClock()),
+ task_queue_(task_queue),
call_(nullptr),
max_bitrate_bps_(0),
first_packet_sent_(false),
@@ -3349,8 +3426,10 @@
bitrate_config.start_bitrate_bps = kStartBitrateBps;
bitrate_config.max_bitrate_bps = kMaxBitrateBps;
bitrate_config.min_bitrate_bps = kMinBitrateBps;
- call_->SetBitrateConfig(bitrate_config);
- call_->OnTransportOverheadChanged(webrtc::MediaType::VIDEO, 40);
+ task_queue_->SendTask([this, &bitrate_config]() {
+ call_->SetBitrateConfig(bitrate_config);
+ call_->OnTransportOverheadChanged(webrtc::MediaType::VIDEO, 40);
+ });
// At a bitrate of 60kbps with a packet size of 1200B video and an
// overhead of 40B per packet video produces 2240bps overhead.
@@ -3363,13 +3442,13 @@
}
private:
+ test::SingleThreadedTaskQueueForTesting* const task_queue_;
Call* call_;
rtc::CriticalSection crit_;
uint32_t max_bitrate_bps_ GUARDED_BY(&crit_);
bool first_packet_sent_ GUARDED_BY(&crit_);
rtc::Event bitrate_changed_event_;
- } test;
-
+ } test(&task_queue_);
RunBaseTest(&test);
}