Implement TaskQueueBase interface by SingleThreadedTaskQueueForTesting

that allows to use SingleThreadedTaskQueueForTesting as regular TaskQueue.
which allows components that currently depend on SingleThreadedTaskQueueForTesting
to depend on TaskQueueBase interface instead.
Those updates can be done one-by-one and in the end would allow to stop
using SingleThreadedTaskQueueForTesting in favor of other TaskQueue implementations.

Bug: webrtc:10933
Change-Id: I3e642c88c968012588b9d9c09918340f37bbedbd
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/154352
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Elad Alon <eladalon@webrtc.org>
Reviewed-by: Yves Gerey <yvesg@google.com>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29307}
diff --git a/test/BUILD.gn b/test/BUILD.gn
index 7eae9b9..8c1d25f 100644
--- a/test/BUILD.gn
+++ b/test/BUILD.gn
@@ -381,6 +381,7 @@
       "../api:create_simulcast_test_fixture_api",
       "../api:scoped_refptr",
       "../api:simulcast_test_fixture_api",
+      "../api/task_queue:task_queue_test",
       "../api/test/video:function_video_factory",
       "../api/video:builtin_video_bitrate_allocator_factory",
       "../api/video:video_frame",
@@ -622,9 +623,11 @@
     "single_threaded_task_queue.h",
   ]
   deps = [
+    "../api/task_queue",
     "../rtc_base:checks",
     "../rtc_base:deprecation",
     "../rtc_base:rtc_base_approved",
+    "../rtc_base/task_utils:to_queued_task",
   ]
 }
 
diff --git a/test/single_threaded_task_queue.cc b/test/single_threaded_task_queue.cc
index 24b9038..9fbb24a 100644
--- a/test/single_threaded_task_queue.cc
+++ b/test/single_threaded_task_queue.cc
@@ -20,15 +20,12 @@
 namespace webrtc {
 namespace test {
 
-DEPRECATED_SingleThreadedTaskQueueForTesting::QueuedTask::QueuedTask(
+DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::StoredTask(
     DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId task_id,
-    int64_t earliest_execution_time,
-    DEPRECATED_SingleThreadedTaskQueueForTesting::Task task)
-    : task_id(task_id),
-      earliest_execution_time(earliest_execution_time),
-      task(task) {}
+    std::unique_ptr<QueuedTask> task)
+    : task_id(task_id), task(std::move(task)) {}
 
-DEPRECATED_SingleThreadedTaskQueueForTesting::QueuedTask::~QueuedTask() =
+DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::~StoredTask() =
     default;
 
 DEPRECATED_SingleThreadedTaskQueueForTesting::
@@ -43,13 +40,8 @@
 }
 
 DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId
-DEPRECATED_SingleThreadedTaskQueueForTesting::PostTask(Task task) {
-  return PostDelayedTask(task, 0);
-}
-
-DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId
-DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayedTask(
-    Task task,
+DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayed(
+    std::unique_ptr<QueuedTask> task,
     int64_t delay_ms) {
   int64_t earliest_exec_time = rtc::TimeAfter(delay_ms);
 
@@ -60,13 +52,11 @@
   TaskId id = next_task_id_++;
 
   // Insert after any other tasks with an earlier-or-equal target time.
-  auto it = tasks_.begin();
-  for (; it != tasks_.end(); it++) {
-    if (earliest_exec_time < (*it)->earliest_execution_time) {
-      break;
-    }
-  }
-  tasks_.insert(it, std::make_unique<QueuedTask>(id, earliest_exec_time, task));
+  // Note: multimap has promise "The order of the key-value pairs whose keys
+  // compare equivalent is the order of insertion and does not change."
+  tasks_.emplace(std::piecewise_construct,
+                 std::forward_as_tuple(earliest_exec_time),
+                 std::forward_as_tuple(id, std::move(task)));
 
   // This class is optimized for simplicty, not for performance. This will wake
   // the thread up even if the next task in the queue is only scheduled for
@@ -93,7 +83,7 @@
 bool DEPRECATED_SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) {
   rtc::CritScope lock(&cs_);
   for (auto it = tasks_.begin(); it != tasks_.end(); it++) {
-    if ((*it)->task_id == task_id) {
+    if (it->second.task_id == task_id) {
       tasks_.erase(it);
       return true;
     }
@@ -136,6 +126,7 @@
 }
 
 void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() {
+  CurrentTaskQueueSetter set_current(this);
   while (true) {
     std::unique_ptr<QueuedTask> queued_task;
 
@@ -151,11 +142,13 @@
         return;
       }
       if (!tasks_.empty()) {
-        int64_t remaining_delay_ms = rtc::TimeDiff(
-            tasks_.front()->earliest_execution_time, rtc::TimeMillis());
+        auto next_delayed_task = tasks_.begin();
+        int64_t earliest_exec_time = next_delayed_task->first;
+        int64_t remaining_delay_ms =
+            rtc::TimeDiff(earliest_exec_time, rtc::TimeMillis());
         if (remaining_delay_ms <= 0) {
-          queued_task = std::move(tasks_.front());
-          tasks_.pop_front();
+          queued_task = std::move(next_delayed_task->second.task);
+          tasks_.erase(next_delayed_task);
         } else {
           wait_time = rtc::saturated_cast<int>(remaining_delay_ms);
         }
@@ -163,12 +156,19 @@
     }
 
     if (queued_task) {
-      queued_task->task();
+      if (!queued_task->Run()) {
+        queued_task.release();
+      }
     } else {
       wake_up_.Wait(wait_time);
     }
   }
 }
 
+void DEPRECATED_SingleThreadedTaskQueueForTesting::Delete() {
+  Stop();
+  delete this;
+}
+
 }  // namespace test
 }  // namespace webrtc
diff --git a/test/single_threaded_task_queue.h b/test/single_threaded_task_queue.h
index 0012673..52316c6 100644
--- a/test/single_threaded_task_queue.h
+++ b/test/single_threaded_task_queue.h
@@ -11,13 +11,15 @@
 #define TEST_SINGLE_THREADED_TASK_QUEUE_H_
 
 #include <functional>
-#include <list>
+#include <map>
 #include <memory>
 
+#include "api/task_queue/task_queue_base.h"
 #include "rtc_base/critical_section.h"
 #include "rtc_base/deprecation.h"
 #include "rtc_base/event.h"
 #include "rtc_base/platform_thread.h"
+#include "rtc_base/task_utils/to_queued_task.h"
 #include "rtc_base/thread_checker.h"
 
 namespace webrtc {
@@ -33,25 +35,29 @@
 // resemble that of real WebRTC, thereby allowing us to replace some critical
 // sections by thread-checkers.
 // This task is NOT tuned for performance, but rather for simplicity.
-class DEPRECATED_SingleThreadedTaskQueueForTesting {
+class DEPRECATED_SingleThreadedTaskQueueForTesting : public TaskQueueBase {
  public:
   using Task = std::function<void()>;
   using TaskId = size_t;
   constexpr static TaskId kInvalidTaskId = static_cast<TaskId>(-1);
 
   explicit DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name);
-  ~DEPRECATED_SingleThreadedTaskQueueForTesting();
+  ~DEPRECATED_SingleThreadedTaskQueueForTesting() override;
 
   // Sends one task to the task-queue, and returns a handle by which the
   // task can be cancelled.
   // This mimics the behavior of TaskQueue, but only for lambdas, rather than
   // for both lambdas and QueuedTask objects.
-  TaskId PostTask(Task task);
+  TaskId PostTask(Task task) {
+    return PostDelayed(ToQueuedTask(std::move(task)), /*delay_ms=*/0);
+  }
 
   // Same as PostTask(), but ensures that the task will not begin execution
   // less than |delay_ms| milliseconds after being posted; an upper bound
   // is not provided.
-  TaskId PostDelayedTask(Task task, int64_t delay_ms);
+  TaskId PostDelayedTask(Task task, int64_t delay_ms) {
+    return PostDelayed(ToQueuedTask(std::move(task)), delay_ms);
+  }
 
   // Send one task to the queue. The function does not return until the task
   // has finished executing. No support for canceling the task.
@@ -72,22 +78,36 @@
 
   void Stop();
 
+  // Implements TaskQueueBase.
+  void Delete() override;
+
+  void PostTask(std::unique_ptr<QueuedTask> task) override {
+    PostDelayed(std::move(task), /*delay_ms=*/0);
+  }
+
+  void PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                       uint32_t delay_ms) override {
+    PostDelayed(std::move(task), delay_ms);
+  }
+
  private:
-  struct QueuedTask {
-    QueuedTask(TaskId task_id, int64_t earliest_execution_time, Task task);
-    ~QueuedTask();
+  struct StoredTask {
+    StoredTask(TaskId task_id, std::unique_ptr<QueuedTask> task);
+    ~StoredTask();
 
     TaskId task_id;
-    int64_t earliest_execution_time;
-    Task task;
+    std::unique_ptr<QueuedTask> task;
   };
 
+  TaskId PostDelayed(std::unique_ptr<QueuedTask> task, int64_t delay_ms);
+
   static void Run(void* obj);
 
   void RunLoop();
 
   rtc::CriticalSection cs_;
-  std::list<std::unique_ptr<QueuedTask>> tasks_ RTC_GUARDED_BY(cs_);
+  // Tasks are ordered by earliest execution time.
+  std::multimap<int64_t, StoredTask> tasks_ RTC_GUARDED_BY(cs_);
   rtc::ThreadChecker owner_thread_checker_;
   rtc::PlatformThread thread_;
   bool running_ RTC_GUARDED_BY(cs_);
diff --git a/test/single_threaded_task_queue_unittest.cc b/test/single_threaded_task_queue_unittest.cc
index b945bc0..dedc78b 100644
--- a/test/single_threaded_task_queue_unittest.cc
+++ b/test/single_threaded_task_queue_unittest.cc
@@ -14,6 +14,7 @@
 #include <memory>
 #include <vector>
 
+#include "api/task_queue/task_queue_test.h"
 #include "rtc_base/event.h"
 #include "test/gtest.h"
 
@@ -352,6 +353,22 @@
   EXPECT_LT(counter, tasks);
 }
 
+class SingleThreadedTaskQueueForTestingFactory : public TaskQueueFactory {
+ public:
+  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
+      absl::string_view /* name */,
+      Priority /*priority*/) const override {
+    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
+        new DEPRECATED_SingleThreadedTaskQueueForTesting("noname"));
+  }
+};
+
+INSTANTIATE_TEST_SUITE_P(
+    DeprecatedSingleThreadedTaskQueueForTesting,
+    TaskQueueTest,
+    ::testing::Values(
+        std::make_unique<SingleThreadedTaskQueueForTestingFactory>));
+
 }  // namespace
 }  // namespace test
 }  // namespace webrtc