Revert "Stop using and delete DEPRECATED_SingleThreadedTaskQueueForTesting"
This reverts commit b1c1f6907fec2d18ae8b00ebc44975cb46a95b11.
Reason for revert: It may be the cause of iOS64 Debug flakyness.
Original change's description:
> Stop using and delete DEPRECATED_SingleThreadedTaskQueueForTesting
>
> Bug: webrtc:10933
> Change-Id: I8307e2aad06d3f3f367af122e43ecc088b52f2d6
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/157896
> Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
> Reviewed-by: Sebastian Jansson <srte@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#29713}
TBR=danilchap@webrtc.org,srte@webrtc.org
# Not skipping CQ checks because original CL landed > 1 day ago.
Bug: webrtc:10933
Change-Id: I94c86ebbae414a7569f253d199efbde6ac4c3765
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/159101
Reviewed-by: Honghai Zhang <honghaiz@webrtc.org>
Commit-Queue: Honghai Zhang <honghaiz@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29737}
diff --git a/test/BUILD.gn b/test/BUILD.gn
index f4c1fc6..49c76b0 100644
--- a/test/BUILD.gn
+++ b/test/BUILD.gn
@@ -389,6 +389,7 @@
"../rtc_base:rtc_base_approved",
"../rtc_base:task_queue_for_test",
"../rtc_base/system:file_wrapper",
+ "../test:single_threaded_task_queue",
"pc/e2e:e2e_unittests",
"peer_scenario/tests",
"scenario:scenario_unittests",
@@ -407,6 +408,7 @@
"frame_generator_unittest.cc",
"rtp_file_reader_unittest.cc",
"rtp_file_writer_unittest.cc",
+ "single_threaded_task_queue_unittest.cc",
"testsupport/perf_test_unittest.cc",
"testsupport/test_artifacts_unittest.cc",
"testsupport/video_frame_writer_unittest.cc",
@@ -625,6 +627,22 @@
]
}
+rtc_library("single_threaded_task_queue") {
+ testonly = true
+ sources = [
+ "single_threaded_task_queue.cc",
+ "single_threaded_task_queue.h",
+ ]
+ deps = [
+ "../api/task_queue",
+ "../rtc_base:checks",
+ "../rtc_base:deprecation",
+ "../rtc_base:rtc_base_approved",
+ "../rtc_base:task_queue_for_test",
+ "../rtc_base/task_utils:to_queued_task",
+ ]
+}
+
rtc_library("fake_video_codecs") {
allow_poison = [ "software_video_codecs" ]
visibility = [ "*" ]
@@ -730,6 +748,7 @@
":fake_video_codecs",
":fileutils",
":rtp_test_utils",
+ ":single_threaded_task_queue",
":test_support",
":video_test_common",
"../api:rtp_headers",
diff --git a/test/call_test.cc b/test/call_test.cc
index 9f26cc6..d83f87a 100644
--- a/test/call_test.cc
+++ b/test/call_test.cc
@@ -56,9 +56,7 @@
num_flexfec_streams_(0),
audio_decoder_factory_(CreateBuiltinAudioDecoderFactory()),
audio_encoder_factory_(CreateBuiltinAudioEncoderFactory()),
- task_queue_(task_queue_factory_->CreateTaskQueue(
- "CallTestTaskQueue",
- TaskQueueFactory::Priority::NORMAL)) {}
+ task_queue_("CallTestTaskQueue") {}
CallTest::~CallTest() = default;
@@ -86,7 +84,7 @@
}
void CallTest::RunBaseTest(BaseTest* test) {
- SendTask(RTC_FROM_HERE, task_queue(), [this, test]() {
+ SendTask(RTC_FROM_HERE, &task_queue_, [this, test]() {
num_video_streams_ = test->GetNumVideoStreams();
num_audio_streams_ = test->GetNumAudioStreams();
num_flexfec_streams_ = test->GetNumFlexfecStreams();
@@ -125,9 +123,9 @@
CreateReceiverCall(recv_config);
}
test->OnCallsCreated(sender_call_.get(), receiver_call_.get());
- receive_transport_ = test->CreateReceiveTransport(task_queue());
+ receive_transport_ = test->CreateReceiveTransport(&task_queue_);
send_transport_ =
- test->CreateSendTransport(task_queue(), sender_call_.get());
+ test->CreateSendTransport(&task_queue_, sender_call_.get());
if (test->ShouldCreateReceivers()) {
send_transport_->SetReceiver(receiver_call_->Receiver());
@@ -186,7 +184,7 @@
test->PerformTest();
- SendTask(RTC_FROM_HERE, task_queue(), [this, test]() {
+ SendTask(RTC_FROM_HERE, &task_queue_, [this, test]() {
Stop();
test->OnStreamsStopped();
DestroyStreams();
diff --git a/test/call_test.h b/test/call_test.h
index ba9740d..6224a6e 100644
--- a/test/call_test.h
+++ b/test/call_test.h
@@ -30,6 +30,7 @@
#include "test/fake_vp8_encoder.h"
#include "test/frame_generator_capturer.h"
#include "test/rtp_rtcp_observer.h"
+#include "test/single_threaded_task_queue.h"
namespace webrtc {
namespace test {
@@ -173,7 +174,7 @@
void SetVideoEncoderConfig(const VideoEncoderConfig& config);
VideoSendStream* GetVideoSendStream();
FlexfecReceiveStream::Config* GetFlexFecConfig();
- TaskQueueBase* task_queue() { return task_queue_.get(); }
+ TaskQueueBase* task_queue() { return &task_queue_; }
Clock* const clock_;
@@ -229,7 +230,7 @@
void AddRtpExtensionByUri(const std::string& uri,
std::vector<RtpExtension>* extensions) const;
- std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue_;
std::vector<RtpExtension> rtp_extensions_;
rtc::scoped_refptr<AudioProcessing> apm_send_;
rtc::scoped_refptr<AudioProcessing> apm_recv_;
diff --git a/test/single_threaded_task_queue.cc b/test/single_threaded_task_queue.cc
new file mode 100644
index 0000000..c3aac1c
--- /dev/null
+++ b/test/single_threaded_task_queue.cc
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/single_threaded_task_queue.h"
+
+#include <memory>
+#include <utility>
+
+#include "rtc_base/checks.h"
+#include "rtc_base/numerics/safe_conversions.h"
+#include "rtc_base/time_utils.h"
+
+namespace webrtc {
+namespace test {
+
+DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::StoredTask(
+ DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId task_id,
+ std::unique_ptr<QueuedTask> task)
+ : task_id(task_id), task(std::move(task)) {}
+
+DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::~StoredTask() =
+ default;
+
+DEPRECATED_SingleThreadedTaskQueueForTesting::
+ DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name)
+ : thread_(Run, this, name), running_(true), next_task_id_(0) {
+ thread_.Start();
+}
+
+DEPRECATED_SingleThreadedTaskQueueForTesting::
+ ~DEPRECATED_SingleThreadedTaskQueueForTesting() {
+ Stop();
+}
+
+DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId
+DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayed(
+ std::unique_ptr<QueuedTask> task,
+ int64_t delay_ms) {
+ int64_t earliest_exec_time = rtc::TimeAfter(delay_ms);
+
+ rtc::CritScope lock(&cs_);
+ if (!running_)
+ return kInvalidTaskId;
+
+ TaskId id = next_task_id_++;
+
+ // Insert after any other tasks with an earlier-or-equal target time.
+ // Note: multimap has promise "The order of the key-value pairs whose keys
+ // compare equivalent is the order of insertion and does not change."
+ tasks_.emplace(std::piecewise_construct,
+ std::forward_as_tuple(earliest_exec_time),
+ std::forward_as_tuple(id, std::move(task)));
+
+ // This class is optimized for simplicty, not for performance. This will wake
+ // the thread up even if the next task in the queue is only scheduled for
+ // quite some time from now. In that case, the thread will just send itself
+ // back to sleep.
+ wake_up_.Set();
+
+ return id;
+}
+
+bool DEPRECATED_SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) {
+ rtc::CritScope lock(&cs_);
+ for (auto it = tasks_.begin(); it != tasks_.end(); it++) {
+ if (it->second.task_id == task_id) {
+ tasks_.erase(it);
+ return true;
+ }
+ }
+ return false;
+}
+
+bool DEPRECATED_SingleThreadedTaskQueueForTesting::IsCurrent() {
+ return rtc::IsThreadRefEqual(thread_.GetThreadRef(), rtc::CurrentThreadRef());
+}
+
+bool DEPRECATED_SingleThreadedTaskQueueForTesting::IsRunning() {
+ RTC_DCHECK_RUN_ON(&owner_thread_checker_);
+ // We could check the |running_| flag here, but this is equivalent for the
+ // purposes of this function.
+ return thread_.IsRunning();
+}
+
+bool DEPRECATED_SingleThreadedTaskQueueForTesting::HasPendingTasks() const {
+ rtc::CritScope lock(&cs_);
+ return !tasks_.empty();
+}
+
+void DEPRECATED_SingleThreadedTaskQueueForTesting::Stop() {
+ RTC_DCHECK_RUN_ON(&owner_thread_checker_);
+ if (!thread_.IsRunning())
+ return;
+
+ {
+ rtc::CritScope lock(&cs_);
+ running_ = false;
+ }
+
+ wake_up_.Set();
+ thread_.Stop();
+}
+
+void DEPRECATED_SingleThreadedTaskQueueForTesting::Run(void* obj) {
+ static_cast<DEPRECATED_SingleThreadedTaskQueueForTesting*>(obj)->RunLoop();
+}
+
+void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() {
+ CurrentTaskQueueSetter set_current(this);
+ while (true) {
+ std::unique_ptr<QueuedTask> queued_task;
+
+ // An empty queue would lead to sleeping until the queue becoems non-empty.
+ // A queue where the earliest task is scheduled for later than now, will
+ // lead to sleeping until the time of the next scheduled task (or until
+ // more tasks are scheduled).
+ int wait_time = rtc::Event::kForever;
+
+ {
+ rtc::CritScope lock(&cs_);
+ if (!running_) {
+ return;
+ }
+ if (!tasks_.empty()) {
+ auto next_delayed_task = tasks_.begin();
+ int64_t earliest_exec_time = next_delayed_task->first;
+ int64_t remaining_delay_ms =
+ rtc::TimeDiff(earliest_exec_time, rtc::TimeMillis());
+ if (remaining_delay_ms <= 0) {
+ queued_task = std::move(next_delayed_task->second.task);
+ tasks_.erase(next_delayed_task);
+ } else {
+ wait_time = rtc::saturated_cast<int>(remaining_delay_ms);
+ }
+ }
+ }
+
+ if (queued_task) {
+ if (!queued_task->Run()) {
+ queued_task.release();
+ }
+ } else {
+ wake_up_.Wait(wait_time);
+ }
+ }
+}
+
+void DEPRECATED_SingleThreadedTaskQueueForTesting::Delete() {
+ Stop();
+ delete this;
+}
+
+} // namespace test
+} // namespace webrtc
diff --git a/test/single_threaded_task_queue.h b/test/single_threaded_task_queue.h
new file mode 100644
index 0000000..3845829
--- /dev/null
+++ b/test/single_threaded_task_queue.h
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef TEST_SINGLE_THREADED_TASK_QUEUE_H_
+#define TEST_SINGLE_THREADED_TASK_QUEUE_H_
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <utility>
+
+#include "api/task_queue/task_queue_base.h"
+#include "rtc_base/critical_section.h"
+#include "rtc_base/deprecation.h"
+#include "rtc_base/event.h"
+#include "rtc_base/platform_thread.h"
+#include "rtc_base/task_queue_for_test.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+#include "rtc_base/thread_checker.h"
+
+namespace webrtc {
+namespace test {
+
+// DEPRECATED. This class doesn't striclty follow rtc::TaskQueue semantics,
+// which makes it surprising and hard to use correctly.
+// Please use TaskQueueForTest instead.
+
+// This class gives capabilities similar to rtc::TaskQueue, but ensures
+// everything happens on the same thread. This is intended to make the
+// threading model of unit-tests (specifically end-to-end tests) more closely
+// resemble that of real WebRTC, thereby allowing us to replace some critical
+// sections by thread-checkers.
+// This task is NOT tuned for performance, but rather for simplicity.
+class DEPRECATED_SingleThreadedTaskQueueForTesting : public TaskQueueBase {
+ public:
+ using Task = std::function<void()>;
+ using TaskId = size_t;
+ constexpr static TaskId kInvalidTaskId = static_cast<TaskId>(-1);
+
+ explicit DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name);
+ ~DEPRECATED_SingleThreadedTaskQueueForTesting() override;
+
+ // Sends one task to the task-queue, and returns a handle by which the
+ // task can be cancelled.
+ // This mimics the behavior of TaskQueue, but only for lambdas, rather than
+ // for both lambdas and QueuedTask objects.
+ TaskId PostTask(Task task) {
+ return PostDelayed(ToQueuedTask(std::move(task)), /*delay_ms=*/0);
+ }
+
+ // Same as PostTask(), but ensures that the task will not begin execution
+ // less than |delay_ms| milliseconds after being posted; an upper bound
+ // is not provided.
+ TaskId PostDelayedTask(Task task, int64_t delay_ms) {
+ return PostDelayed(ToQueuedTask(std::move(task)), delay_ms);
+ }
+
+ // Given an identifier to the task, attempts to eject it from the queue.
+ // Returns true if the task was found and cancelled. Failure possible
+ // only for invalid task IDs, or for tasks which have already been executed.
+ bool CancelTask(TaskId task_id);
+
+ // Returns true iff called on the thread associated with the task queue.
+ bool IsCurrent();
+
+ // Returns true iff the task queue is actively being serviced.
+ bool IsRunning();
+
+ bool HasPendingTasks() const;
+
+ void Stop();
+
+ // Implements TaskQueueBase.
+ void Delete() override;
+
+ void PostTask(std::unique_ptr<QueuedTask> task) override {
+ PostDelayed(std::move(task), /*delay_ms=*/0);
+ }
+
+ void PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t delay_ms) override {
+ PostDelayed(std::move(task), delay_ms);
+ }
+
+ private:
+ struct StoredTask {
+ StoredTask(TaskId task_id, std::unique_ptr<QueuedTask> task);
+ ~StoredTask();
+
+ TaskId task_id;
+ std::unique_ptr<QueuedTask> task;
+ };
+
+ TaskId PostDelayed(std::unique_ptr<QueuedTask> task, int64_t delay_ms);
+
+ static void Run(void* obj);
+
+ void RunLoop();
+
+ rtc::CriticalSection cs_;
+ // Tasks are ordered by earliest execution time.
+ std::multimap<int64_t, StoredTask> tasks_ RTC_GUARDED_BY(cs_);
+ rtc::ThreadChecker owner_thread_checker_;
+ rtc::PlatformThread thread_;
+ bool running_ RTC_GUARDED_BY(cs_);
+
+ TaskId next_task_id_;
+
+ // The task-queue will sleep when not executing a task. Wake up occurs when:
+ // * Upon destruction, to make sure that the |thead_| terminates, so that it
+ // may be joined. [Event will be set.]
+ // * New task added. Because we optimize for simplicity rahter than for
+ // performance (this class is a testing facility only), waking up occurs
+ // when we get a new task even if it is scheduled with a delay. The RunLoop
+ // is in charge of sending itself back to sleep if the next task is only
+ // to be executed at a later time. [Event will be set.]
+ // * When the next task in the queue is a delayed-task, and the time for
+ // its execution has come. [Event will time-out.]
+ rtc::Event wake_up_;
+};
+
+// Warn if new usage.
+typedef DEPRECATED_SingleThreadedTaskQueueForTesting RTC_DEPRECATED
+ SingleThreadedTaskQueueForTesting;
+
+} // namespace test
+} // namespace webrtc
+
+#endif // TEST_SINGLE_THREADED_TASK_QUEUE_H_
diff --git a/test/single_threaded_task_queue_unittest.cc b/test/single_threaded_task_queue_unittest.cc
new file mode 100644
index 0000000..9e2304d
--- /dev/null
+++ b/test/single_threaded_task_queue_unittest.cc
@@ -0,0 +1,375 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/single_threaded_task_queue.h"
+
+#include <atomic>
+#include <memory>
+#include <vector>
+
+#include "api/task_queue/task_queue_test.h"
+#include "rtc_base/event.h"
+#include "rtc_base/task_queue_for_test.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+namespace test {
+
+namespace {
+
+using TaskId = DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId;
+
+// Test should not rely on the object under test not being faulty. If the task
+// queue ever blocks forever, we want the tests to fail, rather than hang.
+constexpr int kMaxWaitTimeMs = 10000;
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ SanityConstructionDestruction) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, ExecutesPostedTasks) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> executed(false);
+ rtc::Event done;
+
+ task_queue.PostTask([&executed, &done]() {
+ executed.store(true);
+ done.Set();
+ });
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+
+ EXPECT_TRUE(executed.load());
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ PostMultipleTasksFromSameExternalThread) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ constexpr size_t kCount = 3;
+ std::atomic<bool> executed[kCount];
+ for (std::atomic<bool>& exec : executed) {
+ exec.store(false);
+ }
+
+ std::vector<std::unique_ptr<rtc::Event>> done_events;
+ for (size_t i = 0; i < kCount; i++) {
+ done_events.emplace_back(std::make_unique<rtc::Event>());
+ }
+
+ // To avoid the tasks which comprise the actual test from running before they
+ // have all be posted, which could result in only one task ever being in the
+ // queue at any given time, post one waiting task that would block the
+ // task-queue, and unblock only after all tasks have been posted.
+ rtc::Event rendezvous;
+ task_queue.PostTask(
+ [&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); });
+
+ // Post the tasks which comprise the test.
+ for (size_t i = 0; i < kCount; i++) {
+ task_queue.PostTask([&executed, &done_events, i]() { // |i| by value.
+ executed[i].store(true);
+ done_events[i]->Set();
+ });
+ }
+
+ rendezvous.Set(); // Release the task-queue.
+
+ // Wait until the task queue has executed all the tasks.
+ for (size_t i = 0; i < kCount; i++) {
+ ASSERT_TRUE(done_events[i]->Wait(kMaxWaitTimeMs));
+ }
+
+ for (size_t i = 0; i < kCount; i++) {
+ EXPECT_TRUE(executed[i].load());
+ }
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ PostToTaskQueueFromOwnThread) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> executed(false);
+ rtc::Event done;
+
+ auto internally_posted_task = [&executed, &done]() {
+ executed.store(true);
+ done.Set();
+ };
+
+ auto externally_posted_task = [&task_queue, &internally_posted_task]() {
+ task_queue.PostTask(internally_posted_task);
+ };
+
+ task_queue.PostTask(externally_posted_task);
+
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+ EXPECT_TRUE(executed.load());
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ TasksExecutedInSequence) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ // The first task would perform:
+ // accumulator = 10 * accumulator + i
+ // Where |i| is 1, 2 and 3 for the 1st, 2nd and 3rd tasks, respectively.
+ // The result would be 123 if and only iff the tasks were executed in order.
+ size_t accumulator = 0;
+ size_t expected_value = 0; // Updates to the correct value.
+
+ // Prevent the chain from being set in motion before we've had time to
+ // schedule it all, lest the queue only contain one task at a time.
+ rtc::Event rendezvous;
+ task_queue.PostTask(
+ [&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); });
+
+ for (size_t i = 0; i < 3; i++) {
+ task_queue.PostTask([&accumulator, i]() { // |i| passed by value.
+ accumulator = 10 * accumulator + i;
+ });
+ expected_value = 10 * expected_value + i;
+ }
+
+ // The test will wait for the task-queue to finish.
+ rtc::Event done;
+ task_queue.PostTask([&done]() { done.Set(); });
+
+ rendezvous.Set(); // Set the chain in motion.
+
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+
+ EXPECT_EQ(accumulator, expected_value);
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ ExecutesPostedDelayedTask) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> executed(false);
+ rtc::Event done;
+
+ constexpr int64_t delay_ms = 20;
+ static_assert(delay_ms < kMaxWaitTimeMs / 2, "Delay too long for tests.");
+
+ task_queue.PostDelayedTask(
+ [&executed, &done]() {
+ executed.store(true);
+ done.Set();
+ },
+ delay_ms);
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+
+ EXPECT_TRUE(executed.load());
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ DoesNotExecuteDelayedTaskTooSoon) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> executed(false);
+
+ constexpr int64_t delay_ms = 2000;
+ static_assert(delay_ms < kMaxWaitTimeMs / 2, "Delay too long for tests.");
+
+ task_queue.PostDelayedTask([&executed]() { executed.store(true); }, delay_ms);
+
+ // Wait less than is enough, make sure the task was not yet executed.
+ rtc::Event not_done;
+ ASSERT_FALSE(not_done.Wait(delay_ms / 2));
+ EXPECT_FALSE(executed.load());
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ TaskWithLesserDelayPostedAfterFirstDelayedTaskExectuedBeforeFirst) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> earlier_executed(false);
+ constexpr int64_t earlier_delay_ms = 500;
+
+ std::atomic<bool> later_executed(false);
+ constexpr int64_t later_delay_ms = 1000;
+
+ static_assert(earlier_delay_ms + later_delay_ms < kMaxWaitTimeMs / 2,
+ "Delay too long for tests.");
+
+ rtc::Event done;
+
+ auto earlier_task = [&earlier_executed, &later_executed]() {
+ EXPECT_FALSE(later_executed.load());
+ earlier_executed.store(true);
+ };
+
+ auto later_task = [&earlier_executed, &later_executed, &done]() {
+ EXPECT_TRUE(earlier_executed.load());
+ later_executed.store(true);
+ done.Set();
+ };
+
+ task_queue.PostDelayedTask(later_task, later_delay_ms);
+ task_queue.PostDelayedTask(earlier_task, earlier_delay_ms);
+
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+ ASSERT_TRUE(earlier_executed);
+ ASSERT_TRUE(later_executed);
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ TaskWithGreaterDelayPostedAfterFirstDelayedTaskExectuedAfterFirst) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> earlier_executed(false);
+ constexpr int64_t earlier_delay_ms = 500;
+
+ std::atomic<bool> later_executed(false);
+ constexpr int64_t later_delay_ms = 1000;
+
+ static_assert(earlier_delay_ms + later_delay_ms < kMaxWaitTimeMs / 2,
+ "Delay too long for tests.");
+
+ rtc::Event done;
+
+ auto earlier_task = [&earlier_executed, &later_executed]() {
+ EXPECT_FALSE(later_executed.load());
+ earlier_executed.store(true);
+ };
+
+ auto later_task = [&earlier_executed, &later_executed, &done]() {
+ EXPECT_TRUE(earlier_executed.load());
+ later_executed.store(true);
+ done.Set();
+ };
+
+ task_queue.PostDelayedTask(earlier_task, earlier_delay_ms);
+ task_queue.PostDelayedTask(later_task, later_delay_ms);
+
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+ ASSERT_TRUE(earlier_executed);
+ ASSERT_TRUE(later_executed);
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ ExternalThreadCancelsTask) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ rtc::Event done;
+
+ // Prevent the to-be-cancelled task from being executed before we've had
+ // time to cancel it.
+ rtc::Event rendezvous;
+ task_queue.PostTask(
+ [&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); });
+
+ TaskId cancelled_task_id = task_queue.PostTask([]() { EXPECT_TRUE(false); });
+ task_queue.PostTask([&done]() { done.Set(); });
+
+ task_queue.CancelTask(cancelled_task_id);
+
+ // Set the tasks in motion; the cancelled task does not run (otherwise the
+ // test would fail). The last task ends the test, showing that the queue
+ // progressed beyond the cancelled task.
+ rendezvous.Set();
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+}
+
+// In this test, we'll set off a chain where the first task cancels the second
+// task, then a third task runs (showing that we really cancelled the task,
+// rather than just halted the task-queue).
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ InternalThreadCancelsTask) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ rtc::Event done;
+
+ // Prevent the chain from being set-off before we've set everything up.
+ rtc::Event rendezvous;
+ task_queue.PostTask(
+ [&rendezvous]() { ASSERT_TRUE(rendezvous.Wait(kMaxWaitTimeMs)); });
+
+ // This is the canceller-task. It takes cancelled_task_id by reference,
+ // because the ID will only become known after the cancelled task is
+ // scheduled.
+ TaskId cancelled_task_id;
+ auto canceller_task = [&task_queue, &cancelled_task_id]() {
+ task_queue.CancelTask(cancelled_task_id);
+ };
+ task_queue.PostTask(canceller_task);
+
+ // This task will be cancelled by the task before it.
+ auto cancelled_task = []() { EXPECT_TRUE(false); };
+ cancelled_task_id = task_queue.PostTask(cancelled_task);
+
+ // When this task runs, it will allow the test to be finished.
+ auto completion_marker_task = [&done]() { done.Set(); };
+ task_queue.PostTask(completion_marker_task);
+
+ rendezvous.Set(); // Set the chain in motion.
+
+ ASSERT_TRUE(done.Wait(kMaxWaitTimeMs));
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest, SendTask) {
+ DEPRECATED_SingleThreadedTaskQueueForTesting task_queue("task_queue");
+
+ std::atomic<bool> executed(false);
+
+ SendTask(RTC_FROM_HERE, &task_queue, [&executed]() {
+ // Intentionally delay, so that if SendTask didn't block, the sender thread
+ // would have time to read |executed|.
+ rtc::Event delay;
+ ASSERT_FALSE(delay.Wait(1000));
+ executed.store(true);
+ });
+
+ EXPECT_TRUE(executed);
+}
+
+TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
+ DestructTaskQueueWhileTasksPending) {
+ auto task_queue =
+ std::make_unique<DEPRECATED_SingleThreadedTaskQueueForTesting>(
+ "task_queue");
+
+ std::atomic<size_t> counter(0);
+
+ constexpr size_t tasks = 10;
+ for (size_t i = 0; i < tasks; i++) {
+ task_queue->PostTask([&counter]() {
+ std::atomic_fetch_add(&counter, static_cast<size_t>(1));
+ rtc::Event delay;
+ ASSERT_FALSE(delay.Wait(500));
+ });
+ }
+
+ task_queue.reset();
+
+ EXPECT_LT(counter, tasks);
+}
+
+class SingleThreadedTaskQueueForTestingFactory : public TaskQueueFactory {
+ public:
+ std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
+ absl::string_view /* name */,
+ Priority /*priority*/) const override {
+ return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
+ new DEPRECATED_SingleThreadedTaskQueueForTesting("noname"));
+ }
+};
+
+INSTANTIATE_TEST_SUITE_P(
+ DeprecatedSingleThreadedTaskQueueForTesting,
+ TaskQueueTest,
+ ::testing::Values(
+ std::make_unique<SingleThreadedTaskQueueForTestingFactory>));
+
+} // namespace
+} // namespace test
+} // namespace webrtc