Use SingleThreadedTaskQueue in DirectTransport
DirectTransport has so far used its own thread, which led to a different threading-model for in the unit-tests than is used in actual WebRTC. Because of that, some critical-sections that weren't truly necessary in WebRTC could not be replaced with thread-checks, because those checks failed in unit-tests.
This CL introduces SingleThreadedTaskQueue - a TaskQueue which guarantees to run all of its tasks on the same thread (rtc::TaskQueue doesn't guarantee that on Mac) - and uses that for DirectTransport. CLs based on top of this will uncomment thread-checks which had to be commented out before, and remove unnecessary critical-sections.
Future work would probably replace the thread-checkers by more sophisticated serialized-access checks, allowing us to move from the SingleThreadedTaskQueue to a normal TaskQueue.
Related implementation notes:
* This CL has made DirectTransport::StopSending() superfluous, and so it was deleted.
BUG=webrtc:8113, webrtc:7405, webrtc:8056, webrtc:8116
Review-Url: https://codereview.webrtc.org/2998923002
Cr-Commit-Position: refs/heads/master@{#19445}
diff --git a/webrtc/test/BUILD.gn b/webrtc/test/BUILD.gn
index 2db6120..f6baa27 100644
--- a/webrtc/test/BUILD.gn
+++ b/webrtc/test/BUILD.gn
@@ -287,6 +287,7 @@
"frame_generator_unittest.cc",
"rtp_file_reader_unittest.cc",
"rtp_file_writer_unittest.cc",
+ "single_threaded_task_queue_unittest.cc",
"testsupport/always_passing_unittest.cc",
"testsupport/metrics/video_metrics_unittest.cc",
"testsupport/packet_reader_unittest.cc",
@@ -401,8 +402,23 @@
"../call",
"../modules/rtp_rtcp",
"../rtc_base:rtc_base_approved",
+ "../rtc_base:sequenced_task_checker",
"../system_wrappers",
]
+ public_deps = [
+ ":single_threaded_task_queue",
+ ]
+}
+
+rtc_source_set("single_threaded_task_queue") {
+ testonly = true
+ sources = [
+ "single_threaded_task_queue.cc",
+ "single_threaded_task_queue.h",
+ ]
+ deps = [
+ "../rtc_base:rtc_base_approved",
+ ]
}
rtc_source_set("fake_audio_device") {
diff --git a/webrtc/test/call_test.cc b/webrtc/test/call_test.cc
index 51816d8..3efc022 100644
--- a/webrtc/test/call_test.cc
+++ b/webrtc/test/call_test.cc
@@ -18,8 +18,11 @@
#include "webrtc/config.h"
#include "webrtc/modules/audio_mixer/audio_mixer_impl.h"
#include "webrtc/rtc_base/checks.h"
+#include "webrtc/rtc_base/event.h"
+#include "webrtc/rtc_base/ptr_util.h"
#include "webrtc/test/testsupport/fileutils.h"
#include "webrtc/voice_engine/include/voe_base.h"
+
namespace webrtc {
namespace test {
@@ -41,112 +44,124 @@
num_flexfec_streams_(0),
decoder_factory_(CreateBuiltinAudioDecoderFactory()),
encoder_factory_(CreateBuiltinAudioEncoderFactory()),
+ task_queue_("CallTestTaskQueue"),
fake_send_audio_device_(nullptr),
fake_recv_audio_device_(nullptr) {}
CallTest::~CallTest() {
+ task_queue_.SendTask([this]() {
+ fake_send_audio_device_.reset();
+ fake_recv_audio_device_.reset();
+ frame_generator_capturer_.reset();
+ });
}
void CallTest::RunBaseTest(BaseTest* test) {
- num_video_streams_ = test->GetNumVideoStreams();
- num_audio_streams_ = test->GetNumAudioStreams();
- num_flexfec_streams_ = test->GetNumFlexfecStreams();
- RTC_DCHECK(num_video_streams_ > 0 || num_audio_streams_ > 0);
- Call::Config send_config(test->GetSenderCallConfig());
- if (num_audio_streams_ > 0) {
- CreateFakeAudioDevices(test->CreateCapturer(), test->CreateRenderer());
- test->OnFakeAudioDevicesCreated(fake_send_audio_device_.get(),
- fake_recv_audio_device_.get());
- apm_send_ = AudioProcessing::Create();
- apm_recv_ = AudioProcessing::Create();
- CreateVoiceEngines();
- AudioState::Config audio_state_config;
- audio_state_config.voice_engine = voe_send_.voice_engine;
- audio_state_config.audio_mixer = AudioMixerImpl::Create();
- audio_state_config.audio_processing = apm_send_;
- send_config.audio_state = AudioState::Create(audio_state_config);
- }
- CreateSenderCall(send_config);
- if (sender_call_transport_controller_ != nullptr) {
- test->OnRtpTransportControllerSendCreated(
- sender_call_transport_controller_);
- }
- if (test->ShouldCreateReceivers()) {
- Call::Config recv_config(test->GetReceiverCallConfig());
+ task_queue_.SendTask([this, test]() {
+ num_video_streams_ = test->GetNumVideoStreams();
+ num_audio_streams_ = test->GetNumAudioStreams();
+ num_flexfec_streams_ = test->GetNumFlexfecStreams();
+ RTC_DCHECK(num_video_streams_ > 0 || num_audio_streams_ > 0);
+ Call::Config send_config(test->GetSenderCallConfig());
if (num_audio_streams_ > 0) {
+ CreateFakeAudioDevices(test->CreateCapturer(), test->CreateRenderer());
+ test->OnFakeAudioDevicesCreated(fake_send_audio_device_.get(),
+ fake_recv_audio_device_.get());
+ apm_send_ = AudioProcessing::Create();
+ apm_recv_ = AudioProcessing::Create();
+ CreateVoiceEngines();
AudioState::Config audio_state_config;
- audio_state_config.voice_engine = voe_recv_.voice_engine;
+ audio_state_config.voice_engine = voe_send_.voice_engine;
audio_state_config.audio_mixer = AudioMixerImpl::Create();
- audio_state_config.audio_processing = apm_recv_;
- recv_config.audio_state = AudioState::Create(audio_state_config);
+ audio_state_config.audio_processing = apm_send_;
+ send_config.audio_state = AudioState::Create(audio_state_config);
}
- CreateReceiverCall(recv_config);
- }
- test->OnCallsCreated(sender_call_.get(), receiver_call_.get());
- receive_transport_.reset(test->CreateReceiveTransport());
- send_transport_.reset(test->CreateSendTransport(sender_call_.get()));
+ CreateSenderCall(send_config);
+ if (sender_call_transport_controller_ != nullptr) {
+ test->OnRtpTransportControllerSendCreated(
+ sender_call_transport_controller_);
+ }
+ if (test->ShouldCreateReceivers()) {
+ Call::Config recv_config(test->GetReceiverCallConfig());
+ if (num_audio_streams_ > 0) {
+ AudioState::Config audio_state_config;
+ audio_state_config.voice_engine = voe_recv_.voice_engine;
+ audio_state_config.audio_mixer = AudioMixerImpl::Create();
+ audio_state_config.audio_processing = apm_recv_;
+ recv_config.audio_state = AudioState::Create(audio_state_config);
+ }
+ CreateReceiverCall(recv_config);
+ }
+ test->OnCallsCreated(sender_call_.get(), receiver_call_.get());
+ receive_transport_.reset(test->CreateReceiveTransport(&task_queue_));
+ send_transport_.reset(
+ test->CreateSendTransport(&task_queue_, sender_call_.get()));
- if (test->ShouldCreateReceivers()) {
- send_transport_->SetReceiver(receiver_call_->Receiver());
- receive_transport_->SetReceiver(sender_call_->Receiver());
- if (num_video_streams_ > 0)
- receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
- if (num_audio_streams_ > 0)
- receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
- } else {
- // Sender-only call delivers to itself.
- send_transport_->SetReceiver(sender_call_->Receiver());
- receive_transport_->SetReceiver(nullptr);
- }
+ if (test->ShouldCreateReceivers()) {
+ send_transport_->SetReceiver(receiver_call_->Receiver());
+ receive_transport_->SetReceiver(sender_call_->Receiver());
+ if (num_video_streams_ > 0)
+ receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
+ if (num_audio_streams_ > 0)
+ receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
+ } else {
+ // Sender-only call delivers to itself.
+ send_transport_->SetReceiver(sender_call_->Receiver());
+ receive_transport_->SetReceiver(nullptr);
+ }
- CreateSendConfig(num_video_streams_, num_audio_streams_, num_flexfec_streams_,
- send_transport_.get());
- if (test->ShouldCreateReceivers()) {
- CreateMatchingReceiveConfigs(receive_transport_.get());
- }
- if (num_video_streams_ > 0) {
- test->ModifyVideoConfigs(&video_send_config_, &video_receive_configs_,
- &video_encoder_config_);
- }
- if (num_audio_streams_ > 0) {
- test->ModifyAudioConfigs(&audio_send_config_, &audio_receive_configs_);
- }
- if (num_flexfec_streams_ > 0) {
- test->ModifyFlexfecConfigs(&flexfec_receive_configs_);
- }
+ CreateSendConfig(num_video_streams_, num_audio_streams_,
+ num_flexfec_streams_, send_transport_.get());
+ if (test->ShouldCreateReceivers()) {
+ CreateMatchingReceiveConfigs(receive_transport_.get());
+ }
+ if (num_video_streams_ > 0) {
+ test->ModifyVideoConfigs(&video_send_config_, &video_receive_configs_,
+ &video_encoder_config_);
+ }
+ if (num_audio_streams_ > 0) {
+ test->ModifyAudioConfigs(&audio_send_config_, &audio_receive_configs_);
+ }
+ if (num_flexfec_streams_ > 0) {
+ test->ModifyFlexfecConfigs(&flexfec_receive_configs_);
+ }
- if (num_flexfec_streams_ > 0) {
- CreateFlexfecStreams();
- test->OnFlexfecStreamsCreated(flexfec_receive_streams_);
- }
- if (num_video_streams_ > 0) {
- CreateVideoStreams();
- test->OnVideoStreamsCreated(video_send_stream_, video_receive_streams_);
- }
- if (num_audio_streams_ > 0) {
- CreateAudioStreams();
- test->OnAudioStreamsCreated(audio_send_stream_, audio_receive_streams_);
- }
+ if (num_flexfec_streams_ > 0) {
+ CreateFlexfecStreams();
+ test->OnFlexfecStreamsCreated(flexfec_receive_streams_);
+ }
+ if (num_video_streams_ > 0) {
+ CreateVideoStreams();
+ test->OnVideoStreamsCreated(video_send_stream_, video_receive_streams_);
+ }
+ if (num_audio_streams_ > 0) {
+ CreateAudioStreams();
+ test->OnAudioStreamsCreated(audio_send_stream_, audio_receive_streams_);
+ }
- if (num_video_streams_ > 0) {
- int width = kDefaultWidth;
- int height = kDefaultHeight;
- int frame_rate = kDefaultFramerate;
- test->ModifyVideoCaptureStartResolution(&width, &height, &frame_rate);
- CreateFrameGeneratorCapturer(frame_rate, width, height);
- test->OnFrameGeneratorCapturerCreated(frame_generator_capturer_.get());
- }
+ if (num_video_streams_ > 0) {
+ int width = kDefaultWidth;
+ int height = kDefaultHeight;
+ int frame_rate = kDefaultFramerate;
+ test->ModifyVideoCaptureStartResolution(&width, &height, &frame_rate);
+ CreateFrameGeneratorCapturer(frame_rate, width, height);
+ test->OnFrameGeneratorCapturerCreated(frame_generator_capturer_.get());
+ }
- Start();
+ Start();
+ });
+
test->PerformTest();
- send_transport_->StopSending();
- receive_transport_->StopSending();
- Stop();
- DestroyStreams();
- DestroyCalls();
- if (num_audio_streams_ > 0)
- DestroyVoiceEngines();
+ task_queue_.SendTask([this]() {
+ Stop();
+ DestroyStreams();
+ send_transport_.reset();
+ receive_transport_.reset();
+ DestroyCalls();
+ if (num_audio_streams_ > 0)
+ DestroyVoiceEngines();
+ });
test->OnTestFinished();
}
@@ -517,16 +532,19 @@
void BaseTest::OnCallsCreated(Call* sender_call, Call* receiver_call) {
}
-test::PacketTransport* BaseTest::CreateSendTransport(Call* sender_call) {
- return new PacketTransport(sender_call, this, test::PacketTransport::kSender,
- CallTest::payload_type_map_,
- FakeNetworkPipe::Config());
+test::PacketTransport* BaseTest::CreateSendTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call) {
+ return new PacketTransport(
+ task_queue, sender_call, this, test::PacketTransport::kSender,
+ CallTest::payload_type_map_, FakeNetworkPipe::Config());
}
-test::PacketTransport* BaseTest::CreateReceiveTransport() {
- return new PacketTransport(nullptr, this, test::PacketTransport::kReceiver,
- CallTest::payload_type_map_,
- FakeNetworkPipe::Config());
+test::PacketTransport* BaseTest::CreateReceiveTransport(
+ SingleThreadedTaskQueueForTesting* task_queue) {
+ return new PacketTransport(
+ task_queue, nullptr, this, test::PacketTransport::kReceiver,
+ CallTest::payload_type_map_, FakeNetworkPipe::Config());
}
size_t BaseTest::GetNumVideoStreams() const {
diff --git a/webrtc/test/call_test.h b/webrtc/test/call_test.h
index 5186afa..b0ae3f6 100644
--- a/webrtc/test/call_test.h
+++ b/webrtc/test/call_test.h
@@ -23,6 +23,7 @@
#include "webrtc/test/fake_videorenderer.h"
#include "webrtc/test/frame_generator_capturer.h"
#include "webrtc/test/rtp_rtcp_observer.h"
+#include "webrtc/test/single_threaded_task_queue.h"
namespace webrtc {
@@ -136,6 +137,8 @@
rtc::scoped_refptr<AudioEncoderFactory> encoder_factory_;
test::FakeVideoRenderer fake_renderer_;
+ SingleThreadedTaskQueueForTesting task_queue_;
+
private:
// TODO(holmer): Remove once VoiceEngine is fully refactored to the new API.
// These methods are used to set up legacy voice engines and channels which is
@@ -188,8 +191,11 @@
RtpTransportControllerSend* controller);
virtual void OnCallsCreated(Call* sender_call, Call* receiver_call);
- virtual test::PacketTransport* CreateSendTransport(Call* sender_call);
- virtual test::PacketTransport* CreateReceiveTransport();
+ virtual test::PacketTransport* CreateSendTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ Call* sender_call);
+ virtual test::PacketTransport* CreateReceiveTransport(
+ SingleThreadedTaskQueueForTesting* task_queue);
virtual void ModifyVideoConfigs(
VideoSendStream::Config* send_config,
diff --git a/webrtc/test/direct_transport.cc b/webrtc/test/direct_transport.cc
index 370425c..81f3b69 100644
--- a/webrtc/test/direct_transport.cc
+++ b/webrtc/test/direct_transport.cc
@@ -10,7 +10,9 @@
#include "webrtc/test/direct_transport.h"
#include "webrtc/call/call.h"
+#include "webrtc/rtc_base/ptr_util.h"
#include "webrtc/system_wrappers/include/clock.h"
+#include "webrtc/test/single_threaded_task_queue.h"
namespace webrtc {
namespace test {
@@ -32,36 +34,71 @@
DirectTransport::DirectTransport(const FakeNetworkPipe::Config& config,
Call* send_call,
std::unique_ptr<Demuxer> demuxer)
+ : DirectTransport(nullptr, config, send_call, std::move(demuxer)) {}
+
+DirectTransport::DirectTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map)
+ : DirectTransport(task_queue,
+ FakeNetworkPipe::Config(),
+ send_call,
+ payload_type_map) {
+}
+
+DirectTransport::DirectTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map)
+ : DirectTransport(
+ task_queue,
+ config,
+ send_call,
+ std::unique_ptr<Demuxer>(new DemuxerImpl(payload_type_map))) {
+}
+
+DirectTransport::DirectTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ std::unique_ptr<Demuxer> demuxer)
: send_call_(send_call),
- packet_event_(false, false),
- thread_(NetworkProcess, this, "NetworkProcess"),
clock_(Clock::GetRealTimeClock()),
- shutting_down_(false),
+ task_queue_(task_queue),
fake_network_(clock_, config, std::move(demuxer)) {
- thread_.Start();
+ // TODO(eladalon): When the deprecated ctors are removed, this check
+ // can be restored. https://bugs.chromium.org/p/webrtc/issues/detail?id=8125
+ // RTC_DCHECK(task_queue);
+ if (!task_queue) {
+ deprecated_task_queue_ =
+ rtc::MakeUnique<SingleThreadedTaskQueueForTesting>("deprecated_queue");
+ task_queue_ = deprecated_task_queue_.get();
+ }
+
if (send_call_) {
send_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
send_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
}
+ SendPackets();
}
-DirectTransport::~DirectTransport() { StopSending(); }
+DirectTransport::~DirectTransport() {
+ RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
+ // Constructor updates |next_scheduled_task_|, so it's guaranteed to
+ // be initialized.
+ task_queue_->CancelTask(next_scheduled_task_);
+}
void DirectTransport::SetConfig(const FakeNetworkPipe::Config& config) {
fake_network_.SetConfig(config);
}
void DirectTransport::StopSending() {
- {
- rtc::CritScope crit(&lock_);
- shutting_down_ = true;
- }
-
- packet_event_.Set();
- thread_.Stop();
+ task_queue_->CancelTask(next_scheduled_task_);
}
void DirectTransport::SetReceiver(PacketReceiver* receiver) {
+ RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
fake_network_.SetReceiver(receiver);
}
@@ -74,13 +111,11 @@
send_call_->OnSentPacket(sent_packet);
}
fake_network_.SendPacket(data, length);
- packet_event_.Set();
return true;
}
bool DirectTransport::SendRtcp(const uint8_t* data, size_t length) {
fake_network_.SendPacket(data, length);
- packet_event_.Set();
return true;
}
@@ -104,18 +139,15 @@
packet->data_length(), packet_time);
}
-bool DirectTransport::NetworkProcess(void* transport) {
- return static_cast<DirectTransport*>(transport)->SendPackets();
-}
+void DirectTransport::SendPackets() {
+ RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
-bool DirectTransport::SendPackets() {
fake_network_.Process();
- int64_t wait_time_ms = fake_network_.TimeUntilNextProcess();
- if (wait_time_ms > 0) {
- packet_event_.Wait(static_cast<int>(wait_time_ms));
- }
- rtc::CritScope crit(&lock_);
- return shutting_down_ ? false : true;
+
+ int64_t delay_ms = fake_network_.TimeUntilNextProcess();
+ next_scheduled_task_ = task_queue_->PostDelayedTask([this]() {
+ SendPackets();
+ }, delay_ms);
}
} // namespace test
} // namespace webrtc
diff --git a/webrtc/test/direct_transport.h b/webrtc/test/direct_transport.h
index 0698992..93ba971 100644
--- a/webrtc/test/direct_transport.h
+++ b/webrtc/test/direct_transport.h
@@ -12,14 +12,13 @@
#include <assert.h>
-#include <deque>
+#include <memory>
#include "webrtc/api/call/transport.h"
#include "webrtc/call/call.h"
-#include "webrtc/rtc_base/criticalsection.h"
-#include "webrtc/rtc_base/event.h"
-#include "webrtc/rtc_base/platform_thread.h"
+#include "webrtc/rtc_base/sequenced_task_checker.h"
#include "webrtc/test/fake_network_pipe.h"
+#include "webrtc/test/single_threaded_task_queue.h"
namespace webrtc {
@@ -30,14 +29,17 @@
class DirectTransport : public Transport {
public:
- DirectTransport(Call* send_call,
- const std::map<uint8_t, MediaType>& payload_type_map);
- DirectTransport(const FakeNetworkPipe::Config& config,
- Call* send_call,
- const std::map<uint8_t, MediaType>& payload_type_map);
- DirectTransport(const FakeNetworkPipe::Config& config,
- Call* send_call,
- std::unique_ptr<Demuxer> demuxer);
+ RTC_DEPRECATED DirectTransport(
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map);
+ RTC_DEPRECATED DirectTransport(
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map);
+ RTC_DEPRECATED DirectTransport(
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ std::unique_ptr<Demuxer> demuxer);
// This deprecated variant always uses MediaType::VIDEO.
RTC_DEPRECATED explicit DirectTransport(Call* send_call)
@@ -46,11 +48,26 @@
send_call,
std::unique_ptr<Demuxer>(new ForceDemuxer(MediaType::VIDEO))) {}
+ DirectTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map);
+
+ DirectTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ const std::map<uint8_t, MediaType>& payload_type_map);
+
+ DirectTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ const FakeNetworkPipe::Config& config,
+ Call* send_call,
+ std::unique_ptr<Demuxer> demuxer);
+
~DirectTransport() override;
void SetConfig(const FakeNetworkPipe::Config& config);
- virtual void StopSending();
+ RTC_DEPRECATED void StopSending();
+
// TODO(holmer): Look into moving this to the constructor.
virtual void SetReceiver(PacketReceiver* receiver);
@@ -77,18 +94,25 @@
RTC_DISALLOW_COPY_AND_ASSIGN(ForceDemuxer);
};
- static bool NetworkProcess(void* transport);
- bool SendPackets();
+ void SendPackets();
- rtc::CriticalSection lock_;
Call* const send_call_;
- rtc::Event packet_event_;
- rtc::PlatformThread thread_;
Clock* const clock_;
- bool shutting_down_;
+ // TODO(eladalon): Make |task_queue_| const.
+ // https://bugs.chromium.org/p/webrtc/issues/detail?id=8125
+ SingleThreadedTaskQueueForTesting* task_queue_;
+ SingleThreadedTaskQueueForTesting::TaskId next_scheduled_task_;
FakeNetworkPipe fake_network_;
+
+ rtc::SequencedTaskChecker sequence_checker_;
+
+ // TODO(eladalon): https://bugs.chromium.org/p/webrtc/issues/detail?id=8125
+ // Deprecated versions of the ctor don't get the task queue passed in from
+ // outside. We'll create one locally for them. This is deprecated, and will
+ // be removed as soon as the need for those ctors is removed.
+ std::unique_ptr<SingleThreadedTaskQueueForTesting> deprecated_task_queue_;
};
} // namespace test
} // namespace webrtc
diff --git a/webrtc/test/layer_filtering_transport.cc b/webrtc/test/layer_filtering_transport.cc
index 00fe5f9..7d7288a 100644
--- a/webrtc/test/layer_filtering_transport.cc
+++ b/webrtc/test/layer_filtering_transport.cc
@@ -21,6 +21,7 @@
namespace test {
LayerFilteringTransport::LayerFilteringTransport(
+ SingleThreadedTaskQueueForTesting* task_queue,
const FakeNetworkPipe::Config& config,
Call* send_call,
uint8_t vp8_video_payload_type,
@@ -28,7 +29,7 @@
int selected_tl,
int selected_sl,
const std::map<uint8_t, MediaType>& payload_type_map)
- : test::DirectTransport(config, send_call, payload_type_map),
+ : DirectTransport(task_queue, config, send_call, payload_type_map),
vp8_video_payload_type_(vp8_video_payload_type),
vp9_video_payload_type_(vp9_video_payload_type),
selected_tl_(selected_tl),
diff --git a/webrtc/test/layer_filtering_transport.h b/webrtc/test/layer_filtering_transport.h
index 488f764..32eb4c5 100644
--- a/webrtc/test/layer_filtering_transport.h
+++ b/webrtc/test/layer_filtering_transport.h
@@ -13,6 +13,7 @@
#include "webrtc/call/call.h"
#include "webrtc/test/direct_transport.h"
#include "webrtc/test/fake_network_pipe.h"
+#include "webrtc/test/single_threaded_task_queue.h"
#include <map>
@@ -22,7 +23,8 @@
class LayerFilteringTransport : public test::DirectTransport {
public:
- LayerFilteringTransport(const FakeNetworkPipe::Config& config,
+ LayerFilteringTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ const FakeNetworkPipe::Config& config,
Call* send_call,
uint8_t vp8_video_payload_type,
uint8_t vp9_video_payload_type,
diff --git a/webrtc/test/rtp_rtcp_observer.h b/webrtc/test/rtp_rtcp_observer.h
index 54db770..4154010 100644
--- a/webrtc/test/rtp_rtcp_observer.h
+++ b/webrtc/test/rtp_rtcp_observer.h
@@ -32,6 +32,7 @@
namespace test {
class PacketTransport;
+class SingleThreadedTaskQueueForTesting;
class RtpRtcpObserver {
public:
@@ -91,12 +92,16 @@
public:
enum TransportType { kReceiver, kSender };
- PacketTransport(Call* send_call,
+ PacketTransport(SingleThreadedTaskQueueForTesting* task_queue,
+ Call* send_call,
RtpRtcpObserver* observer,
TransportType transport_type,
const std::map<uint8_t, MediaType>& payload_type_map,
const FakeNetworkPipe::Config& configuration)
- : test::DirectTransport(configuration, send_call, payload_type_map),
+ : test::DirectTransport(task_queue,
+ configuration,
+ send_call,
+ payload_type_map),
observer_(observer),
transport_type_(transport_type) {}
diff --git a/webrtc/test/single_threaded_task_queue.cc b/webrtc/test/single_threaded_task_queue.cc
new file mode 100644
index 0000000..bee5981
--- /dev/null
+++ b/webrtc/test/single_threaded_task_queue.cc
@@ -0,0 +1,144 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/test/single_threaded_task_queue.h"
+
+#include <utility>
+
+#include "webrtc/rtc_base/checks.h"
+#include "webrtc/rtc_base/ptr_util.h"
+#include "webrtc/rtc_base/safe_conversions.h"
+#include "webrtc/rtc_base/timeutils.h"
+
+namespace webrtc {
+namespace test {
+
+SingleThreadedTaskQueueForTesting::QueuedTask::QueuedTask(
+ SingleThreadedTaskQueueForTesting::TaskId task_id,
+ int64_t earliest_execution_time,
+ SingleThreadedTaskQueueForTesting::Task task)
+ : task_id(task_id),
+ earliest_execution_time(earliest_execution_time),
+ task(task) {}
+
+SingleThreadedTaskQueueForTesting::QueuedTask::~QueuedTask() = default;
+
+SingleThreadedTaskQueueForTesting::SingleThreadedTaskQueueForTesting(
+ const char* name)
+ : thread_(Run, this, name),
+ running_(true),
+ next_task_id_(0),
+ wake_up_(false, false) {
+ thread_.Start();
+}
+
+SingleThreadedTaskQueueForTesting::~SingleThreadedTaskQueueForTesting() {
+ RTC_DCHECK_RUN_ON(&owner_thread_checker_);
+ {
+ rtc::CritScope lock(&cs_);
+ running_ = false;
+ }
+ wake_up_.Set();
+ thread_.Stop();
+}
+
+SingleThreadedTaskQueueForTesting::TaskId
+SingleThreadedTaskQueueForTesting::PostTask(Task task) {
+ return PostDelayedTask(task, 0);
+}
+
+SingleThreadedTaskQueueForTesting::TaskId
+SingleThreadedTaskQueueForTesting::PostDelayedTask(Task task,
+ int64_t delay_ms) {
+ int64_t earliest_exec_time = rtc::TimeAfter(delay_ms);
+
+ rtc::CritScope lock(&cs_);
+
+ TaskId id = next_task_id_++;
+
+ // Insert after any other tasks with an earlier-or-equal target time.
+ auto it = tasks_.begin();
+ for (; it != tasks_.end(); it++) {
+ if (earliest_exec_time < (*it)->earliest_execution_time) {
+ break;
+ }
+ }
+ tasks_.insert(it, rtc::MakeUnique<QueuedTask>(id, earliest_exec_time, task));
+
+ // This class is optimized for simplicty, not for performance. This will wake
+ // the thread up even if the next task in the queue is only scheduled for
+ // quite some time from now. In that case, the thread will just send itself
+ // back to sleep.
+ wake_up_.Set();
+
+ return id;
+}
+
+void SingleThreadedTaskQueueForTesting::SendTask(Task task) {
+ rtc::Event done(true, false);
+ PostTask([&task, &done]() {
+ task();
+ done.Set();
+ });
+ done.Wait(rtc::Event::kForever);
+}
+
+bool SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) {
+ rtc::CritScope lock(&cs_);
+ for (auto it = tasks_.begin(); it != tasks_.end(); it++) {
+ if ((*it)->task_id == task_id) {
+ tasks_.erase(it);
+ return true;
+ }
+ }
+ return false;
+}
+
+void SingleThreadedTaskQueueForTesting::Run(void* obj) {
+ static_cast<SingleThreadedTaskQueueForTesting*>(obj)->RunLoop();
+}
+
+void SingleThreadedTaskQueueForTesting::RunLoop() {
+ while (true) {
+ std::unique_ptr<QueuedTask> queued_task;
+
+ // An empty queue would lead to sleeping until the queue becoems non-empty.
+ // A queue where the earliest task is shceduled for later than now, will
+ // lead to sleeping until the time of the next scheduled task (or until
+ // more tasks are scheduled).
+ int wait_time = rtc::Event::kForever;
+
+ {
+ rtc::CritScope lock(&cs_);
+ if (!running_) {
+ return;
+ }
+ if (!tasks_.empty()) {
+ int64_t remaining_delay_ms = rtc::TimeDiff(
+ tasks_.front()->earliest_execution_time, rtc::TimeMillis());
+ if (remaining_delay_ms <= 0) {
+ queued_task = std::move(tasks_.front());
+ tasks_.pop_front();
+ } else {
+ wait_time = rtc::saturated_cast<int>(remaining_delay_ms);
+ }
+ }
+ }
+
+ if (queued_task) {
+ queued_task->task();
+ } else {
+ wake_up_.Wait(wait_time);
+ }
+ }
+}
+
+} // namespace test
+} // namespace webrtc
diff --git a/webrtc/test/single_threaded_task_queue.h b/webrtc/test/single_threaded_task_queue.h
new file mode 100644
index 0000000..ecb22b3
--- /dev/null
+++ b/webrtc/test/single_threaded_task_queue.h
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef WEBRTC_TEST_SINGLE_THREADED_TASK_QUEUE_H_
+#define WEBRTC_TEST_SINGLE_THREADED_TASK_QUEUE_H_
+
+#include <functional>
+#include <list>
+#include <memory>
+
+#include "webrtc/rtc_base/criticalsection.h"
+#include "webrtc/rtc_base/event.h"
+#include "webrtc/rtc_base/platform_thread.h"
+#include "webrtc/rtc_base/thread_checker.h"
+
+namespace webrtc {
+namespace test {
+
+// This class gives capabilities similar to rtc::TaskQueue, but ensures
+// everything happens on the same thread. This is intended to make the
+// threading model of unit-tests (specifically end-to-end tests) more closely
+// resemble that of real WebRTC, thereby allowing us to replace some critical
+// sections by thread-checkers.
+// This task is NOT tuned for performance, but rather for simplicity.
+class SingleThreadedTaskQueueForTesting {
+ public:
+ using Task = std::function<void()>;
+ using TaskId = size_t;
+
+ explicit SingleThreadedTaskQueueForTesting(const char* name);
+ ~SingleThreadedTaskQueueForTesting();
+
+ // Sends one task to the task-queue, and returns a handle by which the
+ // task can be cancelled.
+ // This mimics the behavior of TaskQueue, but only for lambdas, rather than
+ // for both lambdas and QueuedTask objects.
+ TaskId PostTask(Task task);
+
+ // Same as PostTask(), but ensures that the task will not begin execution
+ // less than |delay_ms| milliseconds after being posted; an upper bound
+ // is not provided.
+ TaskId PostDelayedTask(Task task, int64_t delay_ms);
+
+ // Send one task to the queue. The function does not return until the task
+ // has finished executing. No support for canceling the task.
+ void SendTask(Task task);
+
+ // Given an identifier to the task, attempts to eject it from the queue.
+ // Returns true if the task was found and cancelled. Failure possible
+ // only for invalid task IDs, or for tasks which have already been executed.
+ bool CancelTask(TaskId task_id);
+
+ private:
+ struct QueuedTask {
+ QueuedTask(TaskId task_id, int64_t earliest_execution_time, Task task);
+ ~QueuedTask();
+
+ TaskId task_id;
+ int64_t earliest_execution_time;
+ Task task;
+ };
+
+ static void Run(void* obj);
+
+ void RunLoop();
+
+ rtc::CriticalSection cs_;
+ std::list<std::unique_ptr<QueuedTask>> tasks_ GUARDED_BY(cs_);
+ rtc::ThreadChecker owner_thread_checker_;
+ rtc::PlatformThread thread_;
+ bool running_ GUARDED_BY(cs_);
+
+ TaskId next_task_id_;
+
+ // The task-queue will sleep when not executing a task. Wake up occurs when:
+ // * Upon destruction, to make sure that the |thead_| terminates, so that it
+ // may be joined. [Event will be set.]
+ // * New task added. Because we optimize for simplicity rahter than for
+ // performance (this class is a testing facility only), waking up occurs
+ // when we get a new task even if it is scheduled with a delay. The RunLoop
+ // is in charge of sending itself back to sleep if the next task is only
+ // to be executed at a later time. [Event will be set.]
+ // * When the next task in the queue is a delayed-task, and the time for
+ // its execution has come. [Event will time-out.]
+ rtc::Event wake_up_;
+};
+
+} // namespace test
+} // namespace webrtc
+
+#endif // WEBRTC_TEST_SINGLE_THREADED_TASK_QUEUE_H_
diff --git a/webrtc/test/single_threaded_task_queue_unittest.cc b/webrtc/test/single_threaded_task_queue_unittest.cc
new file mode 100644
index 0000000..8ad8b4f
--- /dev/null
+++ b/webrtc/test/single_threaded_task_queue_unittest.cc
@@ -0,0 +1,364 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/test/single_threaded_task_queue.h"
+
+#include <atomic>
+#include <memory>
+#include <vector>
+
+#include "webrtc/rtc_base/event.h"
+#include "webrtc/rtc_base/ptr_util.h"
+#include "webrtc/test/gtest.h"
+
+namespace webrtc {
+namespace test {
+
+namespace {
+
+using TaskId = SingleThreadedTaskQueueForTesting::TaskId;
+
+// Test should not rely on the object under test not being faulty. If the task
+// queue ever blocks forever, we want the tests to fail, rather than hang.
+constexpr int kMaxWaitTimeMs = 10000;
+
+TEST(SingleThreadedTaskQueueForTestingTest, SanityConstructionDestruction) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, ExecutesPostedTasks) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> executed(false);
+ rtc::Event done(true, false);
+
+ task_queue.PostTask([&executed, &done]() {
+ executed.store(true);
+ done.Set();
+ });
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+
+ EXPECT_TRUE(executed.load());
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest,
+ PostMultipleTasksFromSameExternalThread) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ constexpr size_t kCount = 3;
+ std::atomic<bool> executed[kCount];
+ for (std::atomic<bool>& exec : executed) {
+ exec.store(false);
+ }
+
+ std::vector<std::unique_ptr<rtc::Event>> done_events;
+ for (size_t i = 0; i < kCount; i++) {
+ done_events.emplace_back(rtc::MakeUnique<rtc::Event>(false, false));
+ }
+
+ // To avoid the tasks which comprise the actual test from running before they
+ // have all be posted, which could result in only one task ever being in the
+ // queue at any given time, post one waiting task that would block the
+ // task-queue, and unblock only after all tasks have been posted.
+ rtc::Event rendezvous(true, false);
+ task_queue.PostTask([&rendezvous]() {
+ ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs));
+ });
+
+ // Post the tasks which comprise the test.
+ for (size_t i = 0; i < kCount; i++) {
+ task_queue.PostTask([&executed, &done_events, i]() { // |i| by value.
+ executed[i].store(true);
+ done_events[i]->Set();
+ });
+ }
+
+ rendezvous.Set(); // Release the task-queue.
+
+ // Wait until the task queue has executed all the tasks.
+ for (size_t i = 0; i < kCount; i++) {
+ ASSERT_TRUE(done_events[i]->Wait(kMaxWaitTimeMs));
+ }
+
+ for (size_t i = 0; i < kCount; i++) {
+ EXPECT_TRUE(executed[i].load());
+ }
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, PostToTaskQueueFromOwnThread) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> executed(false);
+ rtc::Event done(true, false);
+
+ auto internally_posted_task = [&executed, &done]() {
+ executed.store(true);
+ done.Set();
+ };
+
+ auto externally_posted_task = [&task_queue, &internally_posted_task]() {
+ task_queue.PostTask(internally_posted_task);
+ };
+
+ task_queue.PostTask(externally_posted_task);
+
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+ EXPECT_TRUE(executed.load());
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, TasksExecutedInSequence) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ // The first task would perform:
+ // accumulator = 10 * accumulator + i
+ // Where |i| is 1, 2 and 3 for the 1st, 2nd and 3rd tasks, respectively.
+ // The result would be 123 if and only iff the tasks were executed in order.
+ size_t accumulator = 0;
+ size_t expected_value = 0; // Updates to the correct value.
+
+ // Prevent the chain from being set in motion before we've had time to
+ // schedule it all, lest the queue only contain one task at a time.
+ rtc::Event rendezvous(true, false);
+ task_queue.PostTask([&rendezvous]() {
+ ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs));
+ });
+
+ for (size_t i = 0; i < 3; i++) {
+ task_queue.PostTask([&accumulator, i]() { // |i| passed by value.
+ accumulator = 10 * accumulator + i;
+ });
+ expected_value = 10 * expected_value + i;
+ }
+
+ // The test will wait for the task-queue to finish.
+ rtc::Event done(true, false);
+ task_queue.PostTask([&done]() {
+ done.Set();
+ });
+
+ rendezvous.Set(); // Set the chain in motion.
+
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+
+ EXPECT_EQ(accumulator, expected_value);
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, ExecutesPostedDelayedTask) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> executed(false);
+ rtc::Event done(true, false);
+
+ constexpr int64_t delay_ms = 20;
+ static_assert(delay_ms < kMaxWaitTimeMs / 2, "Delay too long for tests.");
+
+ task_queue.PostDelayedTask([&executed, &done]() {
+ executed.store(true);
+ done.Set();
+ }, delay_ms);
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+
+ EXPECT_TRUE(executed.load());
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, DoesNotExecuteDelayedTaskTooSoon) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> executed(false);
+
+ constexpr int64_t delay_ms = 2000;
+ static_assert(delay_ms < kMaxWaitTimeMs / 2, "Delay too long for tests.");
+
+ task_queue.PostDelayedTask([&executed]() {
+ executed.store(true);
+ }, delay_ms);
+
+ // Wait less than is enough, make sure the task was not yet executed.
+ rtc::Event not_done(true, false);
+ ASSERT_FALSE(not_done.Wait(delay_ms / 2));
+ EXPECT_FALSE(executed.load());
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest,
+ TaskWithLesserDelayPostedAfterFirstDelayedTaskExectuedBeforeFirst) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> earlier_executed(false);
+ constexpr int64_t earlier_delay_ms = 500;
+
+ std::atomic<bool> later_executed(false);
+ constexpr int64_t later_delay_ms = 1000;
+
+ static_assert(earlier_delay_ms + later_delay_ms < kMaxWaitTimeMs / 2,
+ "Delay too long for tests.");
+
+ rtc::Event done(true, false);
+
+ auto earlier_task = [&earlier_executed, &later_executed]() {
+ EXPECT_FALSE(later_executed.load());
+ earlier_executed.store(true);
+ };
+
+ auto later_task = [&earlier_executed, &later_executed, &done]() {
+ EXPECT_TRUE(earlier_executed.load());
+ later_executed.store(true);
+ done.Set();
+ };
+
+ task_queue.PostDelayedTask(later_task, later_delay_ms);
+ task_queue.PostDelayedTask(earlier_task, earlier_delay_ms);
+
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+ ASSERT_TRUE(earlier_executed);
+ ASSERT_TRUE(later_executed);
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest,
+ TaskWithGreaterDelayPostedAfterFirstDelayedTaskExectuedAfterFirst) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> earlier_executed(false);
+ constexpr int64_t earlier_delay_ms = 500;
+
+ std::atomic<bool> later_executed(false);
+ constexpr int64_t later_delay_ms = 1000;
+
+ static_assert(earlier_delay_ms + later_delay_ms < kMaxWaitTimeMs / 2,
+ "Delay too long for tests.");
+
+ rtc::Event done(true, false);
+
+ auto earlier_task = [&earlier_executed, &later_executed]() {
+ EXPECT_FALSE(later_executed.load());
+ earlier_executed.store(true);
+ };
+
+ auto later_task = [&earlier_executed, &later_executed, &done]() {
+ EXPECT_TRUE(earlier_executed.load());
+ later_executed.store(true);
+ done.Set();
+ };
+
+ task_queue.PostDelayedTask(earlier_task, earlier_delay_ms);
+ task_queue.PostDelayedTask(later_task, later_delay_ms);
+
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+ ASSERT_TRUE(earlier_executed);
+ ASSERT_TRUE(later_executed);
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, ExternalThreadCancelsTask) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ rtc::Event done(true, false);
+
+ // Prevent the to-be-cancelled task from being executed before we've had
+ // time to cancel it.
+ rtc::Event rendezvous(true, false);
+ task_queue.PostTask([&rendezvous]() {
+ ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs));
+ });
+
+ TaskId cancelled_task_id = task_queue.PostTask([]() {
+ EXPECT_TRUE(false);
+ });
+ task_queue.PostTask([&done]() {
+ done.Set();
+ });
+
+ task_queue.CancelTask(cancelled_task_id);
+
+ // Set the tasks in motion; the cancelled task does not run (otherwise the
+ // test would fail). The last task ends the test, showing that the queue
+ // progressed beyond the cancelled task.
+ rendezvous.Set();
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+}
+
+// In this test, we'll set off a chain where the first task cancels the second
+// task, then a third task runs (showing that we really cancelled the task,
+// rather than just halted the task-queue).
+TEST(SingleThreadedTaskQueueForTestingTest, InternalThreadCancelsTask) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ rtc::Event done(true, false);
+
+ // Prevent the chain from being set-off before we've set everything up.
+ rtc::Event rendezvous(true, false);
+ task_queue.PostTask([&rendezvous]() {
+ ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs));
+ });
+
+ // This is the canceller-task. It takes cancelled_task_id by reference,
+ // because the ID will only become known after the cancelled task is
+ // scheduled.
+ TaskId cancelled_task_id;
+ auto canceller_task = [&task_queue, &cancelled_task_id]() {
+ task_queue.CancelTask(cancelled_task_id);
+ };
+ task_queue.PostTask(canceller_task);
+
+ // This task will be cancelled by the task before it.
+ auto cancelled_task = []() {
+ EXPECT_TRUE(false);
+ };
+ cancelled_task_id = task_queue.PostTask(cancelled_task);
+
+ // When this task runs, it will allow the test to be finished.
+ auto completion_marker_task = [&done]() {
+ done.Set();
+ };
+ task_queue.PostTask(completion_marker_task);
+
+ rendezvous.Set(); // Set the chain in motion.
+
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest, SendTask) {
+ SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> executed(false);
+
+ task_queue.SendTask([&executed]() {
+ // Intentionally delay, so that if SendTask didn't block, the sender thread
+ // would have time to read |executed|.
+ rtc::Event delay(true, false);
+ ASSERT_FALSE(delay.Wait(1000));
+ executed.store(true);
+ });
+
+ EXPECT_TRUE(executed);
+}
+
+TEST(SingleThreadedTaskQueueForTestingTest,
+ DestructTaskQueueWhileTasksPending) {
+ auto task_queue =
+ rtc::MakeUnique<SingleThreadedTaskQueueForTesting>("task_queue");
+
+ std::atomic<size_t> counter(0);
+
+ constexpr size_t tasks = 10;
+ for (size_t i = 0; i < tasks; i++) {
+ task_queue->PostTask([&counter]() {
+ std::atomic_fetch_add(&counter, static_cast<size_t>(1));
+ rtc::Event delay(true, false);
+ ASSERT_FALSE(delay.Wait(500));
+ });
+ }
+
+ task_queue.reset();
+
+ EXPECT_LT(counter, tasks);
+}
+
+} // namespace
+} // namespace test
+} // namespace webrtc