Migrate stdlib task queue to TaskQueueBase interface

Bug: webrtc:10191
Change-Id: I16e13b69dce7cafa545977e9ac253b6e57312690
Reviewed-on: https://webrtc-review.googlesource.com/c/123760
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26796}
diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn
index 01fdab5..578a212 100644
--- a/api/task_queue/BUILD.gn
+++ b/api/task_queue/BUILD.gn
@@ -106,9 +106,9 @@
     deps += [ "../../rtc_base:rtc_task_queue_win" ]
   } else {
     sources = [
-      "default_task_queue_factory_unimplemented.cc",
+      "default_task_queue_factory_stdlib.cc",
     ]
-    deps += [ "../../rtc_base:checks" ]
+    deps += [ "../../rtc_base:rtc_task_queue_stdlib" ]
   }
 }
 
diff --git a/api/task_queue/default_task_queue_factory_unimplemented.cc b/api/task_queue/default_task_queue_factory_stdlib.cc
similarity index 74%
rename from api/task_queue/default_task_queue_factory_unimplemented.cc
rename to api/task_queue/default_task_queue_factory_stdlib.cc
index d4020f8..ca7d720 100644
--- a/api/task_queue/default_task_queue_factory_unimplemented.cc
+++ b/api/task_queue/default_task_queue_factory_stdlib.cc
@@ -10,14 +10,12 @@
 #include <memory>
 
 #include "api/task_queue/task_queue_factory.h"
-#include "rtc_base/checks.h"
+#include "rtc_base/task_queue_stdlib.h"
 
 namespace webrtc {
 
 std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory() {
-  RTC_CHECK(false)
-      << "Default task queue is not implemented for current platform, "
-         "overwrite the task queue implementation by setting global factory.";
+  return CreateTaskQueueStdlibFactory();
 }
 
 }  // namespace webrtc
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 13c9957..63b4f3f 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -576,9 +576,10 @@
 }
 
 rtc_source_set("rtc_task_queue_stdlib") {
-  visibility = [ ":rtc_task_queue_impl" ]
+  visibility = [ "../api/task_queue:default_task_queue_factory_impl" ]
   sources = [
     "task_queue_stdlib.cc",
+    "task_queue_stdlib.h",
   ]
   deps = [
     ":checks",
@@ -586,30 +587,23 @@
     ":logging",
     ":macromagic",
     ":platform_thread",
-    ":refcount",
     ":rtc_event",
     ":rtc_task_queue_api",
     ":safe_conversions",
     ":timeutils",
-    "../api:scoped_refptr",
+    "../api/task_queue",
+    "../api/task_queue:task_queue_factory",
+    "//third_party/abseil-cpp/absl/memory",
+    "//third_party/abseil-cpp/absl/strings",
   ]
 }
 
 rtc_source_set("rtc_task_queue_impl") {
   visibility = [ "*" ]
-  if (rtc_enable_libevent || is_mac || is_ios ||
-      (is_win && current_os != "winuwp")) {
-    deps = [
-      "../api/task_queue:default_task_queue_factory_impl",
-      "../api/task_queue:global_task_queue_factory",
-    ]
-  } else {
-    if (is_win && current_os == "winuwp") {
-      deps = [
-        ":rtc_task_queue_stdlib",
-      ]
-    }
-  }
+  deps = [
+    "../api/task_queue:default_task_queue_factory_impl",
+    "../api/task_queue:global_task_queue_factory",
+  ]
 }
 
 rtc_source_set("sequenced_task_checker") {
diff --git a/rtc_base/task_queue_stdlib.cc b/rtc_base/task_queue_stdlib.cc
index 2b9d5a2..88128b5 100644
--- a/rtc_base/task_queue_stdlib.cc
+++ b/rtc_base/task_queue_stdlib.cc
@@ -8,83 +8,55 @@
  *  be found in the AUTHORS file in the root of the source tree.
  */
 
-#include "rtc_base/task_queue.h"
+#include "rtc_base/task_queue_stdlib.h"
 
 #include <string.h>
 #include <algorithm>
-#include <atomic>
-#include <condition_variable>
 #include <map>
 #include <queue>
 #include <utility>
 
+#include "absl/memory/memory.h"
+#include "absl/strings/string_view.h"
+#include "api/task_queue/queued_task.h"
+#include "api/task_queue/task_queue_base.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/critical_section.h"
 #include "rtc_base/event.h"
 #include "rtc_base/logging.h"
 #include "rtc_base/platform_thread.h"
-#include "rtc_base/ref_count.h"
-#include "rtc_base/ref_counted_object.h"
 #include "rtc_base/thread_annotations.h"
 #include "rtc_base/time_utils.h"
 
-namespace rtc {
+namespace webrtc {
 namespace {
 
-using Priority = TaskQueue::Priority;
-
-ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
+rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
+    TaskQueueFactory::Priority priority) {
   switch (priority) {
-    case Priority::HIGH:
-      return kRealtimePriority;
-    case Priority::LOW:
-      return kLowPriority;
-    case Priority::NORMAL:
-      return kNormalPriority;
+    case TaskQueueFactory::Priority::HIGH:
+      return rtc::kRealtimePriority;
+    case TaskQueueFactory::Priority::LOW:
+      return rtc::kLowPriority;
+    case TaskQueueFactory::Priority::NORMAL:
+      return rtc::kNormalPriority;
     default:
       RTC_NOTREACHED();
-      return kNormalPriority;
+      return rtc::kNormalPriority;
   }
-  return kNormalPriority;
 }
 
-}  // namespace
-
-class TaskQueue::Impl : public RefCountInterface {
+class TaskQueueStdlib final : public TaskQueueBase {
  public:
-  Impl(const char* queue_name, TaskQueue* queue, Priority priority);
-  ~Impl() override;
+  TaskQueueStdlib(absl::string_view queue_name, rtc::ThreadPriority priority);
+  ~TaskQueueStdlib() override = default;
 
-  static TaskQueue::Impl* Current();
-  static TaskQueue* CurrentQueue();
+  void Delete() override;
+  void PostTask(std::unique_ptr<QueuedTask> task) override;
+  void PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                       uint32_t milliseconds) override;
 
-  // Used for DCHECKing the current queue.
-  bool IsCurrent() const;
-
-  template <class Closure,
-            typename std::enable_if<!std::is_convertible<
-                Closure,
-                std::unique_ptr<QueuedTask>>::value>::type* = nullptr>
-  void PostTask(Closure&& closure) {
-    PostTask(NewClosure(std::forward<Closure>(closure)));
-  }
-
-  void PostTask(std::unique_ptr<QueuedTask> task);
-  void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
-                        std::unique_ptr<QueuedTask> reply,
-                        TaskQueue::Impl* reply_queue);
-
-  void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
-
-  class WorkerThread : public PlatformThread {
-   public:
-    WorkerThread(ThreadRunFunction func,
-                 void* obj,
-                 const char* thread_name,
-                 ThreadPriority priority)
-        : PlatformThread(func, obj, thread_name, priority) {}
-  };
-
+ private:
   using OrderId = uint64_t;
 
   struct DelayedEntryTimeout {
@@ -103,38 +75,26 @@
     int64_t sleep_time_ms_{};
   };
 
- protected:
   NextTask GetNextTask();
 
- private:
-  // The ThreadQueue::Current() method requires that the current thread
-  // returns the task queue if the current thread is the active task
-  // queue and this variable holds the value needed in thread_local to
-  // on the initialized worker thread holding the queue.
-  static thread_local TaskQueue::Impl* thread_context_;
-
   static void ThreadMain(void* context);
 
   void ProcessTasks();
 
   void NotifyWake();
 
-  // The back pointer from the owner task queue object
-  // from this implementation detail.
-  TaskQueue* const queue_;
-
   // Indicates if the thread has started.
-  Event started_;
+  rtc::Event started_;
 
   // Indicates if the thread has stopped.
-  Event stopped_;
+  rtc::Event stopped_;
 
   // Signaled whenever a new task is pending.
-  Event flag_notify_;
+  rtc::Event flag_notify_;
 
   // Contains the active worker thread assigned to processing
   // tasks (including delayed tasks).
-  WorkerThread thread_;
+  rtc::PlatformThread thread_;
 
   rtc::CriticalSection pending_lock_;
 
@@ -160,57 +120,34 @@
       RTC_GUARDED_BY(pending_lock_);
 };
 
-// static
-thread_local TaskQueue::Impl* TaskQueue::Impl::thread_context_ = nullptr;
-
-TaskQueue::Impl::Impl(const char* queue_name,
-                      TaskQueue* queue,
-                      Priority priority)
-    : queue_(queue),
-      started_(/*manual_reset=*/false, /*initially_signaled=*/false),
+TaskQueueStdlib::TaskQueueStdlib(absl::string_view queue_name,
+                                 rtc::ThreadPriority priority)
+    : started_(/*manual_reset=*/false, /*initially_signaled=*/false),
       stopped_(/*manual_reset=*/false, /*initially_signaled=*/false),
       flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false),
-      thread_(&TaskQueue::Impl::ThreadMain,
-              this,
-              queue_name,
-              TaskQueuePriorityToThreadPriority(priority)) {
-  RTC_DCHECK(queue_name);
+      thread_(&TaskQueueStdlib::ThreadMain, this, queue_name, priority) {
   thread_.Start();
-  started_.Wait(Event::kForever);
+  started_.Wait(rtc::Event::kForever);
 }
 
-TaskQueue::Impl::~Impl() {
+void TaskQueueStdlib::Delete() {
   RTC_DCHECK(!IsCurrent());
 
   {
-    CritScope lock(&pending_lock_);
+    rtc::CritScope lock(&pending_lock_);
     thread_should_quit_ = true;
   }
 
   NotifyWake();
 
-  stopped_.Wait(Event::kForever);
+  stopped_.Wait(rtc::Event::kForever);
   thread_.Stop();
+  delete this;
 }
 
-// static
-TaskQueue::Impl* TaskQueue::Impl::Current() {
-  return thread_context_;
-}
-
-// static
-TaskQueue* TaskQueue::Impl::CurrentQueue() {
-  TaskQueue::Impl* current = Current();
-  return current ? current->queue_ : nullptr;
-}
-
-bool TaskQueue::Impl::IsCurrent() const {
-  return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
-}
-
-void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueueStdlib::PostTask(std::unique_ptr<QueuedTask> task) {
   {
-    CritScope lock(&pending_lock_);
+    rtc::CritScope lock(&pending_lock_);
     OrderId order = thread_posting_order_++;
 
     pending_queue_.push(std::pair<OrderId, std::unique_ptr<QueuedTask>>(
@@ -220,7 +157,7 @@
   NotifyWake();
 }
 
-void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+void TaskQueueStdlib::PostDelayedTask(std::unique_ptr<QueuedTask> task,
                                       uint32_t milliseconds) {
   auto fire_at = rtc::TimeMillis() + milliseconds;
 
@@ -228,7 +165,7 @@
   delay.next_fire_at_ms_ = fire_at;
 
   {
-    CritScope lock(&pending_lock_);
+    rtc::CritScope lock(&pending_lock_);
     delay.order_ = ++thread_posting_order_;
     delayed_queue_[delay] = std::move(task);
   }
@@ -236,25 +173,12 @@
   NotifyWake();
 }
 
-void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
-                                       std::unique_ptr<QueuedTask> reply,
-                                       TaskQueue::Impl* reply_queue) {
-  QueuedTask* task_ptr = task.release();
-  QueuedTask* reply_task_ptr = reply.release();
-  PostTask([task_ptr, reply_task_ptr, reply_queue]() {
-    if (task_ptr->Run())
-      delete task_ptr;
-
-    reply_queue->PostTask(std::unique_ptr<QueuedTask>(reply_task_ptr));
-  });
-}
-
-TaskQueue::Impl::NextTask TaskQueue::Impl::GetNextTask() {
+TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
   NextTask result{};
 
   auto tick = rtc::TimeMillis();
 
-  CritScope lock(&pending_lock_);
+  rtc::CritScope lock(&pending_lock_);
 
   if (thread_should_quit_) {
     result.final_task_ = true;
@@ -295,13 +219,13 @@
 }
 
 // static
-void TaskQueue::Impl::ThreadMain(void* context) {
-  TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
+void TaskQueueStdlib::ThreadMain(void* context) {
+  TaskQueueStdlib* me = static_cast<TaskQueueStdlib*>(context);
+  CurrentTaskQueueSetter set_current(me);
   me->ProcessTasks();
 }
 
-void TaskQueue::Impl::ProcessTasks() {
-  thread_context_ = this;
+void TaskQueueStdlib::ProcessTasks() {
   started_.Set();
 
   while (true) {
@@ -321,7 +245,7 @@
     }
 
     if (0 == task.sleep_time_ms_)
-      flag_notify_.Wait(Event::kForever);
+      flag_notify_.Wait(rtc::Event::kForever);
     else
       flag_notify_.Wait(task.sleep_time_ms_);
   }
@@ -329,7 +253,7 @@
   stopped_.Set();
 }
 
-void TaskQueue::Impl::NotifyWake() {
+void TaskQueueStdlib::NotifyWake() {
   // The queue holds pending tasks to complete. Either tasks are to be
   // executed immediately or tasks are to be run at some future delayed time.
   // For immediate tasks the task queue's thread is busy running the task and
@@ -357,43 +281,20 @@
   flag_notify_.Set();
 }
 
-// Boilerplate for the PIMPL pattern.
-TaskQueue::TaskQueue(const char* queue_name, Priority priority)
-    : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
+class TaskQueueStdlibFactory final : public TaskQueueFactory {
+ public:
+  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
+      absl::string_view name,
+      Priority priority) const override {
+    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
+        new TaskQueueStdlib(name, TaskQueuePriorityToThreadPriority(priority)));
+  }
+};
+
+}  // namespace
+
+std::unique_ptr<TaskQueueFactory> CreateTaskQueueStdlibFactory() {
+  return absl::make_unique<TaskQueueStdlibFactory>();
 }
 
-TaskQueue::~TaskQueue() {}
-
-// static
-TaskQueue* TaskQueue::Current() {
-  return TaskQueue::Impl::CurrentQueue();
-}
-
-// Used for DCHECKing the current queue.
-bool TaskQueue::IsCurrent() const {
-  return impl_->IsCurrent();
-}
-
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
-  return TaskQueue::impl_->PostTask(std::move(task));
-}
-
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
-                                 std::unique_ptr<QueuedTask> reply,
-                                 TaskQueue* reply_queue) {
-  return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
-                                            reply_queue->impl_.get());
-}
-
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
-                                 std::unique_ptr<QueuedTask> reply) {
-  return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
-                                            impl_.get());
-}
-
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
-                                uint32_t milliseconds) {
-  return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
-}
-
-}  // namespace rtc
+}  // namespace webrtc
diff --git a/api/task_queue/default_task_queue_factory_unimplemented.cc b/rtc_base/task_queue_stdlib.h
similarity index 65%
copy from api/task_queue/default_task_queue_factory_unimplemented.cc
copy to rtc_base/task_queue_stdlib.h
index d4020f8..fb03dff 100644
--- a/api/task_queue/default_task_queue_factory_unimplemented.cc
+++ b/rtc_base/task_queue_stdlib.h
@@ -7,17 +7,18 @@
  *  in the file PATENTS.  All contributing project authors may
  *  be found in the AUTHORS file in the root of the source tree.
  */
+
+#ifndef RTC_BASE_TASK_QUEUE_STDLIB_H_
+#define RTC_BASE_TASK_QUEUE_STDLIB_H_
+
 #include <memory>
 
 #include "api/task_queue/task_queue_factory.h"
-#include "rtc_base/checks.h"
 
 namespace webrtc {
 
-std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory() {
-  RTC_CHECK(false)
-      << "Default task queue is not implemented for current platform, "
-         "overwrite the task queue implementation by setting global factory.";
-}
+std::unique_ptr<TaskQueueFactory> CreateTaskQueueStdlibFactory();
 
 }  // namespace webrtc
+
+#endif  // RTC_BASE_TASK_QUEUE_STDLIB_H_